Oozie分布式任务的工作流——Spark篇

Spark是现在应用最广泛的分布式计算框架,oozie支持在它的调度中执行spark。在我的日常工作中,一部分工作就是基于oozie维护好每天的spark离线任务,合理的设计工作流并分配适合的参数对于spark的稳定运行十分重要。

Spark Action

这个Action允许执行spark任务,需要用户指定job-tracker以及name-node。先看看语法规则:

语法规则

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.3">
    ...
    <action name="[NODE-NAME]">
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>[JOB-TRACKER]</job-tracker>
            <name-node>[NAME-NODE]</name-node>
            <prepare>
               <delete path="[PATH]"/>
               ...
               <mkdir path="[PATH]"/>
               ...
            </prepare>
            <job-xml>[SPARK SETTINGS FILE]</job-xml>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
            <master>[SPARK MASTER URL]</master>
            <mode>[SPARK MODE]</mode>
            <name>[SPARK JOB NAME]</name>
            <class>[SPARK MAIN CLASS]</class>
            <jar>[SPARK DEPENDENCIES JAR / PYTHON FILE]</jar>
            <spark-opts>[SPARK-OPTIONS]</spark-opts>
            <arg>[ARG-VALUE]</arg>
                ...
            <arg>[ARG-VALUE]</arg>
            ...
        </spark>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>

prepare元素

它里面可以执行删除文件或者创建目录的操作,比如

<delete path="hdfs://xxxx/a"/>
<mkdir path="hdfs://xxxx"/>

一般来说,离线的spark任务最重都会生成一些数据,这些数据可能存储到数据库中,也可能直接存储到hdfs,如果存储到hdfs就涉及到清除目录了。比如你可能在测试环境需要频繁的重复运行spark任务,那么每次都需要清除目录文件,创建新的目录才行。

job-xml

spark 任务的参数也可以放在job-xml所在的xml中。

confugration

这里面的配置的参数将会传递给spark任务。

master

spark运行的模式,表示spark连接的集群管理器。默认可以使spark的独立集群(spark://host:port)或者是mesos(mesos://host:port)或者是yarn(yarn),以及本地模式local

mode

因为spark任务也可以看做主节点和工作节点模式,主节点就是驱动程序。这个驱动程序既可以运行在提交任务的机器,也可以放在集群中运行。

这个参数就是用来设置,驱动程序是以客户端的形式运行在本地机器,还是以集群模式运行在集群中。

name

spark应用的名字

class

spark应用的主函数

jar

spark应用的jar包

spark-opts

提交给驱动程序的参数。比如--conf key=value或者是在oozie-site.xml中配置的oozie.service.SparkConfiguationService.spark.configurations

arg

这个参数是用来提交给spark应用的参数

例子

官网给出的例子:

<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="myfirstsparkjob">
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <prepare>
                <delete path="${jobOutput}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>true</value>
                </property>
            </configuration>
            <master>local[*]</master>
            <mode>client<mode>
            <name>Spark Example</name>
            <class>org.apache.spark.examples.mllib.JavaALS</class>
            <jar>/lib/spark-examples_2.10-1.1.0.jar</jar>
            <spark-opts>--executor-memory 20G --num-executors 50</spark-opts>
            <arg>inputpath=hdfs://localhost/input/file.txt</arg>
            <arg>value=2</arg>
        </spark>
        <ok to="myotherjob"/>
        <error to="errorcleanup"/>
    </action>
    ...
</workflow-app>

我自己工作时的例子:

<action name="NODE1">
    <spark xmlns="uri:oozie:spark-action:0.1">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <master>yarn</master>
        <mode>cluster</mode>
        <name>NODE1</name>
        <class>com.test.NODE1_Task</class>
        <jar>/lib/dw.jar</jar>
        <spark-opts>--executor-memory 1G --num-executors 6 --executor-cores 1 --conf spark.storage.memoryFraction=0.8</spark-opts>
        <arg>参数1</arg>
        <arg>参数2</arg>
        <arg>参数3</arg>
    </spark>
</action>

日志

spark action日志会重定向到oozie的mapr启动程序的stdout/stderr中。

通过oozie的web控制条,可以看到spark的日志。

spark on yarn

如果想要把spark运行在yarn上,需要按照下面的步骤执行:

  • 在spark action中加载spark-assembly包
  • 指定master为yarn-client或者yarn-master

为了确保spark工作在spark历史服务器中可以查到,需要保证在--conf中或者oozie.service.SparkConfiturationService.spark.configrations中设置下面的三个参数:

  • spark.yarn.historyServer.address=http://spark-host:18088
  • spark.eventLog.dir=hdfs://NN:8020/user/spark/applicationHistory
  • spark.eventLog.enabled=true

spark action的schema

<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
           xmlns:spark="uri:oozie:spark-action:0.1" elementFormDefault="qualified"
           targetNamespace="uri:oozie:spark-action:0.1">    <xs:element name="spark" type="spark:ACTION"/>
    <xs:complexType name="ACTION">
        <xs:sequence>
            <xs:element name="job-tracker" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="name-node" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="prepare" type="spark:PREPARE" minOccurs="0" maxOccurs="1"/>
            <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="configuration" type="spark:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
            <xs:element name="master" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="mode" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="name" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="class" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="jar" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="spark-opts" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="arg" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="CONFIGURATION">
        <xs:sequence>
            <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
                <xs:complexType>
                    <xs:sequence>
                        <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
                        <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
                        <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
                    </xs:sequence>
                </xs:complexType>
            </xs:element>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="PREPARE">
        <xs:sequence>
            <xs:element name="delete" type="spark:DELETE" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="mkdir" type="spark:MKDIR" minOccurs="0" maxOccurs="unbounded"/>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="DELETE">
        <xs:attribute name="path" type="xs:string" use="required"/>
    </xs:complexType>
    <xs:complexType name="MKDIR">
        <xs:attribute name="path" type="xs:string" use="required"/>
    </xs:complexType>
</xs:schema>

本文转自博客园xingoo的博客,原文链接:Oozie分布式任务的工作流——Spark篇,如需转载请自行联系原博主。

时间: 2024-10-24 18:37:23

Oozie分布式任务的工作流——Spark篇的相关文章

Oozie分布式任务的工作流——邮件篇

在大数据的当下,各种spark和hadoop的框架层出不穷.各种高端的计算框架,分布式任务如乱花般迷眼.你是否有这种困惑!--有了许多的分布式任务,但是每天需要固定时间跑任务,自己写个调度,既不稳定,又没有可靠的通知. 想要了解Oozie的基础知识,可以参考这里 那么你应该是在找--Oozie. Oozie是一款支持分布式任务调度的开源框架,它支持很多的分布式任务,比如map reduce,spark,sqoop,pig甚至shell等等.你可以以各种方式调度它们,把它们组成工作流.每个工作流节

Oozie分布式任务的工作流——Sqoop篇

Sqoop的使用应该是Oozie里面最常用的了,因为很多BI数据分析都是基于业务数据库来做的,因此需要把mysql或者oracle的数据导入到hdfs中再利用mapreduce或者spark进行ETL,生成报表信息. 因此本篇的Sqoop Action其实就是运行一个sqoop的任务而已. 同样action会等到sqoop执行成功后,才会执行下一个action.为了运行sqoop action,需要提供job-tracker,name-node,command或者arg元素. sqoop act

Oozie分布式任务的工作流——脚本篇

继前一篇大体上翻译了Email的Action配置,本篇继续看一下Shell的相关配置. Shell Action Shell Action可以执行Shell脚本命令,工作流会等到shell完全执行完毕后退出,再执行下一个节点.为了运行shell,必须配置job-tracker以及name-node,并且设置exec来执行shell. Shell既可以使用job-xml引用一个配置文件,也可以在shell action内直接配置.shell action中的配置会覆盖job-xml中的配置. EL

Oozie分布式工作流——Action节点

前篇讲述了下什么是流控制节点,本篇继续来说一下什么是 Action Nodes操作节点.Action节点有一些比较通用的特性: Action节点是远程的 所有oozie创建的计算和处理任务都是异步的,没有任何应用是工作在oozie内部的.基本上都是创建一个oozie任务,oozie任务会以map的形式,在各个节点再创建相应的任务.因此当你执行spark任务的时候,就会发现yarn集群监控列表里面会同时有两个任务出现. Action节点是异步的 oozie创建的任务都是异步的,对于大多数的任务来说

Oozie分布式工作流——流控制

最近又开始捅咕上oozie了,所以回头还是翻译一下oozie的文档.文档里面最重要就属这一章了--工作流定义. 一提到工作流,首先想到的应该是工作流都支持哪些工作依赖关系,比如串式的执行,或者一对多,或者多对一,或者条件判断等等.Oozie在这方面支持的很好,它把节点分为控制节点和操作节点两种类型,控制节点用于控制工作流的计算流程,操作节点用于封装计算单元.本篇就主要描述下它的控制节点... 背景 先看看oozie工作流里面的几个定义: action,一个action是一个独立的任务,比如map

Oozie分布式工作流——从理论和实践分析使用节点间的参数传递

Oozie支持Java Action,因此可以自定义很多的功能.本篇就从理论和实践两方面介绍下Java Action的妙用,另外还涉及到oozie中action之间的参数传递. 本文大致分为以下几个部分: Java Action教程文档 自定义Java Action实践 从源码的角度讲解Java Action与Shell Action的参数传递. 如果你即将或者想要使用oozie,那么本篇的文章将会为你提供很多参考的价值. Java Action文档 java action会自动执行提供的jav

大数据基础知识问答----spark篇,大数据生态圈

Spark相关知识点 1.Spark基础知识 1.Spark是什么? UCBerkeley AMPlab所开源的类HadoopMapReduce的通用的并行计算框架 dfsSpark基于mapreduce算法实现的分布式计算,拥有HadoopMapReduce所具有的优点:但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法. 2.Spark与Hadoop的对比(Spar

大数据学习之路(持续更新中...)

在16年8月份至今,一直在努力学习大数据大数据相关的技术,很想了解众多老司机的学习历程.因为大数据涉及的技术很广需要了解的东西也很多,会让很多新手望而却步.所以,我就在自己学习的过程中总结一下学到的内容以及踩到的一些坑,希望得到老司机的指点和新手的借鉴. 前言 在学习大数据之前,先要了解他解决了什么问题,能给我们带来什么价值.一方面,以前IT行业发展没有那么快,系统的应用也不完善,数据库足够支撑业务系统.但是随着行业的发展,系统运行的时间越来越长,搜集到的数据也越来越多,传统的数据库已经不能支撑

《Spark大数据分析实战》——1.4节弹性分布式数据集

1.4 弹性分布式数据集 本节将介绍弹性分布式数据集RDD.Spark是一个分布式计算框架,而RDD是其对分布式内存数据的抽象,可以认为RDD就是Spark分布式算法的数据结构,而RDD之上的操作是Spark分布式算法的核心原语,由数据结构和原语设计上层算法.Spark最终会将算法(RDD上的一连串操作)翻译为DAG形式的工作流进行调度,并进行分布式任务的分发.1.4.1 RDD简介 在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集(Resilient Distributed Da