高可用Hadoop平台-启航

1.概述

  在上篇博客中,我们搭建了《配置高可用Hadoop平台》,
接下来我们就可以驾着Hadoop这艘巨轮在大数据的海洋中遨游了。工欲善其事,必先利其器。是的,没错;我们开发需要有开发工具(IDE);本篇文章,
我打算讲解如何搭建和使用开发环境,以及编写和讲解WordCount这个例子,给即将在Hadoop的海洋驰骋的童鞋入个门。上次,我在《网站日志统计案例分析与实现》中说会将源码放到Github,后来,我考虑了下,决定将《高可用的Hadoop平台》做一个系列,后面基于这个平台,我会单独写一篇来赘述具体的实现过程,和在实现过程中遇到的一些问题,以及解决这些问题的方案。下面我们开始今天的启航

2.启航

  IDE:JBoss Developer Studio 8.0.0.GA (Eclipse的升级版,Redhat公司出的)

  JDK:1.7(或1.8)

  Hadoop2x-eclipse-plugin:这个插件,本地单元测试或自己做学术研究比较好用

  插件下载地址:https://github.com/smartdengjie/hadoop2x-eclipse-plugin

  由于JBoss Developer Studio 8基本适合于Retina屏,所以,我们这里直接使用JBoss Developer Studio 8,JBoss Developer Studio 7对Retina屏的支持不是很完美,这里就不赘述了。

  附上一张IDE的截图:

2.1安装插件

  下面我们开始安装插件,首先展示首次打开的界面,如下图所示:

  然后,我们到上面给的Github的地址,clone整个工程,里面有编译好的jar和源码,可自行选择(使用已存在的和自己编译对应的版本),这里我直接使用编译好的版本。我们将jar放到IDE的plugins目录下,如下图所示:

  接着,我们重启IDE,界面出现如下图所示的,即表示插件添加成功,若没有,查看IDE的启动日志,根据异常日志定位出原因。

2.2设置Hadoop插件

  配置信息如下所示(已在图中说明):

  添加本地的hadoop源码目录:

  到这里,IDE和插件的搭建就完成了,下面我们进入一段简单的开发,hadoop的源码中提供了许多example让我学习,这里我以WordCount为例子来说明:

3.WordCount

  首先我们看下hadoop的源码文件目录,如下图所示:

3.1源码解读

package cn.hdfs.mr.example;

import java.io.IOException;
import java.util.Random;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.hdfs.utils.ConfigUtils;

/**
 *
 * @author dengjie
 * @date 2015年03月13日
 * @description Wordcount的例子是一个比较经典的mapreduce例子,可以叫做Hadoop版的hello world。
 *              它将文件中的单词分割取出,然后shuffle,sort(map过程),接着进入到汇总统计
 *              (reduce过程),最后写道hdfs中。基本流程就是这样。
 */
public class WordCount {

    private static Logger log = LoggerFactory.getLogger(WordCount.class);

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    /*
     * 源文件:a b b
     *
     * map之后:
     *
     * a 1
     *
     * b 1
     *
     * b 1
     */
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());// 整行读取
        while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());// 按空格分割单词
        context.write(word, one);// 每次统计出来的单词+1
        }
    }
    }

    /*
     * reduce之前:
     *
     * a 1
     *
     * b 1
     *
     * b 1
     *
     * reduce之后:
     *
     * a 1
     *
     * b 2
     */
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
        sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
    Configuration conf1 = new Configuration();
    Configuration conf2 = new Configuration();
    long random1 = new Random().nextLong();// 重定下输出目录1
    long random2 = new Random().nextLong();// 重定下输出目录2
    log.info("random1 -> " + random1 + ",random2 -> " + random2);
    Job job1 = new Job(conf1, "word count1");
    job1.setJarByClass(WordCount.class);
    job1.setMapperClass(TokenizerMapper.class);// 指定Map计算的类
    job1.setCombinerClass(IntSumReducer.class);// 合并的类
    job1.setReducerClass(IntSumReducer.class);// Reduce的类
    job1.setOutputKeyClass(Text.class);// 输出Key类型
    job1.setOutputValueClass(IntWritable.class);// 输出值类型  

    Job job2 = new Job(conf2, "word count2");
    job2.setJarByClass(WordCount.class);
    job2.setMapperClass(TokenizerMapper.class);
    job2.setCombinerClass(IntSumReducer.class);
    job2.setReducerClass(IntSumReducer.class);
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(IntWritable.class);
    // FileInputFormat.addInputPath(job, new
    // Path(String.format(ConfigUtils.HDFS.WORDCOUNT_IN, "test.txt")));
    // 指定输入路径
    FileInputFormat.addInputPath(job1, new Path(String.format(ConfigUtils.HDFS.WORDCOUNT_IN, "word")));
    // 指定输出路径
    FileOutputFormat.setOutputPath(job1, new Path(String.format(ConfigUtils.HDFS.WORDCOUNT_OUT, random1)));
    FileInputFormat.addInputPath(job2, new Path(String.format(ConfigUtils.HDFS.WORDCOUNT_IN, "word")));
    FileOutputFormat.setOutputPath(job2, new Path(String.format(ConfigUtils.HDFS.WORDCOUNT_OUT, random2)));

    boolean flag1 = job1.waitForCompletion(true);// 执行完MR任务后退出应用
    boolean flag2 = job1.waitForCompletion(true);
    if (flag1 && flag2) {
        System.exit(0);
    } else {
        System.exit(1);
    }

    }
}

4.总结

  这篇文章就和大家分享到这里,如果在研究的过程有什么问题,可以加群讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

时间: 2024-08-04 11:56:42

高可用Hadoop平台-启航的相关文章

高可用Hadoop平台-探索

1.概述 上篇<高可用Hadoop平台-启航>博客已经让我们初步了解了Hadoop平台:接下来,我们对Hadoop做进一步的探索,一步一步的揭开Hadoop的神秘面纱.下面,我们开始赘述今天的探索之路. 2.探索 在探索之前,我们来看一下Hadoop解决了什么问题,Hadoop就是解决了大数据(大到单台服务器无法进行存储,单台服务器无法在限定的时间内进行处理)的可靠存储和处理. HDFS:在由普通或廉价的服务器(或PC)组成的集群上提供高可用的文件存储,通过将块保存多个副本的办法解决服务器或硬

高可用Hadoop平台-集成Hive HAProxy

1.概述 这篇博客是接着<高可用Hadoop平台>系列讲,本篇博客是为后面用 Hive 来做数据统计做准备的,介绍如何在 Hadoop HA 平台下集成高可用的 Hive 工具,下面我打算分以下流程来赘述: 环境准备 集成并配置 Hive 工具 使用 Java API 开发 Hive 代码 下面开始进行环境准备. 2.环境准备 Hive版本:<Hive-0.14> HAProxy版本:<HAProxy-1.5.11> 注:前提是 Hadoop 的集群已经搭建完成,若还没

高可用Hadoop平台-实战尾声篇

1.概述 今天这篇博客就是<高可用Hadoop平台>的尾声篇了,从搭建安装到入门运行 Hadoop 版的 HelloWorld(WordCount 可以称的上是 Hadoop 版的 HelloWorld ),在到开发中需要用到的各个套件以及对套件的安装使用,在到 Hadoop 的实战,一路走来我们对在Hadoop平台下开发的基本流程应该都熟悉了.今天我们来完成在高可用Hadoop平台开发的最后一步,导出数据. 2.导出数据目的 首先,我来说明下为什么要导出数据,导出数据的目的是为了干嘛? 我们

高可用Hadoop平台-实战

1.概述 今天继续<高可用的Hadoop平台>系列,今天开始进行小规模的实战下,前面的准备工作完成后,基本用于统计数据的平台都拥有了,关于导出统计结果的文章留到后面赘述.今天要和大家分享的案例是一个基于电商网站的用户行为分析,这里分析的指标包含以下指标: 统计每日PV 每日注册用户 每日IP 跳出用户 其他指标可以参考上述4个指标进行拓展,下面我们开始今天的分析之旅. 2.流程 首先,在开发之前我们需要注意哪些问题?我们不能盲目的按照自己的意愿去开发项目,这样到头来得不到产品的认可,我们的工作

高可用Hadoop平台-Flume NG实战图解篇

1.概述 今天补充一篇关于Flume的博客,前面在讲解高可用的Hadoop平台的时候遗漏了这篇,本篇博客为大家讲述以下内容: Flume NG简述 单点Flume NG搭建.运行 高可用Flume NG搭建 Failover测试 截图预览 下面开始今天的博客介绍. 2.Flume NG简述 Flume NG是一个分布式,高可用,可靠的系统,它能将不同的海量数据收集,移动并存储到一个数据存储系统中.轻量,配置简单,适用于各种日志收集,并支持Failover和负载均衡.并且它拥有非常丰富的组件.Fl

高可用Hadoop平台-HBase集群搭建

1.概述 今天补充一篇HBase集群的搭建,这个是高可用系列遗漏的一篇博客,今天抽时间补上,今天给大家介绍的主要内容目录如下所示: 基础软件的准备 HBase介绍 HBase集群搭建 单点问题验证 截图预览 那么,接下来我们开始今天的HBase集群搭建学习. 2.基础软件的准备 由于HBase的数据是存放在HDFS上的,所以我们在使用HBase时,确保Hadoop集群已搭建完成,并运行良好.若是为搭建Hadoop集群,请参考我写的<配置高可用的Hadoop平台>来完成Hadoop平台的搭建.另

高可用Hadoop平台-Ganglia安装部署

1.概述 最近,有朋友私密我,Hadoop有什么好的监控工具,其实,Hadoop的监控工具还是蛮多的.今天给大家分享一个老牌监控工具 Ganglia,这个在企业用的也算是比较多的,Hadoop对它的兼容也很好,不过就是监控界面就不是很美观.下次给大家介绍另一款工具--Hue,这 个界面官方称为Hadoop UI,界面美观,功能也比较丰富.今天,在这里主要给大家介绍Ganglia这款监控工具,介绍的内容主要包含如下: Ganglia背景 Ganglia安装部署.配置 Hadoop集群配置Gangl

高可用Hadoop平台-答疑篇

1.概述 这篇博客不涉及到具体的编码,只是解答最近一些朋友心中的疑惑.最近,一些朋友和网友纷纷私密我,我总结了一下,疑问大致包含以下几点: 我学 Hadoop 后能从事什么岗位? 在遇到问题,我该如何去寻求解决方案? 针对以上问题,我在这里赘述下个人的经验,给即将步入 Hadoop 行业的同学做个参考. 2.我学 Hadoop 后能从事什么岗位 目前 Hadoop 相关的工作大致分为三类:应用,运维,二次开发 2.1 应用 这方面的主要工作是编写MapReduce作业,利用Hive之类的套件来进

高可用Hadoop平台-应用JAR部署

1.概述 今天在观察集群时,发现NN节点的负载过高,虽然对NN节点的资源进行了调整,同时对NN节点上的应用程序进行重新打包调整,负载问题暂时得到 缓解.但是,我想了想,这样也不是长久之计.通过这个问题,我重新分析了一下以前应用部署架构图,发现了一些问题的所在,之前的部署架构是,将打包的应用 直接部署在Hadoop集群上,虽然这没什么不好,但是我们分析得知,若是将应用部署在DN节点,那么时间长了应用程序会不会抢占DN节点的资源,那么如 果我们部署在NN节点上,又对NN节点计算任务时造成影响,于是,