Oozie分布式工作流——从理论和实践分析使用节点间的参数传递

Oozie支持Java Action,因此可以自定义很多的功能。本篇就从理论和实践两方面介绍下Java Action的妙用,另外还涉及到oozie中action之间的参数传递。

本文大致分为以下几个部分:

  • Java Action教程文档
  • 自定义Java Action实践
  • 从源码的角度讲解Java Action与Shell Action的参数传递。

如果你即将或者想要使用oozie,那么本篇的文章将会为你提供很多参考的价值。

Java Action文档

java action会自动执行提供的java classpublic static void main方法, 并且会在hadoop集群启动一个单独的map-reduce的map任务来执行的。因此,如果你自定义了一个java程序,它会提交到集群的某一个节点执行,不会每个节点都执行一遍。

workflow任务会等待java程序执行完继续执行下一个action。当java类正确执行退出后,将会进入ok控制流;当发生异常时,将会进入error控制流。Java程序绝对不能使用System.exit(int n)将会导致action进入error控制流。

在action的配置中,也支持EL表达式。并且使用<capture-output>也可以把数据输出出来,然后后面的action就可以基于EL表达式使用了。

语法规则

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="[NODE-NAME]">
        <java>
            <job-tracker>[JOB-TRACKER]</job-tracker>
            <name-node>[NAME-NODE]</name-node>
            <prepare>
               <delete path="[PATH]"/>
               ...
               <mkdir path="[PATH]"/>
               ...
            </prepare>
            <job-xml>[JOB-XML]</job-xml>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
            <main-class>[MAIN-CLASS]</main-class>
            <java-opts>[JAVA-STARTUP-OPTS]</java-opts>
            <arg>ARGUMENT</arg>
            ...
            <file>[FILE-PATH]</file>
            ...
            <archive>[FILE-PATH]</archive>
            ...
            <capture-output />
        </java>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>

prepare元素,支持创建或者删除指定的文件内容。在delete时,支持通配的方式指定特定的路径。java-opts以及java-opt参数提供了执行java应用时分配的JVM。

举个例子:

<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="myfirstjavajob">
        <java>
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <prepare>
                <delete path="${jobOutput}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.queue.name</name>
                    <value>default</value>
                </property>
            </configuration>
            <main-class>org.apache.oozie.MyFirstMainClass</main-class>
            <java-opts>-Dblah</java-opts>
            <arg>argument1</arg>
            <arg>argument2</arg>
        </java>
        <ok to="myotherjob"/>
        <error to="errorcleanup"/>
    </action>
    ...
</workflow-app>

覆盖Main方法

oozie中的很多action都支持这个功能,在configure中指定classpath下的一个类方法,它会覆盖当前action的main方法。这在不想重新编译jar包,而想替换程序时,非常有用。

自定义Java action程序以及部署

Java程序可以任意定义,比如写一个最简单的hellword,然后打包成lib。

然后需要定义oozie脚本:

<action name="java-7cbb">
    <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>default</value>
                </property>
            </configuration>
            <main-class>a.b.c.Main</main-class>
            <arg>arg1</arg>
            <arg>arg2</arg>
            <file>/oozie/lib/ojdbc7.jar#ojdbc7.jar</file>
            <capture-output/>
        </java>
        <ok to="end"/>
        <error to="Kill"/>
    </action>

其中几个比较重要的属性,千万不能拉下:

  • 1 需要指定Map-reduce的队列:mapred.job.queue.name
  • 2 指定Main class<main-class>
  • 3 如果依赖其他的jar,需要添加<file>
  • 4 如果想要捕获输出,需要设置<capture-output>

如果使用HUE图形化配置,就比较简单了:

点击右上角的齿轮,配置其他的属性信息:

基于源码分析参数传递

先从表象来说一下shell action如何传递参数:

你只需要定义一个普通的shell,在里面使用echo把属性输出出来即可,后面的action自动就可以基于EL表达式使用。

test='test123'
echo "test=$test"

这样后面的action就可以直接使用了:

${wf:actionData('action-name').test}或者${wf:actionData('action-name')['test']}

很简单是吧!

在Java里面就没这么容易了:

无论是 System.out.println() 还是 logger.info/error,都无法捕获到数据

上网找了一篇文章,备受启发

从中抄了一段代码:

private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";
...
String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
        if (oozieProp != null) {
            File propFile = new File(oozieProp);
            Properties props = new Properties();
            props.setProperty(propKey0, propVal0);
            props.setProperty(propKey1, propVal1);
            OutputStream os = new FileOutputStream(propFile);
            props.store(os, "");
            os.close();
        } else
            throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES + " System property not defined");

果然就好用了....

为了理解其中的缘由,我们来看看代码。首先在shell action中发现一句话:

<<< Invocation of Main class completed <<<

Oozie Launcher, capturing output data:
=======================

于是全局搜索,果然找到对应的代码,在org.apache.oozie.action.hadoop.LuancherMapper.java中,line275开始:

if (errorMessage == null) {
    handleActionData();
    if (actionData.get(ACTION_DATA_OUTPUT_PROPS) != null) {
        System.out.println();
        System.out.println("Oozie Launcher, capturing output data:");
        System.out.println("=======================");
        System.out.println(actionData.get(ACTION_DATA_OUTPUT_PROPS));
        System.out.println();
        System.out.println("=======================");
        System.out.println();
    }
    。。。
}

这里的actionData其实就是个普通的MAP

private Map<String,String> actionData;

public LauncherMapper() {
    actionData = new HashMap<String,String>();
}

Map里面保存了很多属性值,其中就包括我们想要捕获的输出内容:

    static final String ACTION_PREFIX = "oozie.action.";
    static final String ACTION_DATA_OUTPUT_PROPS = "output.properties";

    ...
    String outputProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS);
    if (outputProp != null) {
        File actionOutputData = new File(outputProp);
        if (actionOutputData.exists()) {
            int maxOutputData = getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024);
            actionData.put(ACTION_DATA_OUTPUT_PROPS,
                getLocalFileContentStr(actionOutputData, "Output", maxOutputData));
        }
    }
    ....

    public static String getLocalFileContentStr(File file, String type, int maxLen) throws LauncherException, IOException {
        StringBuffer sb = new StringBuffer();
        FileReader reader = new FileReader(file);
        char[] buffer = new char[2048];
        int read;
        int count = 0;
        while ((read = reader.read(buffer)) > -1) {
            count += read;
            if (maxLen > -1 && count > maxLen) {
                throw new LauncherException(type + " data exceeds its limit ["+ maxLen + "]");
            }
            sb.append(buffer, 0, read);
        }
        reader.close();
        return sb.toString();
    }

可以看到其实就是从oozie.action.output.properties指定的目录里面去读内容,然后输出出来,后面的action就可以用了。这就是为什么上面抄的那段代码可以使用的原因。

那么问题是,shell为什么直接echo就行,java里面却要这么费劲?

别急,先来看看java action的启动逻辑:

    public static void main(String[] args) throws Exception {
        run(JavaMain.class, args);
    }

    @Override
    protected void run(String[] args) throws Exception {
        ...
        Class<?> klass = actionConf.getClass(JAVA_MAIN_CLASS, Object.class);
        ...
        Method mainMethod = klass.getMethod("main", String[].class);
        try {
            mainMethod.invoke(null, (Object) args);
        } catch(InvocationTargetException ex) {
            // Get rid of the InvocationTargetException and wrap the Throwable
            throw new JavaMainException(ex.getCause());
        }
    }

它什么也没做,就是启动了目标类的main方法而已。

再来看看shell:

    private int execute(Configuration actionConf) throws Exception {
        ...
        //判断是否要捕获输出
        boolean captureOutput = actionConf.getBoolean(CONF_OOZIE_SHELL_CAPTURE_OUTPUT, false);

        //执行命令
        Process p = builder.start();

        //处理进程
        Thread[] thrArray = handleShellOutput(p, captureOutput);

        ...
        return exitValue;
    }
    protected Thread[] handleShellOutput(Process p, boolean captureOutput)
            throws IOException {
        BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
        BufferedReader error = new BufferedReader(new InputStreamReader(p.getErrorStream()));

        // 捕获标准输出
        OutputWriteThread thrStdout = new OutputWriteThread(input, true, captureOutput);
        thrStdout.setDaemon(true);
        thrStdout.start();

        OutputWriteThread thrStderr = new OutputWriteThread(error, false, false);
        thrStderr.setDaemon(true);
        thrStderr.start();

        return new Thread[]{ thrStdout, thrStderr };
    }

    class OutputWriteThread extends Thread {
        ...

        @Override
        public void run() {
            String line;
            BufferedWriter os = null;

            //读取数据保存在目标文件中

            try {
                if (needCaptured) {
                    File file = new File(System.getProperty(LauncherMapper.ACTION_PREFIX + LauncherMapper.ACTION_DATA_OUTPUT_PROPS));
                    os = new BufferedWriter(new FileWriter(file));
                }
                while ((line = reader.readLine()) != null) {
                    if (isStdout) { // For stdout
                        // 1. Writing to LM STDOUT
                        System.out.println("Stdoutput " + line);
                        // 2. Writing for capture output
                        if (os != null) {
                            if (Shell.WINDOWS) {
                                line = line.replace("\\u", "\\\\u");
                            }
                            os.write(line);
                            os.newLine();
                        }
                    }
                    else {
                        System.err.println(line); // 1. Writing to LM STDERR
                    }
                }
            }
            catch (IOException e) {
                ...
            }finally {
                ...
            }
        }
    }

这样就很清晰了,shell自动帮我们把输出的内容写入了oozie.action.output.properties文件中。而在java中则需要用户自己来定义写入的过程。

后续将会介绍一下oozie中比较高级的用法——EL表达式

本文转自博客园xingoo的博客,原文链接:Oozie分布式工作流——从理论和实践分析使用节点间的参数传递,如需转载请自行联系原博主。

时间: 2024-07-29 01:35:18

Oozie分布式工作流——从理论和实践分析使用节点间的参数传递的相关文章

Oozie分布式工作流——Action节点

前篇讲述了下什么是流控制节点,本篇继续来说一下什么是 Action Nodes操作节点.Action节点有一些比较通用的特性: Action节点是远程的 所有oozie创建的计算和处理任务都是异步的,没有任何应用是工作在oozie内部的.基本上都是创建一个oozie任务,oozie任务会以map的形式,在各个节点再创建相应的任务.因此当你执行spark任务的时候,就会发现yarn集群监控列表里面会同时有两个任务出现. Action节点是异步的 oozie创建的任务都是异步的,对于大多数的任务来说

Oozie分布式工作流——流控制

最近又开始捅咕上oozie了,所以回头还是翻译一下oozie的文档.文档里面最重要就属这一章了--工作流定义. 一提到工作流,首先想到的应该是工作流都支持哪些工作依赖关系,比如串式的执行,或者一对多,或者多对一,或者条件判断等等.Oozie在这方面支持的很好,它把节点分为控制节点和操作节点两种类型,控制节点用于控制工作流的计算流程,操作节点用于封装计算单元.本篇就主要描述下它的控制节点... 背景 先看看oozie工作流里面的几个定义: action,一个action是一个独立的任务,比如map

Oozie分布式任务的工作流——脚本篇

继前一篇大体上翻译了Email的Action配置,本篇继续看一下Shell的相关配置. Shell Action Shell Action可以执行Shell脚本命令,工作流会等到shell完全执行完毕后退出,再执行下一个节点.为了运行shell,必须配置job-tracker以及name-node,并且设置exec来执行shell. Shell既可以使用job-xml引用一个配置文件,也可以在shell action内直接配置.shell action中的配置会覆盖job-xml中的配置. EL

Oozie分布式任务的工作流——邮件篇

在大数据的当下,各种spark和hadoop的框架层出不穷.各种高端的计算框架,分布式任务如乱花般迷眼.你是否有这种困惑!--有了许多的分布式任务,但是每天需要固定时间跑任务,自己写个调度,既不稳定,又没有可靠的通知. 想要了解Oozie的基础知识,可以参考这里 那么你应该是在找--Oozie. Oozie是一款支持分布式任务调度的开源框架,它支持很多的分布式任务,比如map reduce,spark,sqoop,pig甚至shell等等.你可以以各种方式调度它们,把它们组成工作流.每个工作流节

Oozie分布式任务的工作流——Sqoop篇

Sqoop的使用应该是Oozie里面最常用的了,因为很多BI数据分析都是基于业务数据库来做的,因此需要把mysql或者oracle的数据导入到hdfs中再利用mapreduce或者spark进行ETL,生成报表信息. 因此本篇的Sqoop Action其实就是运行一个sqoop的任务而已. 同样action会等到sqoop执行成功后,才会执行下一个action.为了运行sqoop action,需要提供job-tracker,name-node,command或者arg元素. sqoop act

Oozie分布式任务的工作流——Spark篇

Spark是现在应用最广泛的分布式计算框架,oozie支持在它的调度中执行spark.在我的日常工作中,一部分工作就是基于oozie维护好每天的spark离线任务,合理的设计工作流并分配适合的参数对于spark的稳定运行十分重要. Spark Action 这个Action允许执行spark任务,需要用户指定job-tracker以及name-node.先看看语法规则: 语法规则 <workflow-app name="[WF-DEF-NAME]" xmlns="uri

大型分布式网站架构设计与实践《概述与大纲》

大型分布式网站架构设计与实践 在大型网站架构的演变过程中,集中式的架构设计出于对系统的可扩展性,可维护性,成本等多方面因素的考虑,逐渐被放弃. 分布式架构的核心思想是采用大量廉价的PC Server ,构建一个低成本,高可用,高可扩展,高吞吐的集群系统,以支撑海量用户的访问和数据存储,理论上具备无限的扩展能力. 分布式系统的设计,是一门复杂的学问,它设计通讯协议,远程调用,服务治理,系统安全,存储,搜索,监控,稳定性保障,性能优化,数据分析,数据挖掘等各个领域. 对任何一个领域的深入挖掘,都能写

Microsoft NLayerApp“.NET研究”案例理论与实践 - 项目简介与环境搭建

项目简介 Microsoft – Spain团队有一个很不错的面向领域多层分布式项目案例:Microsoft – Domain Oriented N-Layered .NET 4.0 App Sample(在本系列文章中,我使用NLayerApp作为该项目的名称进行介绍),在codeplex上的地址是:http://microsoftnlayerapp.codeplex.com/. 它是学习领域驱动设计(DDD)的一个非常不错的案例项目.该项目采用的是经典的DDD架构,而不是CQRS架构,但我觉

Microsoft NLayerApp案例理论与实践 - 项目简“.NET研究”介与环境搭建

项目简介 Microsoft – Spain团队有一个很不错的面向领域多层分布式项目案例:Microsoft – Domain Oriented N-Layered .NET 4.0 App Sample(在本系列文章中,我使用NLayerApp作为该项目的名称进行介绍),在codeplex上的地址是:http://microsoftnlayerapp.codeplex.com/. 它是学习领域驱动设计(DDD)的一个非常不错的案例项目.该项目采用的是经典的DDD架构,而不是CQRS架构,但我觉