Hadoop Common源码分析之SerializationFactory、Serialization

        SerializationFactory是Hadoop中的一个序列化器工厂,它除了用来存储序列化器种类、更新序列化器相关配置信息,还提供了根据指定待序列化类获取相匹配的序列化器和反序列化器的功能。

        SerializationFactory中的关键成员变量只有一个,如下:

private List<Serialization<?>> serializations = new ArrayList<Serialization<?>>();

        serializations是一个存储实现Serialization接口对象的列表,而Serialization代表了序列化器或反序列化器的公共行为,它实际上是对序列化器/反序列化器对的压缩,列表中的对象其实是(序列化器/反序列化器对)的结合体,提供了以下三个方法:

/**
 * <p>
 * Encapsulates a {@link Serializer}/{@link Deserializer} pair.
 * </p>
 * @param <T>
 */
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public interface Serialization<T> {

  /**
   * Allows clients to test whether this {@link Serialization}
   * supports the given class.
   * 允许客户端去检测这个Serialization是否支持给定Class
   */
  boolean accept(Class<?> c);

  /**
   * 为给定Class获取一个序列化器Serializer
   * @return a {@link Serializer} for the given class.
   */
  Serializer<T> getSerializer(Class<T> c);

  /**
   * 为给定Class获取一个反序列化器Deserializer
   * @return a {@link Deserializer} for the given class.
   */
  Deserializer<T> getDeserializer(Class<T> c);
}

        很明显,accept(Class<?> c)方法允许客户端去检测这个Serialization是否支持给定Class,getSerializer(Class<T> c)方法为给定Class获取一个序列化器Serializer,而getDeserializer(Class<T> c)方法为给定Class获取一个反序列化器Deserializer。

        我们再看SerializationFactory的构造函数,它通过读取配置信息conf中的io.serializations参数来确定Serializations,这个参数是一个逗号隔开的类名列表,代码如下:

  /**
   * <p>
   * Serializations are found by reading the <code>io.serializations</code>
   * property from <code>conf</code>, which is a comma-delimited list of
   * classnames.
   * 通过读取配置信息conf中的io.serializations参数来确定Serializations,这个参数是一个逗号隔开的类名列表
   * </p>
   */
  public SerializationFactory(Configuration conf) {

	// 调用父类构造函数
	super(conf);

    // 遍历参数io.serializations配置的序列器名称serializerName,
    // 参数未配置默认为WritableSerialization、AvroSpecificSerialization、AvroReflectSerialization三个
    for (String serializerName : conf.getStrings(
      CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
      new String[]{WritableSerialization.class.getName(),
        AvroSpecificSerialization.class.getName(),
        AvroReflectSerialization.class.getName()})) {

      // 调用add(conf, serializerName)方法添加序列化器
      add(conf, serializerName);
    }
  }

        构造函数先调用父类构造函数,然后遍历参数io.serializations配置的序列器名称serializerName,参数未配置默认为WritableSerialization、AvroSpecificSerialization、AvroReflectSerialization三个,然后调用add(conf, serializerName)方法添加每个序列化器名称。继续追踪add()方法,代码如下:

  @SuppressWarnings("unchecked")
  private void add(Configuration conf, String serializationName) {
    try {
      // 通过类名serializationName获取Class,即serializionClass
      Class<? extends Serialization> serializionClass =
        (Class<? extends Serialization>) conf.getClassByName(serializationName);

      // 通过反射生成类serializionClass的对象,并将其加入到serializations列表
      serializations.add((Serialization)
      ReflectionUtils.newInstance(serializionClass, getConf()));
    } catch (ClassNotFoundException e) {
      LOG.warn("Serialization class not found: ", e);
    }
  }

        add()方法很简单,就做了两件事:

        1、通过类名serializationName获取Class,即serializionClass;

        2、通过反射生成类serializionClass的对象,这个对象其实是(序列化器/反序列化器对)的结合体,并将其加入到serializations列表。

        SerializationFactory还提供了为指定类获取序列化器、反序列化器的getSerializer(Class<T> c)和getDeserializer(Class<T> c)方法,代码如下:

  // 为指定类c获取序列化器Serializer
  public <T> Serializer<T> getSerializer(Class<T> c) {
	// 根据指定类c,调用getSerialization()方法,获取对应序列化器serializer,实际上是确定(序列化器/反序列化器对)对象serializer
    Serialization<T> serializer = getSerialization(c);
    if (serializer != null) {
      // 通过serializer的getSerializer()方法为指定类c获取序列化器的实例
      return serializer.getSerializer(c);
    }
    // 找不到则返回null
    return null;
  }

  // 为指定类c获取反序列化器Deserializer
  public <T> Deserializer<T> getDeserializer(Class<T> c) {
	// 根据指定类c,调用getSerialization()方法,获取对应序列化器serializer,实际上是确定(序列化器/反序列化器对)对象serializer
    Serialization<T> serializer = getSerialization(c);
    if (serializer != null) {
      // 通过serializer的getSerializer()方法为指定类c获取反序列化器的实例
      return serializer.getDeserializer(c);
    }
    // 找不到则返回null
    return null;
  }

  // 为指定类c获取(序列化器/反序列化器对)对象
  @SuppressWarnings("unchecked")
  public <T> Serialization<T> getSerialization(Class<T> c) {
	// 遍历serializations列表,获取其中每个Serialization对象serialization
    for (Serialization serialization : serializations) {

      // 调用serialization的accept()方法,如果其与c相匹配,则返回该serialization
      if (serialization.accept(c)) {
        return (Serialization<T>) serialization;
      }
    }

    // 找不到则返回null
    return null;
  }

        两个方法的处理逻辑一致,大体如下:

        1、根据指定类c,调用getSerialization()方法,获取对应序列化器serializer,实际上是确定(序列化器/反序列化器对)对象serializer;

        2、通过serializer的getSerializer()方法或getDeserializer()方法为指定类c获取序列化器或发序列化器的实例,并返回;

        3、找不到则返回null。

        而两者共用的getSerialization()方法则是为指定类c获取(序列化器/反序列化器对)对象,处理逻辑大致为:遍历serializations列表,获取其中每个Serialization对象serialization,调用serialization的accept()方法,如果其与c相匹配,则返回该serialization,找不到则返回null。

        

时间: 2024-07-29 08:06:16

Hadoop Common源码分析之SerializationFactory、Serialization的相关文章

Hadoop Common源码分析之服务Service

        Service是定义Hadoop中服务生命周期的一个接口.Service内部定义了服务的状态及生命周期,在服务被构造后,其一个生命周期内的状态为NOTINITED未初始化--INITED已初始化--已启动STARTED--已停止STOPPED,而这一生命周期内服务状态的变化,是随着如下方法链的调用而变化的:init()--start()--stop(),服务构造后整体方法调用及状态转移如下图所示:                                          

Hadoop2源码分析-准备篇

1.概述 我们已经能够搭建一个高可用的Hadoop平台了,也熟悉并掌握了一个项目在Hadoop平台下的开发流程,基于Hadoop的一些套件我们也能够使用,并且能利用这些套件进行一些任务的开发.在Hadoop的应用级别上,我们接着往后面去研究学习,那就是Hadoop的源码了,作为Hadoop开发人员,我们得去学习和研究Hadoop得实现原理,底层框架的设计,编码的实现过程等等,下面就开始我们今天的Hadoop源码分析之旅. 2.准备 在分析源码之前,我们需要准备好分析源码的环境,以及如何去分析(分

Hadoop2源码分析-Hadoop V2初识

1.概述 在完成分析Hadoop2源码的准备工作后,我们进入到后续的源码学习阶段.本篇博客给大家分享,让大家对Hadoop V2有个初步认识,博客的目录内容如下所示: Hadoop的渊源 Hadoop V2部分项目图 各个包的功能介绍 本篇文章的源码是基于Hadoop-2.6.0来分析,其他版本的Hadoop的源码可以此作为参考分析. 2.Hadoop的渊源 其实,早年Google的核心竞争力是它的计算平台,Google对外公布的论文有一下内容: GoogleCluster  Chubby  G

mahout源码分析之DistributedLanczosSolver(七) 总结篇

Mahout版本:0.7,hadoop版本:1.0.4,jdk:1.7.0_25 64bit. 看svd算法官网上面使用的是亚马逊的云平台计算的,不过给出了svd算法的调用方式,当算出了eigenVectors后,应该怎么做呢?比如原始数据是600*60(600行,60列)的数据,计算得到的eigenVectors是24*60(其中的24是不大于rank的一个值),那么最后得到的结果应该是original_data乘以eigenVectors的转置这样就会得到一个600*24的矩阵,这样就达到了

深入理解Spark:核心思想与源码分析

大数据技术丛书 深入理解Spark:核心思想与源码分析 耿嘉安 著 图书在版编目(CIP)数据 深入理解Spark:核心思想与源码分析/耿嘉安著. -北京:机械工业出版社,2015.12 (大数据技术丛书) ISBN 978-7-111-52234-8 I. 深- II.耿- III.数据处理软件 IV. TP274 中国版本图书馆CIP数据核字(2015)第280808号 深入理解Spark:核心思想与源码分析 出版发行:机械工业出版社(北京市西城区百万庄大街22号 邮政编码:100037)

Spark源码分析之四:Stage提交

        各位看官,上一篇<Spark源码分析之Stage划分>详细讲述了Spark中Stage的划分,下面,我们进入第三个阶段--Stage提交.         Stage提交阶段的主要目的就一个,就是将每个Stage生成一组Task,即TaskSet,其处理流程如下图所示:         与Stage划分阶段一样,我们还是从handleJobSubmitted()方法入手,在Stage划分阶段,包括最好的ResultStage和前面的若干ShuffleMapStage均已生成,那

Alluxio源码分析:RPC框架浅析(一)

        Alluxio源码分析是一个基于内存的分布式文件系统,和HDFS.HBase等一样,也是由主从节点构成的.而节点之间的通信,一般都是采用的RPC通讯模型.Alluxio中RPC是基于何种技术如何实现的呢?它对于RPC请求是如何处理的?都涉及到哪些组件?本文将针对这些问题,为您一一解答.         一.Alluxio中RPC实现技术支持         Alluxio中的RPC是依靠Thrift实现的,Apache Thrift 是 Facebook 实现的一种高效的.支持多

HDFS源码分析DataXceiver之读数据块

         在<HDFS源码分析DataXceiver之整体流程>一文中我们知道,无论来自客户端还是其他数据节点的请求达到DataNode时,DataNode上的后台线程DataXceiverServer均为每个请求创建一个单独的后台工作线程来处理,这个工作线程就是DataXceiver.并且,在线程DataXceiver处理请求的主方法run()方法内,会先读取操作符op,然后根据操作符op分别调用相应的方法进行请求的处理.而决定什么样的操作符op该调用何种方法的逻辑,则是在DataX

mahout源码分析之DistributedLanczosSolver(五)

Mahout版本:0.7,hadoop版本:1.0.4,jdk:1.7.0_25 64bit. 1. Job 篇 接上篇,分析到EigenVerificationJob的run方法: public int run(Path corpusInput, Path eigenInput, Path output, Path tempOut, double maxError, double minEigenValue, boolean inMemory, Configuration conf) thro