MySQL · 源码分析 · 一条insert语句的执行过程

本文只分析了insert语句执行的主路径,和路径上部分关键函数,很多细节没有深入,留给读者继续分析

create table t1(id int);

insert into t1 values(1)

略过建立连接,从 mysql_parse() 开始分析

void mysql_parse(THD *thd, char *rawbuf, uint length,
                 Parser_state *parser_state)
{
  /* ...... */

  /* 检查query_cache,如果结果存在于cache中,直接返回 */
  if (query_cache_send_result_to_client(thd, rawbuf, length) <= 0)
  {
     LEX *lex= thd->lex;

 	 /* 解析语句 */
     bool err= parse_sql(thd, parser_state, NULL);

	 /* 整理语句格式,记录 general log */
	 /* ...... */
	 		  /* 执行语句 */
             error= mysql_execute_command(thd);
             /* 提交或回滚没结束的事务(事务可能在mysql_execute_command中提交,用trx_end_by_hint标记事务是否已经提交) */
             if (!thd->trx_end_by_hint)
             {
               if (!error && lex->ci_on_success)
                 trans_commit(thd);

               if (error && lex->rb_on_fail)
                 trans_rollback(thd);
             }

进入 mysql_execute_command()

  /*  */
  /* ...... */

  case SQLCOM_INSERT:
  {  

    /* 检查权限 */
    if ((res= insert_precheck(thd, all_tables)))
      break;

    /* 执行insert */
    res= mysql_insert(thd, all_tables, lex->field_list, lex->many_values,
                      lex->update_list, lex->value_list,
                      lex->duplicates, lex->ignore);

	/* 提交或者回滚事务 */
    if (!res)
    {
      trans_commit_stmt(thd);
      trans_commit(thd);
      thd->trx_end_by_hint= TRUE;
    }
    else if (res)
    {
      trans_rollback_stmt(thd);
      trans_rollback(thd);
      thd->trx_end_by_hint= TRUE;
    }

进入 mysql_insert()

bool mysql_insert(THD *thd,TABLE_LIST *table_list,
                  List<Item> &fields, /* insert 的字段 */
                  List<List_item> &values_list, /* insert 的值 */
                  List<Item> &update_fields,
                  List<Item> &update_values,
                  enum_duplicates duplic,
                  bool ignore)
{
  /*对每条记录调用 write_record */
  while ((values= its++))
  {
	if (lock_type == TL_WRITE_DELAYED)
    {
      LEX_STRING const st_query = { query, thd->query_length() };
      DEBUG_SYNC(thd, "before_write_delayed");
      /* insert delay */
      error= write_delayed(thd, table, st_query, log_on, &info);
      DEBUG_SYNC(thd, "after_write_delayed");
      query=0;
    }
    else
      /* normal insert */
      error= write_record(thd, table, &info, &update);
  }

  /*
    这里还有
    thd->binlog_query()写binlog
    my_ok()返回ok报文,ok报文中包含影响行数
  */

进入 write_record

/*
  COPY_INFO *info 用来处理唯一键冲突,记录影响行数
  COPY_INFO *update 处理 INSERT ON DUPLICATE KEY UPDATE 相关信息
*/
int write_record(THD *thd, TABLE *table, COPY_INFO *info, COPY_INFO *update)
{
  if (duplicate_handling == DUP_REPLACE || duplicate_handling == DUP_UPDATE)
  {
    /* 处理 INSERT ON DUPLICATE KEY UPDATE 等复杂情况 */
  }
  /* 调用存储引擎的接口 */
  else if ((error=table->file->ha_write_row(table->record[0])))
  {
    DEBUG_SYNC(thd, "write_row_noreplace");
    if (!ignore_errors ||
        table->file->is_fatal_error(error, HA_CHECK_DUP))
      goto err;
    table->file->restore_auto_increment(prev_insert_id);
    goto ok_or_after_trg_err;
  }
}

进入ha_write_row、write_row

/* handler 是各个存储引擎的基类,这里我们使用InnoDB引擎*/
int handler::ha_write_row(uchar *buf)
{
  /* 指定log_event类型*/
  Log_func *log_func= Write_rows_log_event::binlog_row_logging_function;
  error= write_row(buf);
}

进入引擎层,这里是innodb引擎,handler对应ha_innobase
插入的表信息保存在handler中

int
ha_innobase::write_row(
/*===================*/
        uchar*  record) /*!< in: a row in MySQL format */
{
		error = row_insert_for_mysql((byte*) record, prebuilt);
}
UNIV_INTERN
dberr_t
row_insert_for_mysql(
/*=================*/
        byte*           mysql_rec,      /*!< in: row in the MySQL format */
        row_prebuilt_t* prebuilt)       /*!< in: prebuilt struct in MySQL
                                        handle */
{
		/*记录格式从MySQL转换成InnoDB*/
		row_mysql_convert_row_to_innobase(node->row, prebuilt, mysql_rec);

        thr->run_node = node;
        thr->prev_node = node;

		/*插入记录*/
        row_ins_step(thr);
}
UNIV_INTERN
que_thr_t*
row_ins_step(
/*=========*/
        que_thr_t*      thr)    /*!< in: query thread */
{
		/*给表加IX锁*/
		err = lock_table(0, node->table, LOCK_IX, thr);

		/*插入记录*/
		err = row_ins(node, thr);
}

InnoDB表是基于B+树的索引组织表

如果InnoDB表没有主键和唯一键,需要分配隐含的row_id组织聚集索引

row_id分配逻辑在row_ins中,这里不详细展开

static __attribute__((nonnull, warn_unused_result))
dberr_t
row_ins(
/*====*/
        ins_node_t*     node,   /*!< in: row insert node */
        que_thr_t*      thr)    /*!< in: query thread */
{
		if (node->state == INS_NODE_ALLOC_ROW_ID) {
				/*若innodb表没有主键和唯一键,用row_id组织索引*/
        		row_ins_alloc_row_id_step(node);

				/*获取row_id的索引*/
                node->index = dict_table_get_first_index(node->table);
                node->entry = UT_LIST_GET_FIRST(node->entry_list);
		}

		/*遍历所有索引,向每个索引中插入记录*/
		while (node->index != NULL) {
                if (node->index->type != DICT_FTS) {
                        /* 向索引中插入记录 */
                        err = row_ins_index_entry_step(node, thr);

                        if (err != DB_SUCCESS) {

                                return(err);
                        }
                }                                                                                                                                                                                           

				/*获取下一个索引*/
                node->index = dict_table_get_next_index(node->index);
                node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);

                }
        }
}

插入单个索引项

static __attribute__((nonnull, warn_unused_result))
dberr_t
row_ins_index_entry_step(
/*=====================*/
        ins_node_t*     node,   /*!< in: row insert node */
        que_thr_t*      thr)    /*!< in: query thread */
{
        dberr_t err;

        /*给索引项赋值*/
        row_ins_index_entry_set_vals(node->index, node->entry, node->row);

		/*插入索引项*/
        err = row_ins_index_entry(node->index, node->entry, thr);

        return(err);
}
static
dberr_t
row_ins_index_entry(
/*================*/
        dict_index_t*   index,  /*!< in: index */
        dtuple_t*       entry,  /*!< in/out: index entry to insert */
        que_thr_t*      thr)    /*!< in: query thread */
{

        if (dict_index_is_clust(index)) {
        		/* 插入聚集索引 */
                return(row_ins_clust_index_entry(index, entry, thr, 0));
        } else {
        		/* 插入二级索引 */
                return(row_ins_sec_index_entry(index, entry, thr));
        }
}

row_ins_clust_index_entry 和 row_ins_sec_index_entry 函数结构类似,只分析插入聚集索引

UNIV_INTERN
dberr_t
row_ins_clust_index_entry(
/*======================*/
        dict_index_t*   index,  /*!< in: clustered index */
        dtuple_t*       entry,  /*!< in/out: index entry to insert */
        que_thr_t*      thr,    /*!< in: query thread */
        ulint           n_ext)  /*!< in: number of externally stored columns */
{
        if (UT_LIST_GET_FIRST(index->table->foreign_list)) {
                err = row_ins_check_foreign_constraints(
                        index->table, index, entry, thr);
                if (err != DB_SUCCESS) {
                        return(err);
                }
        }

        /* flush log,make checkpoint(如果需要) */
        log_free_check();

		/* 先尝试乐观插入,修改叶子节点 BTR_MODIFY_LEAF */
        err = row_ins_clust_index_entry_low(
                0, BTR_MODIFY_LEAF, index, n_uniq, entry, n_ext, thr,
                &page_no, &modify_clock);

        if (err != DB_FAIL) {
                DEBUG_SYNC_C("row_ins_clust_index_entry_leaf_after");
                return(err);
        }    

		/* flush log,make checkpoint(如果需要) */
        log_free_check();

		/* 乐观插入失败,尝试悲观插入 BTR_MODIFY_TREE */
        return(row_ins_clust_index_entry_low(
                        0, BTR_MODIFY_TREE, index, n_uniq, entry, n_ext, thr,
                        &page_no, &modify_clock));

row_ins_clust_index_entry_low 和 row_ins_sec_index_entry_low 函数结构类似,只分析插入聚集索引

UNIV_INTERN
dberr_t
row_ins_clust_index_entry_low(
/*==========================*/
        ulint           flags,  /*!< in: undo logging and locking flags */
        ulint           mode,   /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
                                depending on whether we wish optimistic or
                                pessimistic descent down the index tree */
        dict_index_t*   index,  /*!< in: clustered index */
        ulint           n_uniq, /*!< in: 0 or index->n_uniq */
        dtuple_t*       entry,  /*!< in/out: index entry to insert */
        ulint           n_ext,  /*!< in: number of externally stored columns */
        que_thr_t*      thr,    /*!< in: query thread */
        ulint*          page_no,/*!< *page_no and *modify_clock are used to decide
                                whether to call btr_cur_optimistic_insert() during
                                pessimistic descent down the index tree.
                                in: If this is optimistic descent, then *page_no
                                must be ULINT_UNDEFINED. If it is pessimistic
                                descent, *page_no must be the page_no to which an
                                optimistic insert was attempted last time
                                row_ins_index_entry_low() was called.
                                out: If this is the optimistic descent, *page_no is set
                                to the page_no to which an optimistic insert was
                                attempted. If it is pessimistic descent, this value is
                                not changed. */
        ullint*         modify_clock) /*!< in/out: *modify_clock == ULLINT_UNDEFINED
                                during optimistic descent, and the modify_clock
                                value for the page that was used for optimistic
                                insert during pessimistic descent */
{
		/* 将cursor移动到索引上待插入的位置 */
		btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE, mode,
                                    &cursor, 0, __FILE__, __LINE__, &mtr);

                        /*根据不同的flag检查主键冲突*/
                        err = row_ins_duplicate_error_in_clust_online(
                                n_uniq, entry, &cursor,
                                &offsets, &offsets_heap);

                        err = row_ins_duplicate_error_in_clust(
                                flags, &cursor, entry, thr, &mtr);

		/*
		  如果要插入的索引项已存在,则把insert操作改为update操作
		  索引项已存在,且没有主键冲突,是因为之前的索引项对应的数据被标记为已删除
		  本次插入的数据和上次删除的一样,而索引项并未删除,所以变为update操作
		*/
        if (row_ins_must_modify_rec(&cursor)) {
                /* There is already an index entry with a long enough common
                prefix, we must convert the insert into a modify of an
                existing record */
                mem_heap_t*     entry_heap      = mem_heap_create(1024);

				/* 更新数据到存在的索引项 */
                err = row_ins_clust_index_entry_by_modify(
                        flags, mode, &cursor, &offsets, &offsets_heap,
                        entry_heap, &big_rec, entry, thr, &mtr);

                /*如果索引正在online_ddl,先记录insert*/
                if (err == DB_SUCCESS && dict_index_is_online_ddl(index)) {
                        row_log_table_insert(rec, index, offsets);
                }

				/*提交mini transaction*/
                mtr_commit(&mtr);
                mem_heap_free(entry_heap);
        } else {
                rec_t*  insert_rec;

                if (mode != BTR_MODIFY_TREE) {
                		/*进行一次乐观插入*/
                        err = btr_cur_optimistic_insert(
                                flags, &cursor, &offsets, &offsets_heap,
                                entry, &insert_rec, &big_rec,
                                n_ext, thr, &mtr);
                } else {
                		/*
                		  如果buffer pool余量不足25%,插入失败,返回DB_LOCK_TABLE_FULL
                		  处理DB_LOCK_TABLE_FULL错误时,会回滚事务
                		  防止大事务的锁占满buffer pool(注释里写的)
                		*/
                        if (buf_LRU_buf_pool_running_out()) {

                                err = DB_LOCK_TABLE_FULL;
                                goto err_exit;
                        }

                        if (/*太长了,略*/) {
                        		 /*进行一次乐观插入*/
                                err = btr_cur_optimistic_insert(
                                        flags, &cursor,
                                        &offsets, &offsets_heap,
                                        entry, &insert_rec, &big_rec,
                                        n_ext, thr, &mtr);
                        } else {
                                err = DB_FAIL;
                        }

                        if (err == DB_FAIL) {
                        		 /*乐观插入失败,进行悲观插入*/
                                err = btr_cur_pessimistic_insert(
                                        flags, &cursor,
                                        &offsets, &offsets_heap,
                                        entry, &insert_rec, &big_rec,
                                        n_ext, thr, &mtr);
                        }
                }

}

btr_cur_optimistic_insert 和 btr_cur_pessimistic_insert 涉及B+树的操作,内部细节很多,以后再做分析

时间: 2025-01-26 16:23:25

MySQL · 源码分析 · 一条insert语句的执行过程的相关文章

TokuDB · 源码分析 · 一条query语句的执行过程

Mysql是基于代价cost来选择索引,如果一个表有好几个索引,optimizer会分别计算每个索引访问的代价,选择代价最小的索引进行访问,这个索引也被称为access path. Pickup index Mysql在执行query语句的时候会在server层计算每个可选索引的代价,并选择代价最小的索引作为访问路径(access path)去引擎读取数据. server层的handler类为引擎层提供一个框架来计算索引的代价. scan_time:计算全表扫描需要执行时间 records_in

MySQL · 源码分析 · InnoDB 异步IO工作流程

之前的一篇内核月报InnoDB IO子系统 中介绍了InnoDB IO子系统中包含的同步IO以及异步IO.本篇文章将从源码层面剖析一下InnoDB IO子系统中,数据页的同步IO以及异步IO请求的具体实现过程. 在MySQL5.6中,InnoDB的异步IO主要是用来处理预读以及对数据文件的写请求的.而对于正常的页面数据读取则是通过同步IO进行的.到底二者在代码层面上的实现过程有什么样的区别? 接下来我们将以Linux native io的执行过程为主线,对IO请求的执行过程进行梳理. 重点数据结

MySQL · 源码分析 · Innodb 引擎Redo日志存储格式简介

MySQL有多种日志.不同种类.不同目的的日志会记录在不同的日志文件中,它们可以帮助你找出mysqld内部发生的事情.比如错误日志:用来记录启动.运行或停止mysqld进程时出现的问题:查询日志:记录建立的客户端连接和执行的语句:二进制日志:记录所有更改数据的语句,主要用于逻辑复制:慢日志:记录所有执行时间超过long_query_time秒的所有查询或不使用索引的查询.而对MySQL中最常用的事务引擎innodb,redo日志是保证事务一致性非常重要的.本文结合MySQL版本5.6为分析源码介

MySQL · 源码分析 · 网络通信模块浅析

MySQL 网络通信浅析 MySQL的网络通信协议主要包含以下几个层次,从最上层的MySQL数据包协议层到最底层的socket传输: | THD | Protocol | NET | VIO | SOCKET 本文主要扫一下相关的代码,以下分析基于MySQL5.7. 创建会话 在MySQL5.7中对会话协议层的代码进行了大量的重构以优化性能,并使得代码更加可读.以下这幅图大概展示了几个相关的类关系(未包含诸如windows平台的相关类) 创建用户线程堆栈是从主线程开始的,监听客户端请求并创建处理

MySQL · 源码分析 · SHUTDOWN过程

ORACLE 中的SHUTDOWN MySQL SHUTDOWN LEVEL 暂时只有一种,源码中留了 LEVEL 的坑还没填 在此借用 Oracle 的 SHUTDOWN LEVEL 分析 Oracle SHUTDOWN LEVEL 共有四种:ABORT.IMMEDIATE.NORMAL.TRANSACTIONAL ABORT 立即结束所有SQL 回滚未提交事务 断开所有用户连接 下次启动实例时,需要recovery IMMEDIATE 允许正在运行的SQL执行完毕 回滚未提交事务 断开所有用

MySQL · 源码分析 · InnoDB LRU List刷脏改进之路

之前的一篇内核月报MySQL · 引擎特性 · InnoDB Buffer Pool 中对InnoDB Buffer pool的整体进行了详细的介绍.文章已经提到了LRU List以及刷脏的工作原理.本篇文章着重从MySQL 5.7源码层面对LRU List刷脏的工作原理,以及Percona针对MySQL LRU Flush的一些性能问题所做的改进,进行一下分析. 在MySQL中,如果当前数据库需要操作的数据集比Buffer pool中的空闲页面大的话,当前Buffer pool中的数据页就必须

MySQL · 源码分析 · Query Cache内部剖析

Query Cache背景 Query Cache在其他数据库里面也称为结果集缓存.顾名思义,它的目的是将SELECT语句与其返回结果缓存到Query Cache中,如果重复执行相同的SELECT语句的话,我们可以跳过MySQL的解析.优化.执行阶段,将SELECT的查询结果直接返回到客户端,加速SELECT语句的执行. Query Cache中的主要数据结构 Query_cache 对整个Query Cache进行管理,负责提供接口供Server调用. Query_cache_block Qu

MySQL · 源码分析 · MySQL 半同步复制数据一致性分析

简介 MySQL Replication为MySQL用户提供了高可用性和可扩展性解决方案.本文介绍了MySQL Replication的主要发展历程,然后通过三个参数rpl_semi_sync_master_wait_point.sync_binlog.sync_relay_log的配置简要分析了MySQL半同步的数据一致性. MySQL Replication的发展 在2000年,MySQL 3.23.15版本引入了Replication.Replication作为一种准实时同步方式,得到广泛

MySQL · 源码分析 · MySQL BINLOG半同步复制数据安全性分析

半同步复制(semisynchronous replication)MySQL使用广泛的数据复制方案,相比于MySQL内置的异步复制它保证了数据的安 全,本文从主机在Server层提交事务开始一直到主机确认收到备机回复进行一步步解析,来看MySQL的半同步复制是怎么保证数 据安全的.本文基于MySQL 5.6源码,为了简化本文只分析DML的核心的事务处理过程,并假定事务只涉及innodb存储引擎. MySQL的事务提交流程 在MySQL中事务的提交Server层最后会调用函数ha_commit_