没事瞎思考

没事瞎思考

深入理解 Transducer

如果你来学「高级概念」-— 组合子 高阶函数 等, 那么你走错地方了,本文并不打算引入任何高深概念,我喜欢简单直观的理解。

多数时候,如果你要别人给你讲函数式编程,或者 Clojure 的 transducer,或者 …… , 最终我们都记住了一大堆概念术语,下次别人问你的时候,你也知道怎么做了。然后我们发现,大家都懂了。

Clojure 1.7 引入了 transducer,我在 1.8 的时候, 才开始大量使用,虽然一开始看了 Rich Hickey 的介绍文章,但仍然没有 Get 到要点。像当初使用 Macro 一样,凭借人类超强的模仿能力,我竟然也能使用,至于为什么使用,因为 Rich 说可以提高性能。

因为经常使用,慢慢的熟了,理解加深了。但是我发现还是很难向别人解释清楚,尽管我自己写起来很顺畅。最终我找到一个还不错的方式,在此记录下来,希望对你有帮助。

1 Trans(form)(re)ducer 干什么用的

动词 Transduce 可以看做两个词的组合 -— transform, reduce ,是 Clojure 的一个核心 Function,而名词 Transducer 可以理解为转换器, transducetransducer 作为参数,把他的转换功能应用到数据流 Stream 上,这有点类似于 reducereduce 的第一个参数是一个 reducing funciton, 比如 conj

那这个 tranducer 到底是个什么东西了,根据官网的定义,tranducer 是一个高阶函数,它接受一个 reducing, 返回一个新的 reducing,新的 reducing 可以应用到另外的 transducer 上,这样又得到一个新的 reducing ……。这就是 transducer 可以组合的原因了。

tranducer 接受 reduceing A 返回 reducing BB 具有什么功能了, 这当然看你的实现,你可以实现任何你想要的功能。通常来说无非就是:

  • 转换数据, 比如 (map inc)
  • 直接返回 result,或者传递数据给下一个 reducing A , 比如 (filter even?)
  • 兼具上面两个功能

看完这个描述,是不是发现,这其实跟 Web 开发的中间件没有区别。

  • 这里用到「下一个 reducing A」,对于数据传递来说, A 处在 B 的下游,因为 A 使用与否,取决于 B
  • 通常把 transducer 命名为 xform

2 transduce 怎么实现的

我们直接看定义,从源码扣出,并去掉文档,重命名了部分内部变量,并加以注释

(defn transduce
  ([xform rf coll] (transduce xform rf (rf) coll))
  ([xform rf init coll]
   (let [;; get finalized reducing function
         rf  (xform rf)
         ;; then apply `rf` & `coll` to reduce
         ret (if (instance? clojure.lang.IReduceInit coll)
               (.reduce ^clojure.lang.IReduceInit coll rf init)
               (clojure.core.protocols/coll-reduce coll rf init))]
     ;; finalize result `ret`
     (rf ret))))

从源码可以看出,transducer xform 需要结合一个 reducing function 使用,最终使用的方式就是通过 reduce。

到此,介绍完毕。 像常规一样, 不来个例子, 说了等于白说,为了保证本文的价值,我们接着往下看。

3 列表的元素都加 1, 并过滤出偶数

;; make a transducer, clojure name this `xform`
(def xform (comp (map inc) (filter even?)))

;; apply reducing function `conj` to `xform` product a reducing function
;; actually transduce function is just a reducing function
(def rf (xform conj))

;; use reducing function
(reduce rf [] (range 10))
;;=> [2 4 6 8 10]

上面的例子很简单,下面我们来图解它:

transducer.png

  • 第一列,状态 Box 初始为空 [] , 输入 0, 经过 inc 之后变为 1,filter 的时候, even? 判断失败,直接返回原 Box []
  • 第二列,拿到 filter 返回的 Box 和一个新的输入 1,inc 之后为 2,filter 的时候, even? 判断成功,往下传递给 conj, (conj [] 2) 返回 [2]
  • 第三列,拿到 conj 返回的结果,和新的输入 ……
  • ……

有了上面这个图示,回头看代码,就容易多了。为了简化问题,我们实现简化版的 transducer, 分别是 map-incfilter-even? ,随便提一句,map 并不是一个 transducer, (map inc) 的返回值才是。


(defn map-inc
  "- rf: the next reducing function"
  [rf]
  ;; return new reducing function
  (fn [result input]
    ;; inc input and then apply to next reducing function
    (rf result (inc input))))

(defn filter-even? [rf]
  (fn [result input]
    (if (even? input)
      ;; transfer to next reducing function if `even?` predicate succeed
      (rf result input)

      ;; return immediately if predicate failed
      result)))

(def rf (-> conj filter-even? map-inc))  ;; we got the same rf again


如果你那上面简化版的实现对照 clojure 的源码, 比如 (map inc) 返回值的实现, 发现,源码要复杂的多,因为我只实现了核心逻辑,下面我来实现一个完整功能的 transducer, 并加以注解:

(defn map-inc [rf]
  (fn
    ;; reducing function need an init state `result`, 
    ;; this signature for init
    ;; this case, simply call next reducing function `rf`
    ([] (rf))

    ;; when tranduce stream finished, this signature for `finalize`
    ;; this case, simply call next reducing function `rf`
    ([result] (rf result))

    ;; this for receive from upstream reducing function
    ;; act as reducing function role
    ([result input]
     (rf result (inc input)))))

从上面可以看出,一个能用于 transduce 的 reducing function,并不像传统的 reducing function 那么单纯, 它有 3 个签名,分别 0,1,2 个参数,其作用分别为:

  • 0:初始化, (transduce xform rf xs) -> (transduce xform rf (rf) xs)
  • 1:处理最终结果, 看 transduce 的源码, 可以看到 (rf ret)
  • 2:充当 reducing 角色,能否调用,取决于上游 reducing,是否往下传递取决于它

4 tranducer 实现,一定需要实现 3 个签名吗?

这个问题的答案是否定的,transducer 的重点在于他的可组合型,可重用性。他的实现不同于的 sequence function,不依赖具体的数据结构。对于任何数据流,或者状态的连续变化,只需要提供一个 reducing 适配器就行。

大名鼎鼎 core.async 就是这么重用 transducer 的, core.async 的 reducing adapter 就是 (add! buf item) , 而 add! 并没有实现 0 个参数情况, 这就是为什么你在创建 chan 的时候,如果使用了 xform 一定需要提供 buf。 因此,对于应用到 core.async 上面的 xform , 你是不需要提供 0 个参数方法也可以用的,但是为了保证良好的重用性, 提供才是最佳实践。

transducer 和并发,并行什么关系?

答:没有关系

Comments

comments powered by Disqus