type
Post
status
Published
date
Mar 12, 2023
slug
go-stream
summary
一种全新的设计模式,数学美感与工程实用价值兼备,且不限编程语言 — Go语言实现
tags
Go
Design Pattern
category
技术分享
icon
password
Property
May 15, 2023 03:38 AM
🤔 有意思的文章
最近读到一篇比较有意思的文章《一种新的流:为Java加入生成器(Generator)特性》,这篇文章介绍了给 Java 语言增加生成器特性的实现方法,说到这其实是一种设计模式,是一种简单但是功能强大的设计模式,文中称其为“数学美感与工程实用价值兼备”。
通读下来也的确让人受益匪浅,想动手试试,尝试给 Go 语言也整一个,也算是给这篇文章做一个补充说明和学习笔记。
原文如下
介绍 Generator
生成器(Generator)可以说是一种特殊的迭代器(Iterator),因为他们的表现一样,每次调用都会得到一个元素,但跟迭代器不同的是,生成器元素不是一次性生成返回,而是每次返回一个,使其天然具有惰性求值等特性。
一般支持 Generator 特性的语言,会用类似
yield
关键字来实现,比如文章中的例子def underscore_to_camelcase(s): def camelcase(): yield str.lower while True: yield str.capitalize return ''.join(f(sub) for sub, f in zip(s.split('_'), camelcase()))
我们改写成 JavaScript 是这样子的
function underscoreToCamelcase(s, generator) { function* camelCase() { yield str => str.toLowerCase(); while (true) { yield str => str.substring(0, 1).toUpperCase() + str.substring(1).toLowerCase(); } } const gen = camelCase() return s.split('_').map((sub) => gen.next().value(sub)).join(''); }
这里也可以窥探出 JavaScript Generator 的一个特点:他是可以返回多个函数的函数
给 Go 语言也添加这么一个 Generator 特性
定义流式接口
type Callback[T any] func(T) type Stream[T any] func(Callback[T])
Callback
是一个单纯的消费函数或者说是回调函数,处理接收到的参数,此处为了使该含义更加明确,单独定义一个类型
Stream
是流这个概念的定义,也是一个单纯的消费函数,只不过他所需的参数是另外一个函数,所以他是一个高阶函数,提供了对回调函数的消费,也是文章中所说的 consumer of callback
实现创建函数
func Of[T any](t T) Stream[T] { return func(c Callback[T]) { c(t) } } func From[T any](ts ...T) Stream[T] { return func(c Callback[T]) { for _, t := range ts { c(t) } } }
从这两个方法也可以很清晰的感受到,所谓的 Stream 的本质是一个能够对回调函数进行消费的高阶函数,他会将自己的元素,递交给回调函数处理。
实现 Map
和 FlatMap
type Mapper[T, R any] func(T) R func Map[T, R any](s Stream[T], mapper Mapper[T, R]) Stream[R] { return func(c Callback[R]) { s(func(t T) { r := mapper(t) c(r) }) } } type FlatMapper[T, E any] func(T) Stream[E] func FlatMap[T, E any](s Stream[T], mapper FlatMapper[T, E]) Stream[E] { return func(c Callback[E]) { s(func(t T) { mapper(t)(c) }) } }
Go 语言实现这两个函数需要绕个圈,无法做到链式调用,因为它不支持在实例函数里面再次定义类型参数,如下面写法
func (s Stream[T]) Map[R any](mapper Mapper[T, R]) Stream[R] { return func(c Callback[R]) { s(func(t T) { c(mapper(t)) }) } }
Map 函数的本质是通过
f(T)⇒R
将 Stream[T]
映射成 Stream[R]
FlatMap 函数的本质是通过
f(T)⇒Stream[R]
将 Stream[T]
中的元素替换成 Stream[R]
的元素实现链式调用函数
func (s Stream[T]) Filter(f func(T) bool) Stream[T] { return func(c Callback[T]) { s(func(t T) { if f(t) { c(t) } }) } } func (s Stream[T]) Take(n int) Stream[T] { return func(callback Callback[T]) { s.ConsumeTillStop(func(t T) { if n <= 0 { panic("stop") } callback(t) n-- }) } } func (s Stream[T]) Drop(n int) Stream[T] { return func(callback Callback[T]) { s(func(t T) { if n > 0 { n-- } else { callback(t) } }) } } func (s Stream[T]) ConsumeTillStop(callback Callback[T]) { defer func() { if r := recover(); r != nil { } }() s(func(t T) { callback(t) }) } func Zip[T, E, R any](s Stream[T], els []E, f func(T, E) R) Stream[R] { var i int return func(callback Callback[R]) { s.ConsumeTillStop(func(t T) { if i >= len(els) { panic("stop") } callback(f(t, els[i])) i++ }) } }
ConsumeTillStop
是利用 panic
和 recover
机制实现的中断闭包执行的结构Zip
函数通过 f(T,E)⇒R
函数将流中的元素与一个 iterable
元素两两聚合,然后转换为一个新的流实现终端操作函数
func (s Stream[T]) Join(sep string) string { var ss []string s(func(t T) { ss = append(ss, String(t)) }) return strings.Join(ss, sep) } func (s Stream[T]) Array() []T { var ss []T s(func(t T) { ss = append(ss, t) }) return ss } func String[T any](t T) string { return fmt.Sprintf("%v", t) }
其实就是给流中的元素编写特定的回调函数,然后把状态保存下来处理,如果不用这些终端函数,可以自己来操作
From(1, 2, 3)(func(i int) { //do whatever you want })
实现 Python generator 的例子
我们可以用现有的 Stream 函数来实现跟 Python Generator 等价的函数
func underscoreToCamelCase(str string) string { capitalize := func(s string) string { return strings.ToUpper(s[0:1]) + s[1:] } stream := func(c Callback[func(string) string]) { c(strings.ToLower) for { c(capitalize) } } return Zip(stream, strings.Split(str, "_"), func(f func(string) string, str string) string { return f(str) }).Join("") }
代码
以上所有代码及测试用例已上传至 Github,欢迎指正和探讨