Pig源码分析: 逻辑执行计划模块

Whole View

本文分析的是Pig Logical模块的代码(newplan package下),具体每种逻辑执行的实现类不会做具体分析。

Architecture

关键类/接口关系图

下面对关键类/接口具体实现做分析

Operator

public abstract class Operator {
    protected SourceLocation location; // The location of the operator in the original pig script.

    protected String name;
    protected OperatorPlan plan; // plan that contains this operator
    protected Map<String, Object> annotations;
    protected final int hashPrime = 31;

Operator的变量:

对name和plan提供get函数,构造函数传入name和plan。

对annotations提供get,annote,remove方法,来得到、添加、移除注释。

对locaiton提供get和set函数,且构造函数new SourceLocation()。

Operstor抽象方法:

主要方法为accept(PlanVisitor v),在PlanWalker里常用到。

还提供一个isEqual方法。

 

继承结构

主要看两类实现 LogicalExpression和LogicalRelationalOperator

LogicalExpression

public abstract class LogicalExpression extends Operator {

    static long nextUid = 1;
    protected LogicalSchema.LogicalFieldSchema fieldSchema;
    protected LogicalSchema.LogicalFieldSchema uidOnlyFieldSchema;

deepCopy()方法需要子类实现

 

继承结构:

子类实现略

LogicalRelationalOperator

LogicalRelationalOperator代表关系型操作,关系型操作有Schema。以下是主要变量,LogicalRelationalOperator为他们提供了一些get/set方法。

abstract public class LogicalRelationalOperator extends Operator {

    protected LogicalSchema schema;
    protected int requestedParallelism;
    protected String alias;
    protected int lineNum;

    /**
     * Name of the customPartitioner if one is used, this is set to null otherwise.
     */
    protected String mCustomPartitioner = null;

    /**
     * A HashSet to indicate whether an option (such a Join Type) was pinned
     * by the user or can be chosen at runtime by the optimizer.
     */
    protected HashSet<Integer> mPinnedOptions = new HashSet<Integer>();

关于LogicalSchema类:

内部类LogicalFieldSchema具体表示每一个field的结构,可以看到与LogicalSchema是嵌套的。LogicalSchema维护一个List<LogicalFieldSchema>

    public static class LogicalFieldSchema {
        public String alias;
        public byte type;
        public long uid;
        public LogicalSchema schema;

提供基本方法如下:

除了基本方法外,还提供一套merge schema的方法

继承结构:

子类实现略。

OperatorPlan

OperatorPlan是一个接口,定义了对Operator的图操作(Graph Operations)。

罗列了所有方法之后发现,Operator类虽然没有结构,只是一个普通的VO类。但是OperatorPlan这个接口定义的以下这套图操作,使Operstor与Operator组成了一个Graph。

实现结构:

下面展开分析。

OperatorSubPlan

OperatorSubPlan代表的是一个OperatorPlan的一个子集的视图,OperatorSubPlan只有一个实现,使用在Rule的match过程里。所以OperatorSubPlan的作用就是提供一个子Plan,用于匹配操作。

BaseOperatorPlan

BaseOperatorPlan实现了OperatorPlan接口,具体实现了各个图操作方法,把Operator之间的关系(包括softLink关系)用PlanEdge表示,图操作方法都借助PlanEdge类表达和实现。

public abstract class BaseOperatorPlan implements OperatorPlan {

    protected List<Operator> ops;
    protected PlanEdge fromEdges;
    protected PlanEdge toEdges;
    protected PlanEdge softFromEdges;
    protected PlanEdge softToEdges;

    private List<Operator> roots;
    private List<Operator> leaves;

比如:

toEdges.get(op)                  返回op的前辈

toEdges.get(op)==null 
     
的op为root

fromEdges.get(op)             返回op的后辈

fromEdges.get(op)==null  的op为leave

PlanEdge类的实现:

public class PlanEdge extends MultiMap<Operator, Operator>

这里的MultiMap是Pig自己的工具类,Pig表示不使用Apache common的MultiMap是因为不支持序列化。

public class MultiMap<K, V> implements Serializable {

    // Change this if you modify the class.
    static final long serialVersionUID = 2L;

    protected Map<K, ArrayList<V>> mMap = null;

    public MultiMap() {
        mMap = new HashMap<K, ArrayList<V>>();
    }

因为MultiMap的value部分使用的是ArrayList,所以使得某些图操作支持position信息,如:

public Pair<Integer, Integer> disconnect(Operator from, Operator to)
public void connect(Operator from, int fromPos, Operator to, int toPos)

除了实现图操作方法外,BaseOperatorPlan还提供了explain()方法,子类会使用dpumper或printer来打印输出Operators层次结构。

继承结构:

主要看下LogicalPlan和LogicalExpressionPlan两个实现类。

LogicalPlan

LogicalPlan只包含关系型操作,也就是说涉及到的Operator都是LogicalRelationalOperator。

 

explain()方法既支持LogicalPlanPrinter的visit实现,也支持DotLOPrinter的dpump实现。

LogicalPlanPrinter是PlanVisitor的子类, LogicalPlanPrinter内部有一个PrintStream,在visit()过程中边遍历,边记录。

DotLOPrinter是DotPlanDumper的子类,DotPlanDumper是PlanDumper的子类,根据graphviz的dot algorithm,输出符合DOT格式的plan。

LogicalExpressionPlan

LogicalExpressionPlan处理的是LogicalExpressionOperators。

 

explain()方法借助LogicalPlanPrinter实现

PlanVisitor

访问者机制,用于操作一个plan。

内部有一个PlanWalker双向队列,PlanWalker会按照某种顺序遍历访问传入的OperatorPlan,让plan的每个operation accept该Visitor。

PlanVisitor可以进行push和pop walker的操作。visit()方法调用的是walker.walk(this)方法。

继承结构很可观

主要看LogicalExpressionVisitor、LogicalRelationalNodesVisitor这两大体系。前者访问expression plans,后者访问logical plans。

LogicalExpressionVisitor

LogicalExpressionVisitor初始化的时候会判断传入的OperatorPlan是否是LogicalExpressionPlan的子类。visit()方法们通过多态,接受LogicalExpression的子类。

LogicalRelationalNodesVisitor

LogicalRelationalNodesVisitor接受的OperatorPlan必须每个operator都是LogicalRelationalOperator的子类(初始化的时候会得到operator iterator对每个进行校验,不满足就抛异常)。visit()方法们通过多态,接受LogicalRelationalOperator的实现子类。

PlanWalker

PlanWalker提供的是遍历访问一个plan的能力。

PlanWalker的子类主要实现两个方法:

public abstract void walk(PlanVisitor visitor) throws FrontendException;

public abstract PlanWalker spawnChildWalker(OperatorPlan plan);

walk()方法在子类的实现中,会以不同的顺序遍历plan,最后的结果是遍历到的节点Operator会调op.accept(visitor)接受本Visitor。

继承结构

接下来具体介绍各子类遍历能力的实现。

DependencyOrderWalker

DependencyOrderWalker按照依赖顺序访问plan,即一个node被访问的前提是它的前辈们已经被访问过了。这个访问顺序相当于,按照拓扑顺序访问图上的节点。

@Override
public PlanWalker spawnChildWalker(OperatorPlan plan) {
return new DependencyOrderWalker(plan);
}

walk()方法通过plan.getSinks()方法得到所有的leave节点,即没有后辈的节点,然后遍历他们,获取每个节点的所有前辈,再递归前辈的前辈,从而实现把所有的节点都访问一遍,最后得到结果就是一个FIFO的List。代码里的这个Graph依赖遍历的方式很不高效,但是因为访问的图的节点少,所以可接受。

递归的过程如下

    protected void doAllPredecessors(Operator node,
                                   Set<Operator> seen,
                                   Collection<Operator> fifo) throws FrontendException {
        if (!seen.contains(node)) {
            // We haven't seen this one before.
            Collection<Operator> preds = Utils.mergeCollection(plan.getPredecessors(node), plan.getSoftLinkPredecessors(node));
            if (preds != null && preds.size() > 0) {
                // Do all our predecessors before ourself
                for (Operator op : preds) {
                    doAllPredecessors(op, seen, fifo);
                }
            }
            // Now do ourself
            seen.add(node);
            fifo.add(node);
        }
    }

DepthFirstWalk

DepthFirstWalker是深度优先遍历(由上而下的深度优先)

@Override
public PlanWalker spawnChildWalker(OperatorPlan plan) {
return new DepthFirstWalker(plan);
}

walk()方法通过plan.getSources()得到所有的root节点,然后遍历他们,遍历的时候获取他们的所有后辈,递归遍历。

递归过程如下:

    private void depthFirst(Operator node,
                            Collection<Operator> successors,
                            Set<Operator> seen,
                            PlanVisitor visitor) throws FrontendException {
        if (successors == null) return;

        for (Operator suc : successors) {
            if (seen.add(suc)) {
                suc.accept(visitor);
                Collection<Operator> newSuccessors = Utils.mergeCollection(plan.getSuccessors(suc), plan.getSoftLinkSuccessors(suc));
                depthFirst(suc, newSuccessors, seen, visitor);
            }
        }
    }

PreOrderDepthFirstWalker

PreOrderDepthFirstWalker即前序深度优先(由下而上的深度优先)

子Walker是深度优先

public PlanWalker spawnChildWalker(OperatorPlan plan) {
return new DepthFirstWalker(plan);
}

walk()方法是通过plan.getSinks()得到所有leave节点,然后遍历每个leave节点,获得他的前辈,并递归进行深度优先(向上)遍历。

递归操作如下:

    private void depthFirst(Operator node, Collection<Operator> predecessors, Set<Operator> seen,
            PlanVisitor visitor) throws FrontendException {
        if (predecessors == null)
            return;

        boolean thisBranchFlag = branchFlag;
        for (Operator pred : predecessors) {
            if (seen.add(pred)) {
                branchFlag = thisBranchFlag;
                pred.accept(visitor);
                Collection<Operator> newPredecessors = Utils.mergeCollection(plan.getPredecessors(pred), plan.getSoftLinkPredecessors(pred));
                depthFirst(pred, newPredecessors, seen, visitor);
            }
        }
    }

ReserveDependencyOrderWalker

ReserveDependencyOrderWalker是逆向的依赖顺序遍历,即一个节点访问之后才能访问它依赖的节点,即N节点要想被访问,需要依赖N节点的节点先被访问。

@Override
public PlanWalker spawnChildWalker(OperatorPlan plan) {
return new ReverseDependencyOrderWalker(plan);
}

walk()方法的访问模式类似DependencyOrderWalker,区别在于先获得所有的root节点,然后进行遍历操作,遍历root节点的所有后辈,递归后辈的后辈,使root节点最后访问。

递归如下:

    protected void doAllSuccessors(Operator node,
                                   Set<Operator> seen,
                                   Collection<Operator> fifo) throws FrontendException {
        if (!seen.contains(node)) {
            // We haven't seen this one before.
            Collection<Operator> succs = Utils.mergeCollection(plan.getSuccessors(node), plan.getSoftLinkSuccessors(node));
            if (succs != null && succs.size() > 0) {
                // Do all our successors before ourself
                for (Operator op : succs) {
                    doAllSuccessors(op, seen, fifo);
                }
            }
            // Now do ourself
            seen.add(node);
            fifo.add(node);
        }
    }

ReverseDependencyOrderWalkerWOSeenChk

ReverseDependencyOrderWalkerWOSeenChk也是逆向的依赖顺序遍历,同ReserveDependencyOrderWalker一样。

 

子Walker是ReserveDependencyOrderWalker

@Override
public PlanWalker spawnChildWalker(OperatorPlan plan) {
return new ReverseDependencyOrderWalker(plan);
}

walk()方法和ReserveDependencyOrderWalker的区别在于,每次遍历的时候不记录一个seen的Set<Operator>集。

全文完 :)

时间: 2024-08-02 04:21:33

Pig源码分析: 逻辑执行计划模块的相关文章

Pig源码分析: 逻辑执行计划优化

Whole View 本文分析的是逻辑执行计划优化的代码结构,具体每种Rule的实现不做分析. 看本文之前最好参考之前那篇逻辑执行计划模型的文章. Architecture 几个关键类/接口的关系: 每个关键类/接口的实现和继承结构在下面各节展开. Optimizer PlanOptimizer是抽象类,主要和Rule.PlanTransformListener.OperatorPlan打交道. public abstract class PlanOptimizer { protected Li

Pig源码分析: 简析执行计划的生成

摘要 本文通过跟代码的方式,分析从输入一批Pig-latin到输出物理执行计划(与launcher引擎有关,一般是MR执行计划,也可以是Spark RDD的执行算子)的整体流程. 不会具体涉及AST如何解析.如何使用了Anltr.逻辑执行计划如何映射.逻辑执行计划如何优化.MR执行计划如何切分为MR Job,而是从输入一批Pig DSL到待执行的真正执行计划的关键变化步骤(方法和类). 执行计划完整解析 入口处书Main类的main函数 /** * The Main-Class for the

Hadoop2源码分析-HDFS核心模块分析

1.概述 这篇博客接着<Hadoop2源码分析-RPC机制初识> 来讲述,前面我们对MapReduce.序列化.RPC进行了分析和探索,对Hadoop V2的这些模块都有了大致的了解,通过对这些模块的研究,我们明白了MapReduce的运行流程以及内部的实现机制,Hadoop的序列化以及它的通信 机制(RPC).今天我们来研究另一个核心的模块,那就是Hadoop的分布式文件存储系统--HDFS,下面是今天分享的内容目录: HDFS简述 NameNode DataNode 接下来,我们开始今天的

Django源码分析之执行入口

魔法门 一般我们启动django,最简单的方法是进入project 目录,这时目录结构是这样的 然后我们执行python manage.py runserver,程序就开始执行了. 那django是如何从一个命令就启动整个server,启动的流程是如何的? 踏门而入 打开目录下的manage.py,内容是这样的: #!/usr/bin/env python import os import sys if __name__ == "__main__": os.environ.setdef

Spring源码分析——JdbcTemplate执行批量insert操作

最近用到一个方法: @Override public int[] batchUpdate(String sql, final BatchPreparedStatementSetter pss) throws DataAccessException { if (logger.isDebugEnabled()) { logger.debug("Executing SQL batch update [" + sql + "]"); } return execute(sql

Nginx源码分析-Epoll模块

Linux平台上,Nginx使用epoll完成事件驱动,实现高并发:本文将不对epoll本身进行介绍(网上一堆一堆的文章介绍epoll的原理及使用方法,甚至源码分析等),仅看一下Nginx是如何使用epoll的. Nginx在epoll模块中定义了好几个函数,这些函数基本都是作为回调注册到事件抽象层的对应接口上,从而实现了事件驱动的具体化,我们看如下的一段代码: ngx_event_module_t ngx_epoll_module_ctx = { &epoll_name, ngx_epoll_

Spark SQL Columnar模块源码分析

概述 本文介绍Spark SQL增加的Columnar模块代码实现. 首先介绍Columnar内的代码结构和实现,然后介绍在SqlContext里的使用方式. Columnar InMemoryColumnarTableScan 实现 InMemoryColumnarTableScan类是SparkPlan LeafNode的实现,即是一个物理执行计划. private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attri

Appium Android Bootstrap源码分析之命令解析执行

通过上一篇文章<Appium Android Bootstrap源码分析之控件AndroidElement>我们知道了Appium从pc端发送过来的命令如果是控件相关的话,最终目标控件在bootstrap中是以AndroidElement对象的方式呈现出来的,并且该控件对象会在AndroidElementHash维护的控件哈希表中保存起来.但是appium触发一个命令除了需要提供是否与控件相关这个信息外,还需要其他的一些信息,比如,这个是什么命令?这个就是我们这篇文章需要讨论的话题了. 下面我

Spark 源码分析 -- task实际执行过程

Spark源码分析 – SparkContext 中的例子, 只分析到sc.runJob 那么最终是怎么执行的? 通过DAGScheduler切分成Stage, 封装成taskset, 提交给TaskScheduler, 然后等待调度, 最终到Executor上执行 val sc = new SparkContext(--) val textFile = sc.textFile("README.md") textFile.filter(line => line.contains(