泛函编程(35)-泛函Stream IO:IO处理过程-IO Process

    IO处理可以说是计算机技术的核心。不是吗?使用计算机的目的就是希望它对输入数据进行运算后向我们输出计算结果。所谓Stream IO简单来说就是对一串按序相同类型的输入数据进行处理后输出计算结果。输入数据源可能是一串键盘字符、鼠标位置坐标、文件字符行、数据库纪录等。如何实现泛函模式的Stream IO处理则是泛函编程不可或缺的技术。

首先,我们先看一段较熟悉的IO程序:

1 import java.io._
 2 def linesGt4k(fileName: String): IO[Boolean] = IO {
 3     val src = io.Source.fromFile(fileName)
 4     try {
 5       var count = 0
 6       val lines: Iterator[String] = src.getLines
 7       while (count <= 4000 && lines.hasNext) {
 8          lines.next
 9        count += 1
10       }
11       count > 4000
12     } finally src.close
13 }                                                 //> linesGt4k: (fileName: String)fpinscala.iomonad.IO[Boolean]

以上例子里有几项是值得提倡的:使用完文件后及时关闭,防止资源流露、没有一次性将整个文件载入内存而是逐行读取文件内容,节省内存资源。虽然整个过程是包嵌在IO类型内,但操作代码直接产生副作用。很明显,起码IO处理过程是由非纯代码组成的,无法实现函数组合,既是无法实现泛函编程的通过重复使用组件灵活组合功能的特点了。可以相像,我们在泛函Stream IO编程中将会通过许多细小组件的各式组合来实现多样性的IO计算功能。

实际上我们想使用以下模式的表达式:

 1 object examples {
 2 //假设我们已经获取了这个Stream[String]
 3     val lines: Stream[String] = sys.error("defined elsewhere!")
 4 //无论40k或者其它数量都很容易得取。只要换个数字就行了
 5     val lgt40k = lines.zipWithIndex.exists(_._2 + 1 >= 40000)
 6 //把空行过滤掉
 7     val lgt40k2 = lines.filter(! _.trim.isEmpty).zipWithIndex.exists(_._2 + 1 >= 40000)
 8 //在40k行内检查是否存在连续11行第一个字母组合为abracadabra
 9     val lgt40k3 = lines.take(40000).map(_.head).indexOfSlice("abracadabra".toList)
10 }

以上代码充分显示了我们所追求的泛函编程模式:简洁、灵活、优雅。

不过,这个Stream[String]就不是表面上那么容易得到的了。我们先把它放一放。

我们现在可以先分析一下泛函Stream IO编程原理。泛函编程的精髓就是把一个程序分解成许多纯代码组件,然后通过各种搭配组合来实现程序整体功能。那么对于Stream IO计算我们希望能先实现那些纯代码的基本组件然后把它们组合起来。我们可以把Stream IO处理过程想象成连成一串的电视信号处理盒子:每个盒子具备一项信号转换或者过滤功能,我们将一个盒子的输出端子接入另一个盒子的输入端子使信号可以经过一连串的处理过程最终输出我们要求的信号。我们可以用一个IO处理过程代表一个信号转换盒子。它的款式是这样的;Process[I,O]。最终的IO程序就是一连串Process[I,O]。当然,第一个Process[I,O]的输入端必须连接一个Stream,而最后一个则接在一个实体的设备。我们先不管这两头,先从Process[I,O]的功能着手,使其能够连成一串并把输入类型I转变成输出类型O。

Process[I,O]的类型款式如下:

1 trait Process[I,O]{}
2 case class Halt[I,O]() extends Process[I,O]
3 case class Emit[I,O](head: O, tail: Process[I,O] = Halt[I,O]()) extends Process[I,O]
4 case class Await[I,O](rcvfn: Option[I] => Process[I,O]) extends Process[I,O] 

每个Process[I,O]都可能处于三种状态之一:

1、Halt() 停止处理IO,退出。

2、Emit(head: O,tail: Process[I,O] = Halt[I,O]()) 输出类型O元素head,进入下一状态tail,默认输出head后完成退出。

3、Await(rcvfn: Option[I] => Process[I,O]) 等待一个类型I元素输入,处理IO,返回Process类型结果

可以看出,Await状态代表了某个Process的功能。Emit只是输出该Process对IO处理的结果。

注意:虽然Process[I,O]的功能是把Stream[I]转变成Stream[O],但它绝不是Stream[I] => Stream[O]类型的函数,而是在以上三种状态中游走的状态机器(State Machine)。

以下代码例子可以作为示范:

 1 trait Process[I,O] {
 2  def apply(sin: Stream[I]): Stream[O] = this match {
 3      case Halt() => Stream()   //返回空的Stream
 4      case Emit(out,next) => out #:: next(sin)  //先输出out,跟着处理剔除out的Stream[I]输入
 5      case Await(iproc) => sin match {
 6          case h #:: t_stream => iproc(Some(h))(t_stream)  //如果sin不为空,接受输入首元素后返回状态为处理剔除首元素的Stream[I]输入
 7          case xs => iproc(None)(xs)  //如果sin为空则返回处理空输入状态
 8      }
 9  }
10 }

按照讨论题目,以上例子中Stream[I]被转变成Stream[O],而实现方式则是按照具体状态来确定输出。

为了实现函数组合(functional composition),我们必须想办法把两个Process像接水管一样连接起来:一头的输出是另一头的输入(function fusion):

 1  def |>[O2](p2: Process[O,O2]): Process[I,O2] =  //p2的输入类型是this的输出O,最终输出为p2的输出O2
 2    p2 match {
 3        case Halt() => Halt()  //下面的动作停了,整个管道都停了
 4        case Emit(out,next) => Emit(out, this |> next) //如果正在输出就先输出然后再连接剩下的数据
 5        case Await(iproc) => this match {  //如果下游正在等待输入元素,那么就要看上游是什么情况了
 6            case Halt() => Halt()  //如果上游停顿那么整个管道都停
 7            case Emit(out,next) => next |> iproc(Some(out)) //上游正在输出,下游收到后进入新状态
 8            case Await(rcvfn) => Await((oi: Option[I]) => rcvfn(oi) |> p2) //假如上游收到输入元素,立即转入新状态再继续连接
 9        }
10    }

以上程序并不难理解。现在我们可以这样编写IO处理语句:proc1 |> proc2 |> proc3。

另外,可以把两个Process的处理过程连接起来:一个Process处理完后接着处理另一个Process:

1   def ++(p2: Process[I,O]): Process[I,O] = //完成了this后接着再运算p2
2     this match {
3         case Halt() => p2  //上一个Process完成后接着运算p2
4         case Emit(out,next) => Emit(out, next ++ p2)  //等上游完成所有输出后再运算p2
5         case Await(iproc) => Await(iproc andThen (_ ++ p2)) //等上游处理完输入后再运算p2
6     }

最基本的一些组件map,flatMap:

 1  def map[O2](f: O => O2): Process[I,O2] = //map Process的输出O
 2     this match {
 3         case Halt() => Halt()  //没什么可以map的
 4         case Emit(out,next) => Emit(f(out),next map f) //先map输入元素,再处理剩下的
 5         case Await(iproc) => Await(iproc andThen (_ map f)) //处理完输入元素后再进行map
 6     }
 7   def flatMap[O2](f: O => Process[I,O2]): Process[I,O2] = //只处理输出端O
 8     this match {
 9         case Halt() => Halt()
10         case Emit(out,next) => f(out) ++ next.flatMap(f) //先处理头元素再flatMap剩下的
11         case Await(iproc) => Await(iproc andThen (_ flatMap f)) //处理完输入后再flatMap剩下的
12     }

我们再试试把一串元素喂入Process:

 1   def feed(ins: Seq[I]): Process[I,O] = {
 2     @annotation.tailrec
 3       def go(ins: Seq[I], curProcess: Process[I,O]): Process[I,O] = //尾递归算法
 4         curProcess match {
 5             case Halt() => Halt()
 6             case Emit(out,next) => Emit(out, next.feed(ins)) //正在输出。就等完成输出后再喂剩下的
 7             case Await(iproc) => {
 8                 if (ins.isEmpty) curProcess  //完成了输入元素串,可以返回结果了
 9                 else go(ins.tail,iproc(Some(ins.head))) //吃下首元素然后再继续
10             }
11         }
12         go(ins,this)
13   }

有时我们希望能重复一些简单的过程:

 1   def repeat: Process[I,O] = { //永远重复下去
 2       def go(p: Process[I,O]): Process[I,O] = //p代表当前更新状态
 3        p match {
 4            case Halt() => go(this) //不要停,重新再来
 5            case Emit(out,next) => Emit(out, go(next)) //完成输出后继续go
 6            case Await(iproc) => Await { //注意{}里是partialfunction。iproc是个函数,而partialfunction是function的子类,因而可以这样写
 7                case None => iproc(None)  //没有输入元素,继续等
 8                case Some(i) => go(iproc(Some(i))) //处理输入元素后转入新状态然后继续
 9            }
10        }
11        go(this)
12   }
13   def repeatN(n: Int): Process[I,O] = { //重复n次
14     def go(n: Int, curProcess: Process[I,O]): Process[I,O] =
15        curProcess match {
16            case Halt() => if (n <= 0) Halt()  //n次后真停
17                           else go(n-1, curProcess) //算一次重复
18            case Emit(out,next) => Emit(out, go(n,next))  //虽然状态更新了,但未完成流程。还不算一次重复
19            case Await(iproc) => Await {
20                case None => iproc(None)  //继续等
21                case Some(i) => go(n,iproc(Some(i))) //更新了状态,但未完成流程,不算一次重复
22            }
23        }
24      go(n,this)
25   }

注意我们在以上代码中使用了PartialFunction来分解输入参数值。如果我们有个Function : intFunction(i: Int): String

我们可以定义它的PartialFunction:

{ case 0: "Zero"

   case 10: "Ten" }

由于Await(iproc)中的iproc >>> Option[I] => Process[I,O], PartialFunction是Function的子类所以我们可以写成:

Await {

 case None => ???

 case Some(i) => ???

}

下面是一组Process的基本方法和组件:

 1 object Process {
 2   case class Halt[I,O]() extends Process[I,O]
 3   case class Emit[I,O](head: O, tail: Process[I,O] = Halt[I,O]()) extends Process[I,O]
 4   case class Await[I,O](rcvfn: Option[I] => Process[I,O]) extends Process[I,O]
 5
 6   def emit[I,O](out: O, next: Process[I,O] = Halt[I,O]()) = Emit(out, next)
 7   def await[I,O](iproc: I => Process[I,O], fallback: Process[I,O] = Halt[I,O]): Process[I,O] =
 8     Await {
 9         case Some(i) => iproc(i)     //使用基本类型I
10         case None => fallback        //定义了没有输入元素时应该怎么处理
11     }
12 }

 我们可以把任何 I => O类型的函数升格成Process[I,O]:

 1   def liftOnce[I,O](f: I => O): Process[I,O] =  //给我一个I=>O,我返回Process[I,O]
 2      Await {
 3           case Some(i) => emit(f(i))    //等到一个输入元素I。把它升成一个状态为输出的Process
 4           case None => Halt()
 5      }
 6   def repeatLift[I,O](f: I => O): Process[I,O] = liftOnce(f).repeat
 7   def lift[I,O](f: I => O): Process[I,O] = //不同实现方式的repeatLift
 8     Await {
 9         case Some(i) => emit(f(i), lift(f))
10         case None => Halt()
11     }

还有些组件可以对输入元素进行过滤的:

1  def filter[I](f: I => Boolean): Process[I,I] = //对输入I进行过滤,不转变I, 所以结果是: Process[I,I]
 2     Await[I,I] {  //用PartialFunction来分解两种输入参数值面对的情况
 3       case None => Halt[I,I]()  //没有输入,停止
 4       case Some(i) if(f(i)) => Emit[I,I](i)
 5     }.repeat   //重复过滤所有输入元素
 6    def take[I](n: Int): Process[I,I] =  //可以中途退出
 7      if (n <= 0) Halt[I,I]()
 8      else Await[I,I] {      //进行输入、输出这种IO操作
 9          case None => Halt[I,I]()  //没有输入就完成退出
10          case Some(i) => Emit[I,I](i,take[I](n-1)) //输出通过过滤的,继续过滤剩下的输入元素
11      }
12    def takeWhile[I](f: I => Boolean): Process[I,I] = //可以中途退出
13      Await[I,I] {
14          case None => Halt[I,I]()  //没有输入就完成退出
15          case Some(i) if(f(i)) => Emit[I,I](i, takeWhile[I](f))
16      }
17    def sendAsIs[I]: Process[I,I] = lift(identity)  //直接输出任何输入元素
18    def drop[I](n: Int): Process[I,I] = //必须浏览所有输入元素。不可中途退出
19      if (n <= 0) sendAsIs[I]
20      else Await[I,I](i => drop[I](n-1))  //收取输入元素,直接扔掉,继续n-1循环
21    def dropWhile[I](f: I => Boolean): Process[I,I] =  //必须浏览所有输入元素。不可中途退出
22      await(i => if (f(i)) dropWhile[I](f)   //注意用await, 不是Await
23                     else emit(i, sendAsIs[I]))  //输出这个元素后继续循环输入元素    

注意以上代码中的处理方式:如果过滤通过才emit,原封不动直接传递输入元素 I => I 用lift(identity)产生Process[I,I],用PartialFunction:

 Await {

  case None => ???

  case Some(i) => 

}

来分别处理可能出现的输入参数值。

我们先尝试些简单的算法:

 1    def count[I]: Process[I,Int] =  //读取输入元素次数
 2    //读入任何东西都转成数字1.0 |> 读一个加一个 |> 读入一个就转成一个Int
 3     lift((i: I) => 1.0 ) |> sum |> lift(_.toInt)  //每一个输入元素都会走完整个管道
 4    def count2[I]: Process[I,Int] = { //递归实现方式
 5        def go(c: Int): Process[I,Int] =
 6          await((i: I) => emit(c+1, go(c+1)))
 7        go(0)
 8    }
 9    def mean: Process[Double,Double] = {
10        def go(s: Double, c: Double): Process[Double,Double] =
11          await((d: Double) => emit((s+d)/(c+1), go(s+d,c+1)))
12        go(0.0,0.0)
13    }
14    //以上的内部函数go都体现了一些共同点:有一个起始值,然后维护状态。我们可以分解出一个新的函数
15    def loop[S,I,O](z: S)(f: (I,S) => (O,S)): Process[I,O] =
16      await((i: I) => f(i,z) match {
17          case (o,s2) => emit(o, loop(s2)(f))
18      })
19    //用loop来实现上面的函数
20    def sum2: Process[Double,Double] =
21      loop(0.0)((i:Double,s) => (s+i,s+1))
22    def count3[I]: Process[I,Int] =
23      loop(0)((_: I, c) => (c+1, c+1))

再写一些逻辑小组件:

1    def any: Process[Boolean, Boolean] =  //检查是否收到过true值。即使收到true还是会继续收取输入直至完成读取
2      loop(false)((b: Boolean, s) => ( b || s, b || s))
3    def exists[I](f: I => Boolean): Process[I,Boolean] = //不能中途退出
4      lift(f) |> any  //重复检查输入然后确定是否true. 一旦遇到true永远返回true
5    def echo[I]: Process[I,I] = await(i => emit(i))
6    def skip[I,O]: Process[I,O] = await(i => Halt())
7    def ignore[I,O]: Process[I,O] = skip.repeat

也可以过滤输出元素:

1  def filter(f: O => Boolean): Process[I,O] = //过滤输出元素
2     this |> Process.filter(f)  //this的输出接到下一个Process的输入端然后过滤它的输入元素

zip两个Process:

 1    def feedOne[I,O](oi: Option[I])(p: Process[I,O]): Process[I,O] = //把一个元素输入p
 2      p match {
 3        case Halt() => p  //无法输入,它还是它
 4        case Emit(out,next) => Emit(out, feedOne(oi)(next)) //正在输出。输出完当前元素再开始喂入
 5        case Await(iproc) => iproc(oi)  //直接喂入
 6      }
 7
 8    def zip[I,O,O2](p1: Process[I,O], p2: Process[I,O2]): Process[I,(O,O2)] = //同一串输入元素同时喂入p1,p2。合并输出2tuple
 9     (p1,p2) match {
10         case (Halt(), _) => Halt()
11         case (_, Halt()) => Halt()
12         case (Emit(h1,t1), Emit(h2,t2)) => Emit((h1,h2), zip(t1,t2))
13         case (Await(iproc), _) => Await((oi: Option[I]) => zip(iproc(oi), feedOne(oi)(p2)))
14         case (_, Await(iproc)) => Await((oi: Option[I]) => zip(feedOne(oi)(p1), iproc(oi)))
15     }
16    val mean2 = zip[Double,Double,Int](sum,count) |> lift {case (s,c) => s/c}

还有那个熟悉的zipWithIndex:

1    def zip[O2](p2: Process[I,O2]): Process[I,(O,O2)] =
2      Process.zip(this,p2)
3    def zipWithIndex: Process[I,(O,Int)] =
4      this zip (count map {_ + 1})  //zip从0开始

现在我们肯定可以使用这样的表达式:

count |> exists {_ > 40000}。

当然我们还没有开始讨论这个管道两头的数据源。因为我们要分开独立讨论它。

下面是以上示范代码汇总:

 1 trait Process[I,O] {
  2 import Process._
  3  def apply(sin: Stream[I]): Stream[O] = this match {
  4      case Halt() => Stream()   //返回空的Stream
  5      case Emit(out,next) => out #:: next(sin)  //先输出out,跟着处理剔除out的Stream[I]输入
  6      case Await(iproc) => sin match {
  7          case h #:: t_stream => iproc(Some(h))(t_stream)  //如果sin不为空,接受输入首元素后返回状态为处理剔除首元素的Stream[I]输入
  8          case xs => iproc(None)(xs)  //如果sin为空则返回处理空输入状态
  9      }
 10  }
 11  def |>[O2](p2: Process[O,O2]): Process[I,O2] =  //p2的输入类型是this的输出O,最终输出为p2的输出O2
 12    p2 match {
 13        case Halt() => Halt()  //下面的动作停了,整个管道都停了
 14        case Emit(out,next) => Emit(out, this |> next) //如果正在输出就先输出然后再连接剩下的数据
 15        case Await(iproc) => this match {  //如果下游正在等待输入元素,那么就要看上游是什么情况了
 16            case Halt() => Halt()  //如果上游停顿那么整个管道都停
 17            case Emit(out,next) => next |> iproc(Some(out)) //上游正在输出,下游收到后进入新状态
 18            case Await(rcvfn) => Await((oi: Option[I]) => rcvfn(oi) |> p2) //假如上游收到输入元素,立即转入新状态再继续连接
 19        }
 20    }
 21   def ++(p2: Process[I,O]): Process[I,O] = //完成了this后接着再运算p2
 22     this match {
 23         case Halt() => p2  //上一个Process完成后接着运算p2
 24         case Emit(out,next) => Emit(out, next ++ p2)  //等上游完成所有输出后再运算p2
 25         case Await(iproc) => Await(iproc andThen (_ ++ p2)) //等上游处理完输入后再运算p2
 26     }
 27   def map[O2](f: O => O2): Process[I,O2] = //map Process的输出O
 28     this match {
 29         case Halt() => Halt()  //没什么可以map的
 30         case Emit(out,next) => Emit(f(out),next map f) //先map输入元素,再处理剩下的
 31         case Await(iproc) => Await(iproc andThen (_ map f)) //处理完输入元素后再进行map
 32     }
 33   def flatMap[O2](f: O => Process[I,O2]): Process[I,O2] = //只处理输出端O
 34     this match {
 35         case Halt() => Halt()
 36         case Emit(out,next) => f(out) ++ next.flatMap(f) //先处理头元素再flatMap剩下的
 37         case Await(iproc) => Await(iproc andThen (_ flatMap f)) //处理完输入后再flatMap剩下的
 38     }
 39   def feed(ins: Seq[I]): Process[I,O] = {
 40     @annotation.tailrec
 41       def go(ins: Seq[I], curProcess: Process[I,O]): Process[I,O] = //尾递归算法
 42         curProcess match {
 43             case Halt() => Halt()
 44             case Emit(out,next) => Emit(out, next.feed(ins)) //正在输出。就等完成输出后再喂剩下的
 45             case Await(iproc) => {
 46                 if (ins.isEmpty) curProcess  //完成了输入元素串,可以返回结果了
 47                 else go(ins.tail,iproc(Some(ins.head))) //吃下首元素然后再继续
 48             }
 49         }
 50         go(ins,this)
 51   }
 52   def repeat: Process[I,O] = { //永远重复下去
 53       def go(p: Process[I,O]): Process[I,O] = //p代表当前更新状态
 54        p match {
 55            case Halt() => go(this) //不要停,重新再来
 56            case Emit(out,next) => Emit(out, go(next)) //完成输出后继续go
 57            case Await(iproc) => Await { //注意{}里是partialfunction。iproc是个函数,而partialfunction是function的子类,因而可以这样写
 58                case None => iproc(None)  //没有输入元素,继续等
 59                case Some(i) => go(iproc(Some(i))) //处理输入元素后转入新状态然后继续
 60            }
 61        }
 62        go(this)
 63   }
 64   def repeatN(n: Int): Process[I,O] = { //重复n次
 65     def go(n: Int, curProcess: Process[I,O]): Process[I,O] =
 66        curProcess match {
 67            case Halt() => if (n <= 0) Halt()  //n次后真停
 68                           else go(n-1, curProcess) //算一次重复
 69            case Emit(out,next) => Emit(out, go(n,next))  //虽然状态更新了,但未完成流程。还不算一次重复
 70            case Await(iproc) => Await {
 71                case None => iproc(None)  //继续等
 72                case Some(i) => go(n,iproc(Some(i))) //更新了状态,但未完成流程,不算一次重复
 73            }
 74        }
 75      go(n,this)
 76   }
 77   def filter(f: O => Boolean): Process[I,O] = //过滤输出元素
 78     this |> Process.filter(f)  //this的输出接到下一个Process的输入端然后过滤它的输入元素
 79   def orElse(p: Process[I,O]): Process[I,O] =
 80     this match {
 81         case Halt() => p
 82         case Await(iproc) => Await {
 83             case None => p
 84             case x => iproc(x)
 85         }
 86         case _ => this
 87     }
 88    def zip[O2](p2: Process[I,O2]): Process[I,(O,O2)] =
 89      Process.zip(this,p2)
 90    def zipWithIndex: Process[I,(O,Int)] =
 91      this zip (count map {_ + 1})  //zip从0开始
 92 }
 93 object Process {
 94   case class Halt[I,O]() extends Process[I,O]
 95   case class Emit[I,O](head: O, tail: Process[I,O] = Halt[I,O]()) extends Process[I,O]
 96   case class Await[I,O](rcvfn: Option[I] => Process[I,O]) extends Process[I,O]
 97
 98   def emit[I,O](out: O, next: Process[I,O] = Halt[I,O]()) = Emit(out, next)
 99   def await[I,O](iproc: I => Process[I,O], fallback: Process[I,O] = Halt[I,O]): Process[I,O] =
100     Await {
101         case Some(i) => iproc(i)     //使用基本类型I
102         case None => fallback        //定义了没有输入元素时应该怎么处理
103     }
104   def liftOnce[I,O](f: I => O): Process[I,O] =  //给我一个I=>O,我返回Process[I,O]
105      Await {
106           case Some(i) => emit(f(i))    //等到一个输入元素I。把它升成一个状态为输出的Process
107           case None => Halt()
108      }
109   def repeatLift[I,O](f: I => O): Process[I,O] = liftOnce(f).repeat
110   def lift[I,O](f: I => O): Process[I,O] = //不同实现方式的repeatLift
111     Await {
112         case Some(i) => emit(f(i), lift(f))
113         case None => Halt()
114     }
115   def filter[I](f: I => Boolean): Process[I,I] = //对输入I进行过滤,不转变I, 所以结果是: Process[I,I]
116     Await[I,I] {  //用PartialFunction来分解两种输入参数值面对的情况
117       case None => Halt[I,I]()  //没有输入,停止
118       case Some(i) if(f(i)) => Emit[I,I](i)
119     }.repeat   //重复过滤所有输入元素
120    def take[I](n: Int): Process[I,I] =  //可以中途退出
121      if (n <= 0) Halt[I,I]()
122      else Await[I,I] {      //进行输入、输出这种IO操作
123          case None => Halt[I,I]()  //没有输入就完成退出
124          case Some(i) => Emit[I,I](i,take[I](n-1)) //输出通过过滤的,继续过滤剩下的输入元素
125      }
126    def takeWhile[I](f: I => Boolean): Process[I,I] = //可以中途退出
127      Await[I,I] {
128          case None => Halt[I,I]()  //没有输入就完成退出
129          case Some(i) if(f(i)) => Emit[I,I](i, takeWhile[I](f))
130      }
131    def sendAsIs[I]: Process[I,I] = lift(identity)  //直接输出任何输入元素
132    def drop[I](n: Int): Process[I,I] = //必须浏览所有输入元素。不可中途退出
133      if (n <= 0) sendAsIs[I]
134      else Await[I,I](i => drop[I](n-1))  //收取输入元素,直接扔掉,继续n-1循环
135    def dropWhile[I](f: I => Boolean): Process[I,I] =  //必须浏览所有输入元素。不可中途退出
136      await(i => if (f(i)) dropWhile[I](f)   //注意用await, 不是Await
137                     else emit(i, sendAsIs[I]))  //输出这个元素后继续循环输入元素
138
139    def sum: Process[Double,Double] = { //读进数字,输出当前总数
140         def go(acc: Double): Process[Double,Double] =
141           await(d => emit(acc+d, go(acc+d)))
142         go(0.0)
143    }
144    def count[I]: Process[I,Int] =  //读取输入元素次数
145    //读入任何东西都转成数字1.0 |> 读一个加一个 |> 读入一个就转成一个Int
146     lift((i: I) => 1.0 ) |> sum |> lift(_.toInt)  //每一个输入元素都会走完整个管道
147    def count2[I]: Process[I,Int] = { //递归实现方式
148        def go(c: Int): Process[I,Int] =
149          await((i: I) => emit(c+1, go(c+1)))
150        go(0)
151    }
152    def mean: Process[Double,Double] = {
153        def go(s: Double, c: Double): Process[Double,Double] =
154          await((d: Double) => emit((s+d)/(c+1), go(s+d,c+1)))
155        go(0.0,0.0)
156    }
157    //以上的内部函数go都体现了一些共同点:有一个起始值,然后维护状态。我们可以分解出一个新的函数
158    def loop[S,I,O](z: S)(f: (I,S) => (O,S)): Process[I,O] =
159      await((i: I) => f(i,z) match {
160          case (o,s2) => emit(o, loop(s2)(f))
161      })
162    //用loop来实现上面的函数
163    def sum2: Process[Double,Double] =
164      loop(0.0)((i:Double,s) => (s+i,s+1))
165    def count3[I]: Process[I,Int] =
166      loop(0)((_: I, c) => (c+1, c+1))
167    def any: Process[Boolean, Boolean] =  //检查是否收到过true值。即使收到true还是会继续收取输入直至完成读取
168      loop(false)((b: Boolean, s) => ( b || s, b || s))
169    def exists[I](f: I => Boolean): Process[I,Boolean] = //不能中途退出
170      lift(f) |> any  //重复检查输入然后确定是否true. 一旦遇到true永远返回true
171    def echo[I]: Process[I,I] = await(i => emit(i))
172    def skip[I,O]: Process[I,O] = await(i => Halt())
173    def ignore[I,O]: Process[I,O] = skip.repeat
174
175    def feedOne[I,O](oi: Option[I])(p: Process[I,O]): Process[I,O] = //把一个元素输入p
176      p match {
177        case Halt() => p  //无法输入,它还是它
178        case Emit(out,next) => Emit(out, feedOne(oi)(next)) //正在输出。输出完当前元素再开始喂入
179        case Await(iproc) => iproc(oi)  //直接喂入
180      }
181
182    def zip[I,O,O2](p1: Process[I,O], p2: Process[I,O2]): Process[I,(O,O2)] = //同一串输入元素同时喂入p1,p2。合并输出2tuple
183     (p1,p2) match {
184         case (Halt(), _) => Halt()
185         case (_, Halt()) => Halt()
186         case (Emit(h1,t1), Emit(h2,t2)) => Emit((h1,h2), zip(t1,t2))
187         case (Await(iproc), _) => Await((oi: Option[I]) => zip(iproc(oi), feedOne(oi)(p2)))
188         case (_, Await(iproc)) => Await((oi: Option[I]) => zip(feedOne(oi)(p1), iproc(oi)))
189     }
190    val mean2 = zip[Double,Double,Int](sum,count) |> lift {case (s,c) => s/c}
191
192 count |> exists {_ > 40000}
193 }
时间: 2025-01-01 09:05:57

泛函编程(35)-泛函Stream IO:IO处理过程-IO Process的相关文章

泛函编程(5)-数据结构(Functional Data Structures)

  编程即是编制对数据进行运算的过程.特殊的运算必须用特定的数据结构来支持有效运算.如果没有数据结构的支持,我们就只能为每条数据申明一个内存地址了,然后使用这些地址来操作这些数据,也就是我们熟悉的申明变量再对变量进行读写这个过程了.试想想如果没有数据结构,那我们要申明多少个变量呢.所以说,数据结构是任何编程不可缺少的元素.     泛函编程使用泛函数据结构(Functional Data Structure)来支持泛函程序.泛函数据结构的特点是"不可变特性"(Immutability)

泛函编程(38)-泛函Stream IO:IO Process in action

 在前面的几节讨论里我们终于得出了一个概括又通用的IO Process类型Process[F[_],O].这个类型同时可以代表数据源(Source)和数据终端(Sink).在这节讨论里我们将针对Process[F,O]的特性通过一些应用实例来示范它的组合性(composibility)和由数据源到接收终端IO全过程的功能完整性.   我们已经在前面的讨论中对IO Process的各种函数组合进行了调研和尝试,现在我们先探讨一下数据源设计方案:为了实现资源使用的安全性和IO程序的可组合性,我们必须

泛函编程(36)-泛函Stream IO:IO数据源-IO Source &amp; Sink

 上期我们讨论了IO处理过程:Process[I,O].我们说Process就像电视信号盒子一样有输入端和输出端两头.Process之间可以用一个Process的输出端与另一个Process的输入端连接起来形成一串具备多项数据处理功能的完整IO过程.但合成的IO过程两头输入端则需要接到一个数据源,而另外一端则可能会接到一个数据接收设备如文件.显示屏等.我们在这篇简单地先介绍一下IO数据源Source和IO数据接收端Sink. 我们先用一个独立的数据类型来代表数据源Source进行简单的示范说明,

泛函编程(37)-泛函Stream IO:通用的IO处理过程-Free Process

  在上两篇讨论中我们介绍了IO Process:Process[I,O],它的工作原理.函数组合等.很容易想象,一个完整的IO程序是由 数据源+处理过程+数据终点: Source->Process->Sink所组成的.我们发现:Process[I,O]本身是无法兼顾Source和Sink的功能.而独立附加的Source和Sink又无法有效地与Process[I,O]进行函数组合(functional composition).   实际上Process[I,O]是一种固定单一输入类型(sin

泛函编程(32)-泛函IO:IO Monad

 由于泛函编程非常重视函数组合(function composition),任何带有副作用(side effect)的函数都无法实现函数组合,所以必须把包含外界影响(effectful)副作用不纯代码(impure code)函数中的纯代码部分(pure code)抽离出来形成独立的另一个纯函数.我们通过代码抽离把不纯代码逐步抽离向外推并在程序里形成一个纯代码核心(pure core).这样我们就可以顺利地在这个纯代码核心中实现函数组合.IO Monad就是泛函编程处理副作用代码的一种手段.我们

泛函编程(30)-泛函IO:Free Monad-Monad生产线

 在上节我们介绍了Trampoline.它主要是为了解决堆栈溢出(StackOverflow)错误而设计的.Trampoline类型是一种数据结构,它的设计思路是以heap换stack:对应传统递归算法运行时在堆栈上寄存程序状态,用Trampoline进行递归算法时程序状态是保存在Trampoline的数据结构里的.数据结构是在heap上的,所以可以实现以heap换stack的效果.这种以数据结构代替函数调用来解决问题的方式又为泛函编程提供了更广阔的发展空间.     我们知道,任何涉及IO的运

泛函编程(6)-数据结构-List基础

    List是一种最普通的泛函数据结构,比较直观,有良好的示范基础.List就像一个管子,里面可以装载一长条任何类型的东西.如需要对管子里的东西进行处理,则必须在管子内按直线顺序一个一个的来,这符合泛函编程的风格.与其它的泛函数据结构设计思路一样,设计List时先考虑List的两种状态:空或不为空两种类型.这两种类型可以用case class 来表现: 1 trait List[+A] {} 2 case class Cons[+A](head: A, tail: List[A]) exte

泛函编程(27)-泛函编程模式-Monad Transformer

  经过了一段时间的学习,我们了解了一系列泛函数据类型.我们知道,在所有编程语言中,数据类型是支持软件编程的基础.同样,泛函数据类型Foldable,Monoid,Functor,Applicative,Traversable,Monad也是我们将来进入实际泛函编程的必需.在前面对这些数据类型的探讨中我们发现: 1.Monoid的主要用途是在进行折叠(Foldable)算法时对可折叠结构内元素进行函数施用(function application). 2.Functor可以对任何高阶数据类型F[

泛函编程(21)-泛函数据类型-Monoid

    Monoid是数学范畴理论(category theory)中的一个特殊范畴(category).不过我并没有打算花时间从范畴理论的角度去介绍Monoid,而是希望从一个程序员的角度去分析Monoid以及它在泛函编程里的作用.从这个思路出发我们很自然得出Monoid就是一种数据类型,或者是一种在泛函编程过程中经常会遇到的数据类型:当我们针对List或者loop进行一个数值的积累操作时我们就会使用到Monoid.实际上Monoid就是List[A] => A的抽象模型.好了,我们就不要越描