MySQL · 源码分析 · Tokudb序列化和反序列化过程

序列化和写盘

Tokudb数据节点写盘主要是由后台线程异步完成的:

  • checkpoint线程:把cachetable(innodb术语buffer pool)中所有脏页写回
  • evictor线程:释放内存,如果victim节点是dirty的,需要先将数据写回。

数据在磁盘上是序列化过的,序列化的过程就是把一个数据结构转换成字节流。

写数据包括两个阶段:

  • 序列化:把结构化数据转成字节流
  • 压缩:对序列化好的数据进行压缩

tokudb序列化和压缩单位是partition,对于internal节点,就是把msg buffer序列化并压缩;对于leaf节点,就是把basement node序列化并压缩。

一个节点(node)在磁盘上是如何存储的呢? 节点数据在写盘时会被写到某个offset开始的位置,这个offset是从blocktable里面分配的一个空闲的空间。我们后面会专门写一篇有关btt(Block Translation Table)和block table的文章。 一个node的数据包含:header,pivot key和partition三部分:

  • header:节点meta信息
  • pivot key:记录了每个partition的key区间
  • partition:排序数据;一个node如果包含多个partition,这些partition是依次顺序存放的

有趣的是,压缩算法的信息是存放在partition压缩buffer的第一个字节。所以,tokudb支持FT索引内部同时使用多种压缩算法。

反序列化和读盘

Tokudb读盘的过程是在cachetable里通过调用get_and_pin系列函数实现

  • 前景线程调用get_and_pin系列函数
  • cleaner线程调用bring_node_fully_into_memory,这个函数调用pf_callback把不在内存中的那些partition读到内存。

数据从磁盘读到内存之前需要进行解压缩,然后对解压缩好的buffer进行反序列化,转换成内存数据结构。反序列化是使用序列化相反的方法把数据解析出来。

前面提过序列化和压缩的单位是partition,反序列化和解压缩的单位也是partition。

酱,节点数据就可以被FT层访问了。

序列化和压缩过程详解

这里顺便提一下BTT (Block Translation Table),这个表记录了节点(blocknum)在FT文件存储位置(offset)的映射关系。

为什么要引入这个表?Tokudb刷脏时,数据被写到一个新的空闲位置,避免了in-place update,简化recovery过程。

toku_ftnode_flush_callback是调用get_and_pin系列函数提供的flush_callback回调,checkpoint线程(也包含checkpoint thread pool的线程,在checkpoint过程中帮助前景线程做节点数据的回写)或evictor线程在这个函数里面会调用toku_serialize_ftnode_to做序列化和压缩工作。

toku_serialize_ftnode_to比较简单,首先调用toku_serialize_ftnode_to_memory执行序列化和压缩,然后调用blocktable.realloc_on_disk,为blocknum分配一个新的offset,最后调用pwrite把压缩的buffer写到盘上,回写完成清node->dirty标记。

这里单独说一下toku_serialize_ftnode_to_memory的第6个参数in_parallel,true表示并行处理序列化和压缩过程,false表示串行处理。

toku_ftnode_flush_callback通常是在evictor或者checkpoint线程上下文调用的,不影响前景线程服务客户端,这个参数一般是false,只有在loader场景下是true。

toku_serialize_ftnode_to (int fd, BLOCKNUM blocknum, FTNODE node, FTNODE_DISK_DATA* ndd, bool do_rebalancing, FT ft, bool for_checkpoint) {

 size_t n_to_write;
 size_t n_uncompressed_bytes;
 char *compressed_buf = nullptr;

 // because toku_serialize_ftnode_to is only called for // in toku_ftnode_flush_callback, we pass false // for in_parallel. The reasoning is that when we write // nodes to disk via toku_ftnode_flush_callback, we // assume that it is being done on a non-critical // background thread (probably for checkpointing), and therefore // should not hog CPU, // // Should the above facts change, we may want to revisit // passing false for in_parallel here // // alternatively, we could have made in_parallel a parameter // for toku_serialize_ftnode_to, but instead we did this. int r = toku_serialize_ftnode_to_memory(
 node,
 ndd,
 ft->h->basementnodesize,
 ft->h->compression_method,
 do_rebalancing,
 toku_drd_unsafe_fetch(&toku_serialize_in_parallel),
 &n_to_write,
 &n_uncompressed_bytes,
 &compressed_buf
 );
 if (r != 0) {
 return r;
 }

 // If the node has never been written, then write the whole buffer, including the zeros
 invariant(blocknum.b>=0);
 DISKOFF offset;

 // Dirties the ft
 ft->blocktable.realloc_on_disk(blocknum, n_to_write, &offset,
 ft, fd, for_checkpoint);

 tokutime_t t0 = toku_time_now();
 toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset);
 tokutime_t t1 = toku_time_now();

 tokutime_t io_time = t1 - t0;
 toku_ft_status_update_flush_reason(node, n_uncompressed_bytes, n_to_write, io_time, for_checkpoint);

 toku_free(compressed_buf);
 node->dirty = 0; // See #1957. Must set the node to be clean after serializing it so that it doesn't get written again on the next checkpoint or eviction. return 0;
}

序列化和压缩过程是在toku_serialize_ftnode_to_memory实现,这个函数比较长,我们分成3段来看。

  • partition序列化和压缩
  • pivot key序列化和压缩
  • header序列化

partition序列化和压缩

toku_serialize_ftnode_to_memory的第5个参数do_rebalancing表示leaf节点在写回之前是否要做rebalance,这个参数是在toku_ftnode_flush_callback指定的,如果写回的是数据节点本身,那么是需要做rebalance的。

toku_serialize_ftnode_to_memory首先确保整个数据节点都在内存中,这么做是因为节点的partition数据是依次顺序存放的;然后根据do_rebalancing决定是否要对leaf节点做rebalance;接着是一大段内存分配:

  • sb包含节点partition压缩数据的数组,每个元素包含partition的uncompressed的buffer和compressed的buffer
  • ndd是指针数组,记录了每个partition压缩后数据的offset和size

这里有个小的优化,并没有为每个partition申请compressed的buffer,而是申请了一个足够大的buffer,每个partition使用其中的一段。uncompressed的buffer也是一样处理的。

足够大的buffer是什么意思呢?

  • uncompressed的buffer:各个partition的size总和。
  • compressed的buffer:压缩后的最大可能长度加上8个字节的overhead(每个partition压缩前的size和压缩后的size)

使用不同压缩算法,压缩之后的最大可能长度是不同的。

分配好buffer之后,调用serialize_and_compress_in_parallel或者serialize_and_compress_serially进行序列化和压缩。

int toku_serialize_ftnode_to_memory(FTNODE node,
 FTNODE_DISK_DATA* ndd,
 unsigned int basementnodesize,
 enum toku_compression_method compression_method,
 bool do_rebalancing,
 bool in_parallel, // for loader is true, for toku_ftnode_flush_callback, is false /*out*/ size_t *n_bytes_to_write,
 /*out*/ size_t *n_uncompressed_bytes,
 /*out*/ char **bytes_to_write) // Effect: Writes out each child to a separate malloc'd buffer, then compresses // all of them, and writes the uncompressed header, to bytes_to_write, // which is malloc'd. // // The resulting buffer is guaranteed to be 512-byte aligned and the total length is a multiple of 512 (so we pad with zeros at the end if needed). // 512-byte padding is for O_DIRECT to work. {
 toku_ftnode_assert_fully_in_memory(node);

 if (do_rebalancing && node->height == 0) {
 toku_ftnode_leaf_rebalance(node, basementnodesize);
 }
 const int npartitions = node->n_children;

 // Each partition represents a compressed sub block // For internal nodes, a sub block is a message buffer // For leaf nodes, a sub block is a basement node
 toku::scoped_calloc sb_buf(sizeof(struct sub_block) * npartitions);
 struct sub_block *sb = reinterpret_cast<struct sub_block *>(sb_buf.get());
 XREALLOC_N(npartitions, *ndd);

 // // First, let's serialize and compress the individual sub blocks // // determine how large our serialization and compression buffers need to be.
 size_t serialize_buf_size = 0, compression_buf_size = 0;
 for (int i = 0; i < node->n_children; i++) {
 sb[i].uncompressed_size = serialize_ftnode_partition_size(node, i);
 sb[i].compressed_size_bound = toku_compress_bound(compression_method, sb[i].uncompressed_size);
 serialize_buf_size += sb[i].uncompressed_size;
 compression_buf_size += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
 }

 // give each sub block a base pointer to enough buffer space for serialization and compression
 toku::scoped_malloc serialize_buf(serialize_buf_size);
 toku::scoped_malloc compression_buf(compression_buf_size);
 for (size_t i = 0, uncompressed_offset = 0, compressed_offset = 0; i < (size_t) node->n_children; i++) {
 sb[i].uncompressed_ptr = reinterpret_cast<char *>(serialize_buf.get()) + uncompressed_offset;
 sb[i].compressed_ptr = reinterpret_cast<char *>(compression_buf.get()) + compressed_offset;
 uncompressed_offset += sb[i].uncompressed_size;
 compressed_offset += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
 invariant(uncompressed_offset <= serialize_buf_size);
 invariant(compressed_offset <= compression_buf_size);
 }

 // do the actual serialization now that we have buffer space struct serialize_times st = { 0, 0 };
 if (in_parallel) {
 serialize_and_compress_in_parallel(node, npartitions, compression_method, sb, &st);
 } else {
 serialize_and_compress_serially(node, npartitions, compression_method, sb, &st);
 }

serialize_and_compress_serially就是串行调用serialize_and_compress_partition进行序列化和压缩。

static void serialize_and_compress_serially(FTNODE node,
 int npartitions,
 enum toku_compression_method compression_method,
 struct sub_block sb[],
 struct serialize_times *st) {
 for (int i = 0; i < npartitions; i++) {
 serialize_and_compress_partition(node, i, compression_method, &sb[i], st);
 }
}

serialize_and_compress_in_parallel使用了threadpool来并行执行序列化和压缩,每个partition由一个专门的线程来处理。当前上下文也可以执行序列化和压缩,所以threadpool只创建了(npartitions-1)个线程。

threadpool线程执行的函数也是serialize_and_compress_partition;threadpool线程和当前上下文之间是使用work进行同步的。

static void *
serialize_and_compress_worker(void *arg) {
 struct workset *ws = (struct workset *) arg;
 while (1) {
 struct serialize_compress_work *w = (struct serialize_compress_work *) workset_get(ws);
 if (w == NULL)
 break;
 int i = w->i;
 serialize_and_compress_partition(w->node, i, w->compression_method, &w->sb[i], &w->st);
 }
 workset_release_ref(ws);
 return arg;
}

static void serialize_and_compress_in_parallel(FTNODE node,
 int npartitions,
 enum toku_compression_method compression_method,
 struct sub_block sb[],
 struct serialize_times *st) {
 if (npartitions == 1) {
 serialize_and_compress_partition(node, 0, compression_method, &sb[0], st);
 } else {
 int T = num_cores;
 if (T > npartitions)
 T = npartitions;
 if (T > 0)
 T = T - 1;
 struct workset ws;
 ZERO_STRUCT(ws);
 workset_init(&ws);
 struct serialize_compress_work work[npartitions];
 workset_lock(&ws);
 for (int i = 0; i < npartitions; i++) {
 work[i] = (struct serialize_compress_work) { .base = ,
 .node = node,
 .i = i,
 .compression_method = compression_method,
 .sb = sb,
 .st = { .serialize_time = 0, .compress_time = 0} };
 workset_put_locked(&ws, &work[i].base);
 }
 workset_unlock(&ws);
 toku_thread_pool_run(ft_pool, 0, &T, serialize_and_compress_worker, &ws);
 workset_add_ref(&ws, T);
 serialize_and_compress_worker(&ws);
 workset_join(&ws);
 workset_destroy(&ws);

 // gather up the statistics from each thread's work item for (int i = 0; i < npartitions; i++) {
 st->serialize_time += work[i].st.serialize_time;
 st->compress_time += work[i].st.compress_time;
 }
 }
}

pivot key序列化和压缩

回到toku_serialize_ftnode_to_memory,序列化partition之后就是序列化pivot key的过程。 sb_node_info存放pivot key压缩数据的信息:

  • uncompressed_ptr和uncompressed_size是未压缩数据的buffer和size
  • compressed_ptr和compressed_size_bound是压缩后数据的buffer和压缩后最大可能的size+8个字节的overhead(未压缩数据size和压缩后数据的size)

前面提到,压缩后的size是由压缩算法决定,不同的压缩算法压缩之后最大可能的size是不同的。

toku_serialize_ftnode_to_memory调用serialize_and_compress_sb_node_info把pivot key信息序列化并压缩。

pivot key的compressed buffer头8个字节分别存储pivot key的compressed size和uncompressed size,从第9个字节开始才是压缩的字节流;而checksum是针对整个compressed buffer做的。

// // Now lets create a sub-block that has the common node information, // This does NOT include the header // // determine how large our serialization and copmression buffers need to be struct sub_block sb_node_info;
 sub_block_init(&sb_node_info);
 size_t sb_node_info_uncompressed_size = serialize_ftnode_info_size(node);
 size_t sb_node_info_compressed_size_bound = toku_compress_bound(compression_method, sb_node_info_uncompressed_size);
 toku::scoped_malloc sb_node_info_uncompressed_buf(sb_node_info_uncompressed_size);
 toku::scoped_malloc sb_node_info_compressed_buf(sb_node_info_compressed_size_bound + 8); // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
 sb_node_info.uncompressed_size = sb_node_info_uncompressed_size;
 sb_node_info.uncompressed_ptr = sb_node_info_uncompressed_buf.get();
 sb_node_info.compressed_size_bound = sb_node_info_compressed_size_bound;
 sb_node_info.compressed_ptr = sb_node_info_compressed_buf.get();

 // do the actual serialization now that we have buffer space
 serialize_and_compress_sb_node_info(node, &sb_node_info, compression_method, &st);

 // // At this point, we have compressed each of our pieces into individual sub_blocks, // we can put the header and all the subblocks into a single buffer and return it. // // update the serialize times, ignore the header for simplicity. we captured all // of the partitions' serialize times so that's probably good enough.
 toku_ft_status_update_serialize_times(node, st.serialize_time, st.compress_time);

header序列化

序列化pivot key之后,toku_serialize_ftnode_to_memory计算节点node压缩前size和压缩后的size。 计算方法很简单:partition的size总和 + pivot key的size + header的size + 4个字节的overhead(pivot key的checksum)。

节点node压缩之后的size是为分配压缩后的数据buffer,为了支持direct I/O,分配的buffer和buffer size必须是512对齐的。

分配的buffer size记在n_bytes_to_write返回给调用函数;压缩之后的数据存储在bytes_to_write指向的buffer中。

节点node压缩之前的size,就是为了返回给调用函数,记在n_uncompressed_bytes参数中。

// The total size of the node is: // size of header + disk size of the n+1 sub_block's created above uint32_t total_node_size = (serialize_node_header_size(node) // uncompressed header
 + sb_node_info.compressed_size // compressed nodeinfo (without its checksum)
 + 4); // nodeinfo's checksum uint32_t total_uncompressed_size = (serialize_node_header_size(node) // uncompressed header
 + sb_node_info.uncompressed_size // uncompressed nodeinfo (without its checksum)
 + 4); // nodeinfo's checksum // store the BP_SIZESs for (int i = 0; i < node->n_children; i++) {
 uint32_t len = sb[i].compressed_size + 4; // data and checksum
 BP_SIZE (*ndd,i) = len;
 BP_START(*ndd,i) = total_node_size;
 total_node_size += sb[i].compressed_size + 4;
 total_uncompressed_size += sb[i].uncompressed_size + 4;
 }

 // now create the final serialized node uint32_t total_buffer_size = roundup_to_multiple(512, total_node_size); // make the buffer be 512 bytes. char *XMALLOC_N_ALIGNED(512, total_buffer_size, data);
 char *curr_ptr = data;

前面提到节点node序列化的过程分为3个阶段:

  • partition序列化和压缩
  • pivot key序列化和压缩
  • header序列化

前2个阶段都讨论过了,header的部分是调用serialize_node_header实现的。

到这里其他部分的序列化和压缩工作都做好了,header的序列化直接在前面分配好的压缩后数据buffer上进行,不需要压缩,也不必分配sub_block数据结构。

header处理完,直接把pivot key的sub_block的compressed_ptr数据和checksum拷贝过来。

pivot key处理完,直接把每个partition的compressed_ptr和checksum依次拷贝过来。

pad的部分写0。

// write the header struct wbuf wb;
 wbuf_init(&wb, curr_ptr, serialize_node_header_size(node));
 serialize_node_header(node, *ndd, &wb);
 assert(wb.ndone == wb.size);
 curr_ptr += serialize_node_header_size(node);

 // now write sb_node_info memcpy(curr_ptr, sb_node_info.compressed_ptr, sb_node_info.compressed_size);
 curr_ptr += sb_node_info.compressed_size;
 // write the checksum
 *(uint32_t *)curr_ptr = toku_htod32(sb_node_info.xsum);
 curr_ptr += sizeof(sb_node_info.xsum);

 for (int i = 0; i < npartitions; i++) {
 memcpy(curr_ptr, sb[i].compressed_ptr, sb[i].compressed_size);
 curr_ptr += sb[i].compressed_size;
 // write the checksum
 *(uint32_t *)curr_ptr = toku_htod32(sb[i].xsum);
 curr_ptr += sizeof(sb[i].xsum);
 }
 // Zero the rest of the buffer memset(data + total_node_size, 0, total_buffer_size - total_node_size);

 assert(curr_ptr - data == total_node_size);
 *bytes_to_write = data;
 *n_bytes_to_write = total_buffer_size;
 *n_uncompressed_bytes = total_uncompressed_size;

 invariant(*n_bytes_to_write % 512 == 0);
 invariant(reinterpret_cast<unsigned long long>(*bytes_to_write) % 512 == 0);
 return 0;
}

假若一个node包含2个partition,它的序列化结构如下所示:

反序列化和解压缩过程详解

由于tokudb支持partial fetch(只读某几个partition)和partial evict(即把clean节点的部分partition释放掉),反序列化过程相比序列化过程略复杂一些。

fetch callback通过bfe这个hint告诉toku_deserialize_ftnode_from需要读那些partition。

bfe有五种类型:

  • ftnode_fetch_none:只需要读header和pivot key,不需要读任何partition。只用于optimizer计算cost
  • ftnode_fetch_keymatch:只需要读match某个key的partition,ydb层提供的一个接口,一般不用
  • ftnode_fetch_prefetch:prefetch时使用
  • ftnode_fetch_all:需要把所有partition读上来;写节点时使用(msg inject或者msg apply的子节点)
  • ftnode_fetch_subset:需要读若干个partition,FT search路径上使用。

只有在ft search高度>1以上的中间节点时,read_all_partitions会被设置成true,走老的代码路径deserialize_ftnode_from_fd,一次性把所有partition都读到内存中。

其他情况会调用read_ftnode_header_from_fd_into_rbuf_if_small_enough,把节点的header读到内存中,然后反序列化header并设置ndd(每个partition的offset和size);解压缩和反序列化pivot key设置pivot信息;根据bfe读取需要的partition。

节点的header,pivot key和partition都有自己的checksum信息,解析每个部分时都要确认checksum是匹配的。

enum ftnode_fetch_type {
 ftnode_fetch_none = 1, // no partitions needed.
 ftnode_fetch_subset, // some subset of partitions needed
 ftnode_fetch_prefetch, // this is part of a prefetch call
 ftnode_fetch_all, // every partition is needed
 ftnode_fetch_keymatch, // one child is needed if it holds both keys
};

int toku_deserialize_ftnode_from (int fd,
 BLOCKNUM blocknum,
 uint32_t fullhash,
 FTNODE *ftnode,
 FTNODE_DISK_DATA* ndd,
 ftnode_fetch_extra *bfe
 ) // Effect: Read a node in. If possible, read just the header. {
 int r = 0;
 struct rbuf rb = RBUF_INITIALIZER;

 // each function below takes the appropriate io/decompression/deserialize statistics if (!bfe->read_all_partitions) {
 read_ftnode_header_from_fd_into_rbuf_if_small_enough(fd, blocknum, bfe->ft, &rb, bfe);
 r = deserialize_ftnode_header_from_rbuf_if_small_enough(ftnode, ndd, blocknum, fullhash, bfe, &rb, fd);
 } else {
 // force us to do it the old way
 r = -1;
 }
 if (r != 0) {
 // Something went wrong, go back to doing it the old way.
 r = deserialize_ftnode_from_fd(fd, blocknum, fullhash, ftnode, ndd, bfe, NULL);
 }

 toku_free(rb.buf);
 return r;
}

deserialize_ftnode_header_from_rbuf_if_small_enough比较长,基本是toku_serialize_ftnode_to_memory的相反过程。

header部分是不压缩的,直接解析,比较magic number,解析node->n_children和ndd等。

然后比较header的checksum

 node->n_children = rbuf_int(rb);
 // Guaranteed to be have been able to read up to here. If n_children // is too big, we may have a problem, so check that we won't overflow // while reading the partition locations.
 unsigned int nhsize;
 nhsize = serialize_node_header_size(node); // we can do this because n_children is filled in.
 unsigned int needed_size;
 needed_size = nhsize + 12; // we need 12 more so that we can read the compressed block size information that follows for the nodeinfo. if (needed_size > rb->size) {
 r = toku_db_badformat();
 goto cleanup;
 }

 XMALLOC_N(node->n_children, node->bp);
 XMALLOC_N(node->n_children, *ndd);
 // read the partition locations for (int i=0; i<node->n_children; i++) {
 BP_START(*ndd,i) = rbuf_int(rb);
 BP_SIZE (*ndd,i) = rbuf_int(rb);
 }

 uint32_t checksum;
 checksum = toku_x1764_memory(rb->buf, rb->ndone);
 uint32_t stored_checksum;
 stored_checksum = rbuf_int(rb);
 if (stored_checksum != checksum) {
 dump_bad_block(rb->buf, rb->size);
 r = TOKUDB_BAD_CHECKSUM;
 goto cleanup;
 }

接着处理pivot key,比较pivot key部分的checksum,解压缩,反序列化,设置pivot信息。

 // Finish reading compressed the sub_block const void **cp;
 cp = (const void **) &sb_node_info.compressed_ptr;
 rbuf_literal_bytes(rb, cp, sb_node_info.compressed_size);
 sb_node_info.xsum = rbuf_int(rb);
 // let's check the checksum uint32_t actual_xsum;
 actual_xsum = toku_x1764_memory((char *)sb_node_info.compressed_ptr-8, 8+sb_node_info.compressed_size);
 if (sb_node_info.xsum != actual_xsum) {
 r = TOKUDB_BAD_CHECKSUM;
 goto cleanup;
 }

 // Now decompress the subblock
 {
 toku::scoped_malloc sb_node_info_buf(sb_node_info.uncompressed_size);
 sb_node_info.uncompressed_ptr = sb_node_info_buf.get();
 tokutime_t decompress_t0 = toku_time_now();
 toku_decompress(
 (Bytef *) sb_node_info.uncompressed_ptr,
 sb_node_info.uncompressed_size,
 (Bytef *) sb_node_info.compressed_ptr,
 sb_node_info.compressed_size
 );
 tokutime_t decompress_t1 = toku_time_now();
 decompress_time = decompress_t1 - decompress_t0;

 // at this point sb->uncompressed_ptr stores the serialized node info.
 r = deserialize_ftnode_info(&sb_node_info, node);
 if (r != 0) {
 goto cleanup;
 }
 }

最后是根据bfe读取需要的partition,读partition是通过调用pf_callback实现的。

 // Now we have the ftnode_info. We have a bunch more stuff in the // rbuf, so we might be able to store the compressed data for some // objects. // We can proceed to deserialize the individual subblocks. // setup the memory of the partitions // for partitions being decompressed, create either message buffer or basement node // for partitions staying compressed, create sub_block
 setup_ftnode_partitions(node, bfe, false);

 // We must capture deserialize and decompression time before // the pf_callback, otherwise we would double-count.
 t1 = toku_time_now();
 deserialize_time = (t1 - t0) - decompress_time;

 // do partial fetch if necessary if (bfe->type != ftnode_fetch_none) {
 PAIR_ATTR attr;
 r = toku_ftnode_pf_callback(node, *ndd, bfe, fd, &attr, NULL);
 if (r != 0) {
 goto cleanup;
 }
 }

deserialize_ftnode_from_fd的部分留给读者自行分析。

时间: 2024-09-20 14:34:57

MySQL · 源码分析 · Tokudb序列化和反序列化过程的相关文章

MySQL · 源码分析 · InnoDB 异步IO工作流程

之前的一篇内核月报InnoDB IO子系统 中介绍了InnoDB IO子系统中包含的同步IO以及异步IO.本篇文章将从源码层面剖析一下InnoDB IO子系统中,数据页的同步IO以及异步IO请求的具体实现过程. 在MySQL5.6中,InnoDB的异步IO主要是用来处理预读以及对数据文件的写请求的.而对于正常的页面数据读取则是通过同步IO进行的.到底二者在代码层面上的实现过程有什么样的区别? 接下来我们将以Linux native io的执行过程为主线,对IO请求的执行过程进行梳理. 重点数据结

Hive源码分析:Driver类运行过程

说明: 本文的源码分析基于hive-0.12.0-cdh5.0.1. 概括 从<hive cli的入口类>中可以知道hive中处理hive命令的处理器一共有以下几种: (1)set SetProcessor,设置修改参数,设置到SessionState的HiveConf里. (2)dfs DfsProcessor,使用hadoop的FsShell运行hadoop的命令. (3)add AddResourceProcessor,添加到SessionState的resource_map里,运行提交

springboot源码分析7-环境属性构造过程(上)

使用springboot的目的就是在项目开发中,快速出东西,因此springboot对于配置文件的格式支持是非常丰富的,最常见的配置文件后缀有如下四种:properties.xml.yml.yaml,比如我们在springboot项目根目录中配置了一个application.properties文件,则springboot项目启动的时候就会自动将该文件的内容解析并设置到环境中,这样后续需要使用该文件中配置的属性的时候,只需要使用@value即可.同理application.xml.applica

Hadoop2源码分析-序列化篇

1.概述 上一篇我们了解了MapReduce的相关流程,包含MapReduce V2的重构思路,新的设计架构,与MapReduce V1的区别等内容,今天我们在来学习下在Hadoop V2中的序列化的相关内容,其目录如下所示: 序列化的由来 Hadoop序列化依赖图详解 Writable常用实现类 下面,我们开始学习今天的内容. 2.序列化的由来 我们知道Java语言对序列化提供了非常友好的支持,在定义一个类时,如果我们需要序列化一个类,只需要实现该类的序列化接口即可.场景:让一个AppInfo

MySQL · 源码分析 · InnoDB LRU List刷脏改进之路

之前的一篇内核月报MySQL · 引擎特性 · InnoDB Buffer Pool 中对InnoDB Buffer pool的整体进行了详细的介绍.文章已经提到了LRU List以及刷脏的工作原理.本篇文章着重从MySQL 5.7源码层面对LRU List刷脏的工作原理,以及Percona针对MySQL LRU Flush的一些性能问题所做的改进,进行一下分析. 在MySQL中,如果当前数据库需要操作的数据集比Buffer pool中的空闲页面大的话,当前Buffer pool中的数据页就必须

MySQL · 源码分析 · Innodb 引擎Redo日志存储格式简介

MySQL有多种日志.不同种类.不同目的的日志会记录在不同的日志文件中,它们可以帮助你找出mysqld内部发生的事情.比如错误日志:用来记录启动.运行或停止mysqld进程时出现的问题:查询日志:记录建立的客户端连接和执行的语句:二进制日志:记录所有更改数据的语句,主要用于逻辑复制:慢日志:记录所有执行时间超过long_query_time秒的所有查询或不使用索引的查询.而对MySQL中最常用的事务引擎innodb,redo日志是保证事务一致性非常重要的.本文结合MySQL版本5.6为分析源码介

MySQL · 源码分析 · SHUTDOWN过程

ORACLE 中的SHUTDOWN MySQL SHUTDOWN LEVEL 暂时只有一种,源码中留了 LEVEL 的坑还没填 在此借用 Oracle 的 SHUTDOWN LEVEL 分析 Oracle SHUTDOWN LEVEL 共有四种:ABORT.IMMEDIATE.NORMAL.TRANSACTIONAL ABORT 立即结束所有SQL 回滚未提交事务 断开所有用户连接 下次启动实例时,需要recovery IMMEDIATE 允许正在运行的SQL执行完毕 回滚未提交事务 断开所有用

MySQL · 源码分析 · MySQL 半同步复制数据一致性分析

简介 MySQL Replication为MySQL用户提供了高可用性和可扩展性解决方案.本文介绍了MySQL Replication的主要发展历程,然后通过三个参数rpl_semi_sync_master_wait_point.sync_binlog.sync_relay_log的配置简要分析了MySQL半同步的数据一致性. MySQL Replication的发展 在2000年,MySQL 3.23.15版本引入了Replication.Replication作为一种准实时同步方式,得到广泛

MySQL · 源码分析 · MySQL BINLOG半同步复制数据安全性分析

半同步复制(semisynchronous replication)MySQL使用广泛的数据复制方案,相比于MySQL内置的异步复制它保证了数据的安 全,本文从主机在Server层提交事务开始一直到主机确认收到备机回复进行一步步解析,来看MySQL的半同步复制是怎么保证数 据安全的.本文基于MySQL 5.6源码,为了简化本文只分析DML的核心的事务处理过程,并假定事务只涉及innodb存储引擎. MySQL的事务提交流程 在MySQL中事务的提交Server层最后会调用函数ha_commit_