1 概述
文件压缩主要有两个好处,一是减少了存储文件所占空间,另一个就是为数据传输提速。在hadoop大数据的背景下这两点尤为重要。hadoop里支持很多种压缩格式:
DEFLATE是同时使用了LZ77算法与哈夫曼编码(Huffman Coding)的一个无损数据压缩算法,源代码可以在zlib库中找到。gzip是以DEFLATE算法为基础扩展出来的一种算法。
压缩算法 |
原始文件大小 |
压缩后的文件大小 |
压缩速度 |
解压缩速度 |
gzip |
8.3GB |
1.8GB |
17.5MB/s |
58MB/s |
bzip2 |
8.3GB |
1.1GB |
2.4MB/s |
9.5MB/s |
LZO-bset |
8.3GB |
2GB |
4MB/s |
60.6MB/s |
LZO |
8.3GB |
2.9GB |
49.3MB/S |
74.6MB/s |
所有的压缩算法都是空间和时间的转换,更快压缩时间还是更小的压缩比,可以通过参数来指定,-1意味着速度,-9意味着空间。拿gzip做个例子,下面就意味着更快速的压缩:gzip -1 file
gzip在时间和空间上的取舍比较折中,bzip2压缩比gzip更有效,但是速度更慢。bzip2的解压速度比它的压缩速度要快。但是和其他压缩格式比又是最慢的,但是压缩效果明显是最好的。snappy和lz4的解压速度比lzo好很多。splittable表示压缩格式是否可以被分割,也就是说是否支持随即读。压缩数据是否能被mapreduce使用,压缩数据是否能被分割就很关键了。
举例一个未压缩的文件有1GB大小,hdfs默认的block大小是64MB,那么这个文件就会被分为16个block作为mapreduce的输入,每一个单独使用一个map任务。若该文件是已经使用gzip压缩的呢,若分成16个块,每个块做成一个输入,显然是不合适的,因为gzip压缩流的随即读是不可能的。实际上,当mapreduce处理压缩格式的文件的时候它会认识到这是一个gzip的压缩文件,而gzip又不支持随即读,它就会把16个块分给一个map去处理,这里就会有很多非本地处理的map任务,整个过程耗费的时间就会相当长。lzo压缩格式也会是同样的问题,但是通过使用hadoop
lzo库的索引工具以后,lzo就可以支持splittable。bzip2也是支持splittable的。
详见hadoop-2.5.2-src源码中的FileInputFormat类中的getSplits方法
if (isSplitable(fs, path)) { long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(goalSize, minSize, blockSize); long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { String[] splitHosts = getSplitHosts(blkLocations, length-bytesRemaining, splitSize, clusterMap); splits.add(makeSplit(path, length-bytesRemaining, splitSize, splitHosts)); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { String[] splitHosts = getSplitHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap); splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts)); } } else { String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap); splits.add(makeSplit(path, 0, length, splitHosts)); }
那么如何选择压缩格式呢?这取决于文件的大小,你使用的压缩工具,下面是几条选择建议,效率由高到低排序:
1用一些包含了压缩并且支持splittable的文件格式,比如Sequence File,RCFile或者Avro文件,这些文件格式我们之后都会讲到。若为了快速压缩可以使用lzo,lz4或者snappy压缩格式。
2 使用提供splittable的压缩格式,比如,bzip2和索引后可以支持splittable的lzo。
3 提前把文件分成几个块,每个块单独压缩,这样就无需考虑splittable的问题了
4 不要压缩文件。以不支持splittable的压缩格式存储一个很大的数据文件是不合适的,非本地处理效率会非常之低。
2 代码
codec其实就是coder和decoder两个单词的词头组成的缩略词。CompressionCodec定义了压缩和解压接口,所以又叫编码解码器。为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器,如下表所示
压缩格式 |
对应的编码/解码器 |
DEFLATE |
org.apache.hadoop.io.compress.DefaultCodec |
gzip |
org.apache.hadoop.io.compress.GzipCodec |
bzip |
org.apache.hadoop.io.compress.Bzip2Codec |
Snappy |
org.apache.hadoop.io.compress.SnappyCodec |
public class TestCompress { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // 将a.txt压缩为a.txt.bz2 compress("E://a.txt", "BZip2Codec"); // 将a.txt.bz2解压为a.txt.bz2.decoded decompres("E://a.txt.bz2"); System.out.println("down"); } /** * 压缩方法 * @param filename 待压缩的文件路径 * @param method 压缩方法名 */ public static void compress(String filePath, String method) throws ClassNotFoundException, IOException { // 需要被压缩的文件 File fileIn = new File(filePath); InputStream in = new FileInputStream(fileIn); // 通过名称反射得到编码解码器 Configuration conf = new Configuration(); Class<?> codecClass = Class.forName("org.apache.hadoop.io.compress." + method); CompressionCodec codec = (CompressionCodec) ReflectionUtils .newInstance(codecClass, conf); // 获取文件扩展名并创建结果文件名 File fileOut = new File(filePath + codec.getDefaultExtension()); OutputStream out = new FileOutputStream(fileOut); // 输出到结果文件,缓冲区设为5MB CompressionOutputStream cout = codec.createOutputStream(out); IOUtils.copyBytes(in, cout, 1024 * 1024 * 5, false); System.out.println("compress success"); in.close(); cout.close(); } /** * 解压方法 * @param filename 待解压的文件 */ public static void decompres(String filePath) throws FileNotFoundException, IOException { // 根据文件后缀获取解码器 Configuration conf = new Configuration(); CompressionCodecFactory factory = new CompressionCodecFactory(conf); CompressionCodec codec = factory.getCodec(new Path(filePath)); if (null == codec) { System.out.println("Cannot find codec for file " + filePath); return; } // 获取输入流 InputStream cin = codec .createInputStream(new FileInputStream(filePath)); // 输出文件 File fout = new File(filePath + ".decoded"); OutputStream out = new FileOutputStream(fout); IOUtils.copyBytes(cin, out, 1024 * 1024 * 5, false); System.out.println("decompres success"); cin.close(); out.close(); } }
原贴地址:http://blog.csdn.net/lastsweetop/article/details/9162031