hadoop中实现定制Writable类

Hadoop中有一套Writable实现可以满足大部分需求,但是在有些情况下,我们需要根据自己的需要构造一个新的实现,有了定制的Writable,我们就可以完全控制二进制表示和排序顺序。

为了演示如何新建一个定制的writable类型,我们需要写一个表示一对字符串的实现:

blic class TextPair implements WritableComparable<TextPair> {
    private Text first;
    private Text second;

    public TextPair() {
        set(new Text(), new Text());
    }

    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    public TextPair(Text first, Text second) {
        set(first, second);
    }

    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public Text getScond() {
        return second;
    }

    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    public boolean equals(Object o) {
        if(o instanceof TextPair) {
            TextPair tp = (TextPair)o;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }

    public String toString() {
        return first + "\t" + second;
    }

    public int compareTo(TextPair tp) {
        int cmp = first.compareTo(tp.first);
        if(cmp != 0) {
            return cmp;
        }
        return second.compareTo(tp.second);
    }
}

为速度实现一个RawComparator

还可以进一步的优化,当作为MapReduce里的key,需要进行比较时,因为他已经被序列化,想要比较他们,那么首先要先反序列化成一个对象,
然后再调用compareTo对象进行比较,但是这样效率太低了,有没有可能可以直接比较序列化后的结果呢,答案是肯定的,可以。

RawComparator接口允许执行者比较流中读取的未被反序列化为对象的记录,从而省去了创建对象的所有的开销,其中,compare() 比较时需要的两个参数所对应的记录位于字节数组b1和b2指定开始位置s1和s2,记录长度为l1和l2,代码如下:

public interface RawComparator<T> extends Comparator<T> {
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}

以IntWritable为例,它的RawComparator实现中,compare() 方法通过readInt()直接在字节数组中读入需要比较的两个整数,然后输出Comparable接口要求的比较结果。

值得注意的是,该过程中compare()方法避免使用IntWritable对象,从而避免了不必要的对象分配,相关代码如下:

  /** A Comparator optimized for IntWritable. */
  public static class Comparator extends WritableComparator {
    public Comparator() {
      super(IntWritable.class);
    }

    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      int thisValue = readInt(b1, s1);
      int thatValue = readInt(b2, s2);
      return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
    }
  }

Writablecomparator是RawComparator对WritableComparable类的一个通用实现,它提供两个主要功能:

1、提供了一个RawComparator的compare()默认实现,该实现从数据流中反序列化要进行比较的对象,然后调用对象的compare()方法进行比较

2、它充当了RawComparator实例的一个工厂方法。例如,可以通过下面的代码获得IntWritable的RawComparator:

RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);

我们只需要把EmploeeWritable的序列化后的结果拆成成员对象,然后比较成员对象即可:

class Comparator extends WritableComparator {
    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
    public Comparator() {
        super(TextPair.class);
    }
    public int compara(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
            int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
            int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
            int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
            if(cmp != 0) {
                return cmp;
            }
            return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1, b2, s2 + firstL2, l2 -  firstL2);
        } catch(IOException e) {
            throw new IllegalArgumentException(e);
        }
    }
}

 

定制comparators

有时候,除了默认的comparator,你可能还需要一些自定义的comparator来生成不同的排序队列,看一下下面这个示例:

    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
            int firstL1 = WritableUtils.decodeVIntSize(b1[s1])+ readVInt(b1, s1);
            int firstL2 = WritableUtils.decodeVIntSize(b2[s2])+ readVInt(b2, s2);
            return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public int compare(WritableComparable a, WritableComparable b) {
        if(a instanceof Textpair && b instanceof TextPair) {
            return ((TextPair) a).first.compareTo(((TextPair) b).first);
        }
        return super.compare(a, b);
    }
时间: 2025-01-02 17:31:37

hadoop中实现定制Writable类的相关文章

hadoop中典型Writable类详解

Hadoop将很多Writable类归入org.apache.hadoop.io包中,在这些类中,比较重要的有Java基本类.Text.Writable集合.ObjectWritable等,重点介绍Java基本类和ObjectWritable的实现. 1. Java基本类型的Writable封装 目前Java基本类型对应的Writable封装如下表所示.所有这些Writable类都继承自WritableComparable.也就是说,它们是可比较的.同时,它们都有get()和set()方法,用于

hadoop中的序列化与Writable类

hadoop中自带的org.apache.hadoop.io包中有广泛的writable类可供选择,它们形成下图所示的层次结构: java基本类型的Writable封装器 Writable类对java基本类型提供封装,short和char除外,所有的封装包含get()和set()两个方法用于读取或设置封装的值 java基本类型的Writable类 java原生类型 除char类型以外,所有的原生类型都有对应的Writable类,并且通过get和set方法可以他们的值.IntWritable和 L

hadoop中的序列化与Writable接口

简介 序列化和反序列化就是结构化对象和字节流之间的转换,主要用在内部进程的通讯和持久化存储方面. 通讯格式需求 hadoop在节点间的内部通讯使用的是RPC,RPC协议把消息翻译成二进制字节流发送到远程节点,远程节点再通过反序列化把二进制流转成原始的信息.RPC的序列化需要实现以下几点: 1.压缩,可以起到压缩的效果,占用的宽带资源要小. 2.快速,内部进程为分布式系统构建了高速链路,因此在序列化和反序列化间必须是快速的,不能让传输速度成为瓶颈. 3.可扩展的,新的服务端为新的客户端增加了一个参

Hadoop中使用FileStatus类来查看HDFS中文件或目录的元信息

Hadoop中的FileStatus类可以用来查看HDFS中文件或者目录的元信息,任意的文件或者目录都可以拿到对应的FileStatus, 我们这里简单的演示下这个类的相关API: /* */ package com.charles.hadoop.fs; import java.net.URI; import java.sql.Timestamp; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.F

定制并发类(五)在一个Executor对象中使用我们的ThreadFactory

声明:本文是< Java 7 Concurrency Cookbook>的第七章,作者: Javier Fernández González     译者:许巧辉 在一个Executor对象中使用我们的ThreadFactory 在前面的指南中,实现ThreadFactory接口生成自定义线程,我们引进了工厂模式和提供如何实现一个实现ThreadFactory接口的线程的工厂例子. 执行者框架(Executor framework)是一种机制,它允许你将线程的创建与执行分离.它是基于Execu

ASP.NET中实现定制自己的委托和事件参数类_实用技巧

本文实例讲述了ASP.NET中实现定制自己的委托和事件参数类的方法,对于学习ASP.NET有很好的参考借鉴价值.具体方法如下: 一般在实际开发中,对于事件不需要传递数据信息时,像上面的KingTextBox控件的事件,在引发事件时传递的参数为EventArgs.Empty,如下所示: OnTextChanged(EventArgs.Empty); 这是因为控件KingTextBox的TextChanged事件比较简单,这里不需要参数对象传递数据.但像一些复杂的控件比如GridView的按钮命令事

hadoop中OutputFormat 接口的设计与实现

OutputFormat 主要用于描述输出数据的格式,它能够将用户提供的 key/value 对写入特定格式的文件中. 本文将介绍 Hadoop 如何设计 OutputFormat 接口 , 以及一些常用的OutputFormat 实现. 1.旧版 API 的 OutputFormat 解析 如图所示, 在旧版 API 中,OutputFormat 是一个接口,它包含两个方法: RecordWriter<K, V> getRecordWriter(FileSystem ignored, Job

hadoop中InputFormat 接口的设计与实现

InputFormat 主要用于描述输入数据的格式, 它提供以下两个功能. 数据切分:按照某个策略将输入数据切分成若干个 split, 以便确定 Map Task 个数以及对应的 split. 为 Mapper 提供输入数据: 给定某个 split, 能将其解析成一个个 key/value 对. 本文将介绍 Hadoop 如何设计 InputFormat 接口,以及提供了哪些常用的 InputFormat实现. 1 .旧版 API 的 InputFormat 解析 如图所示: 在旧版 API 中

定制并发类(一)引言

声明:本文是< Java 7 Concurrency Cookbook>的第七章,作者: Javier Fernández González 译者:许巧辉 在这个文章中,我们将包含: 定制ThreadPoolExecutor类 实现一个基于优先级的Executor类 实现ThreadFactory接口生成自定义的线程 在一个Executor对象中使用我们的ThreadFactory 定制任务运行在一个计划的线程池中 实现ThreadFactory接口生成自定义的线程给Fork/Join框架 定