一个golang并行库源码解析

场景

有这样一种场景:四个任务A、B、C, D,其中任务B和C需要并发执行,得到结果1, 任务A执行得到结果2, 结果1和2作为任务D的参数传入,然后执行任务D得到最终结果。我们可以将任务执行顺序用如下图标识:

jobA  jobB   jobC
 \      \     /
  \      \   /
   \      middle
    \      /
     \    /
     jobD

这是一个典型的多任务并发场景,实际上随着任务数量的增多,任务逻辑会更加复杂,如何编写可维护健壮的逻辑代码变得十分重要,虽然golang提供了同步机制,但是需要写很多重复无用的Add/Wait/Done代码,而且代码可读性也很差,这是不能容忍的。

本文介绍一个开源的golang并行库,源码地址https://github.com/buptmiao/parallel

数据结构

1. parallel结构体

type Parallel struct {
        wg        *sync.WaitGroup
        pipes     []*Pipeline
        wgChild   *sync.WaitGroup
        children  []*Parallel
        exception *Handler
}

parallel定义了一个多任务并发实例,主要包括:并发任务管道(pipes)、子任务并发实例(children)、子任务实例等待锁(wgChild)、当前并发任务实例等待锁(wg)

2. pipeline结构体

type Pipeline struct {
        handlers []*Handler
}
type Handler struct {
        f    interface{}
        args []interface{}
        receivers []interface{}
}

这里pipeline实际上是一系列并发任务实例handler,每一个handler包括任务函数f, 传入参数args以及返回结果receivers

parallel相关代码

新建parallel实例

func NewParallel() *Parallel {
        res := new(Parallel)
        res.wg = new(sync.WaitGroup)
        res.wgChild = new(sync.WaitGroup)
        res.pipes = make([]*Pipeline, 0, 10)
        return res
}

注册handler

func (p *Parallel) Register(f interface{}, args ...interface{}) *Handler {
        return p.NewPipeline().Register(f, args...)
}
func (p *Parallel) NewPipeline() *Pipeline {
        pipe := NewPipeline()
        p.Add(pipe)
        return pipe
}
func (p *Parallel) Add(pipes ...*Pipeline) *Parallel {
        p.wg.Add(len(pipes))
        p.pipes = append(p.pipes, pipes...)
        return p
}

新建子parallel实例

func (p *Parallel) NewChild() *Parallel {
        child := NewParallel()
        child.exception = p.exception
        p.AddChildren(child)
        return child
}
func (p *Parallel) AddChildren(children ...*Parallel) *Parallel {
        p.wgChild.Add(len(children))
        p.children = append(p.children, children...)
        return p
}

任务运行

func (p *Parallel) Run() {
        for _, child := range p.children {
                // this func will never panic
                go func(ch *Parallel) {
                        ch.Run()
                        p.wgChild.Done()
                }(child)
        }
        p.wgChild.Wait() //wait children instance done
        p.do() //run
        p.wg.Wait() //wait all job done
}
func (p *Parallel) do() {
        for _, pipe := range p.pipes {
                go p.Do()
        }
}

pipeline相关代码

新建pipeline实例

func NewPipeline() *Pipeline {
        res := new(Pipeline)
        return res
}

注册handler

func (p *Pipeline) Register(f interface{}, args ...interface{}) *Handler {
        h := NewHandler(f, args...)
        p.Add(h)
        return h
}

添加handler

func (p *Pipeline) Add(hs ...*Handler) *Pipeline {
        p.handlers = append(p.handlers, hs...)
        return p
}

任务运行

func (p *Pipeline) Do() {
        for _, h := range p.handlers {
                h.Do()
        }
}

handler相关代码

新建handler实例

func NewHandler(f interface{}, args ...interface{}) *Handler {
        res := new(Handler)
        res.f = f
        res.args = args
        return res
}

运行任务

func (h *Handler) Do() {
        f := reflect.ValueOf(h.f)
        typ := f.Type()
        //check if f is a function
        if typ.Kind() != reflect.Func {
                panic(ErrArgNotFunction)
        }
        //check input length, only check '>' is to allow varargs.
        if typ.NumIn() > len(h.args) {
                panic(ErrInArgLenNotMatch)
        }
        //check output length
        if typ.NumOut() != len(h.receivers) {
                panic(ErrOutArgLenNotMatch)
        }
        //check if output args is ptr
        for _, v := range h.receivers {
                t := reflect.ValueOf(v)
                if t.Type().Kind() != reflect.Ptr {
                        panic(ErrRecvArgTypeNotPtr)
                }
                if t.IsNil() {
                        panic(ErrRecvArgNil)
                }
        }

        inputs := make([]reflect.Value, len(h.args))
        for i := 0; i < len(h.args); i++ {
                if h.args[i] == nil {
                        inputs[i] = reflect.Zero(f.Type().In(i))
                } else {
                        inputs[i] = reflect.ValueOf(h.args[i])
                }
        }
        out := f.Call(inputs)

        for i := 0; i < len(h.receivers); i++ {
                v := reflect.ValueOf(h.receivers[i])
                v.Elem().Set(out[i])
        }
}

demo

package main

import "github.com/buptmiao/parallel"

func testJobA(x, y int) int {
        return x - y
}

func testJobB(x, y int) int {
        return x + y
}

func testJobC(x, y *int, z int) float64 {
        return float64((*x)*(*y)) / float64(z)
}

func main() {
        var x, y int
        var z float64

        p := parallel.NewParallel()

        ch1 := p.NewChild()
        ch1.Register(testJobA, 1, 2).SetReceivers(&x)

        ch2 := p.NewChild()
        ch2.Register(testJobB, 1, 2).SetReceivers(&y)

        p.Register(testJobC, &x, &y, 2).SetReceivers(&z)

        p.Run()

        if x != -1 || y != 3 || z != -1.5 {
                panic("unexpected result")
        }
}
时间: 2024-09-17 04:51:12

一个golang并行库源码解析的相关文章

Fresco源码解析 - 创建一个ImagePipeline(一)

在Fresco源码解析 - 初始化过程分析章节中, 我们分析了Fresco的初始化过程,两个initialize方法中都用到了 ImagePipelineFactory类.   ImagePipelineFactory.initialize(context); 会创建一个所有参数都使用默认值的ImagePipelineConfig来初始化ImagePipeline.   ImagePipelineFactory.initialize(imagePipelineConfig)会首先用 imageP

Android 中 SwipeLayout一个展示条目底层菜单的侧滑控件源码解析_Android

由于项目上的需要侧滑条目展示收藏按钮,记得之前代码家有写过一个厉害的开源控件 AndroidSwipeLayout 本来准备直接拿来使用,但是看过 issue 发现现在有不少使用者反应有不少的 bug ,而且代码家现在貌似也不进行维护了.故自己实现了一个所要效果的一个控件.因为只是实现我需要的效果,所以大家也能看到,代码里有不少地方我是写死的.希望对大家有些帮助.而且暂时也不需要 AndroidSwipeLayout 大而全的功能,算是变相给自己做的项目精简代码了. 完整示例代码请看:GitHu

Step.js 使用教程(附源码解析)

Step.js(https://github.com/creationix/step)是控制流程工具(大小仅 150 行代码),解决回调嵌套层次过多等问题.适用于读文件.查询数据库等回调函数相互依赖,或者分别获取内容最后组合数据返回等应用情景.异步执行简单地可以分为"串行执行"和"并行"执行,下面我们分别去看看. 串行执行 这个库只有一个方法 Step(fns...).Step 方法其参数 fns 允许是多个函数,这些函数被依次执行.Step 利用 this 对象指

Java集合学习(六) Vector详细介绍(源码解析)和使用示例

学完ArrayList和LinkedList之后,我们接着学习Vector.学习方式还是和之前一样,先对Vector有个整体认识,然后再学习它的源码:最后再通过实例来学会使用它. 第1部分 Vector介绍 Vector简介 Vector 是矢量队列,它是JDK1.0版本添加的类.继承于AbstractList,实现了List, RandomAccess, Cloneable这些接口. Vector 继承了AbstractList,实现了List:所以,它是一个队列,支持相关的添加.删除.修改.

Redux入坑进阶之源码解析

预热 redux 函数内部包含了大量柯里化函数以及代码组合思想 柯里化函数(curry) 通俗的来讲,可以用一句话概括柯里化函数:返回函数的函数 // example  const funcA = (a) => {    return const funcB = (b) => {      return a + b    }  };   上述的funcA函数接收一个参数,并返回同样接收一个参数的funcB函数. 柯里化函数有什么好处呢? 避免了给一个函数传入大量的参数--我们可以通过柯里化来构

Java集合学习(十七) TreeSet详细介绍(源码解析)和使用示例

这一章,我们对TreeSet进行学习. 我们先对TreeSet有个整体认识,然后再学习它的源码,最后再通过实例来学会使用TreeSet. 第1部分 TreeSet介绍 TreeSet简介 TreeSet 是一个有序的集合,它的作用是提供有序的Set集合.它继承于AbstractSet抽象类,实现了NavigableSet<E>, Cloneable, java.io.Serializable接口. TreeSet 继承于AbstractSet,所以它是一个Set集合,具有Set的属性和方法.

Java集合学习(十六) HashSet详细介绍(源码解析)和使用示例

这一章,我们对HashSet进行学习. 我们先对HashSet有个整体认识,然后再学习它的源码,最后再通过实例来学会使用HashSet. 第1部分 HashSet介绍 HashSet 简介 HashSet 是一个没有重复元素的集合. 它是由HashMap实现的,不保证元素的顺序,而且HashSet允许使用 null 元素. HashSet是非同步的.如果多个线程同时访问一个哈希 set,而其中至少一个线程修改了该 set,那么它必须 保持外部同步.这通常是通过对自然封装该 set 的对象执行同步

Java集合学习(十三) WeakHashMap详细介绍(源码解析)和使用示例

这一章,我们对WeakHashMap进行学习. 我们先对WeakHashMap有个整体认识,然后再学习它的源码,最后再通过实例来学会使用WeakHashMap. 第1部分 WeakHashMap介绍 WeakHashMap简介    WeakHashMap 继承于AbstractMap,实现了Map接口.    和HashMap一样,WeakHashMap 也是一个散列表,它存储的内容也是键值对(key-value)映射,而且键和值都可以是null.   不过WeakHashMap的键是"弱键&

Java集合学习(十二) TreeMap详细介绍(源码解析)和使用示例

这一章,我们对TreeMap进行学习. 第1部分 TreeMap介绍 TreeMap 简介 TreeMap 是一个有序的key-value集合,它是通过红黑树实现的. TreeMap继承于AbstractMap,所以它是一个Map,即一个key-value集合. TreeMap 实现了NavigableMap接口,意味着它支持一系列的导航方法.比如返回有序的key集合. TreeMap 实现了Cloneable接口,意味着它能被克隆. TreeMap 实现了java.io.Serializabl