《Streaming System》

之前就提到过,最近在看《Streaming Systems》这本书。趁着过年期间再加几个周末,终于看完了。自从《Hadoop Application Architectures》之后,很少这样细致的看一本英文大部头了。

我对流处理并不很熟悉,随便写写感想,可能有错误,也并不全是流处理相关的。

关于本书

本书并不是针对某个系统去讲解原理或使用,也并不是针对的讲某种技术,而是从比较抽象的角度去阐述“流处理系统应该是什么样子的”、“应该具备哪些能力”,可以作为系统设计时的参考。如果一定要类比的话,有点类似于《数据库系统概念》这种书。
所以本书并不适合初学者读,如果没有一些数据处理的实际经验(无论stream还是batch),上来就直接看各种概念,很容易懵逼,不知道为什么要这样设计。但反过来说,如果你有一些经验再来看这本书,会有种融会贯通的感觉。就好像你看到的不只是流处理系统,而是整个数据处理领域/相关系统的发展脉络和设计哲学,虽然这么说比较玄学。。。

看书时的几点提示:

  • Safari Books Online真是非常好用,试用期内可以免费在线看书,不过还是推荐充值。
  • Safari Books Online的另一个好处在于,本书的很多示意图是动态的,如果你只看纸质版本,很容易看不懂。而且很多概念本身有点抽象,如果你刚碰到时看不懂,很可能后面的章节也就完全不懂。所以一定要看懂动图、一定要看懂动图。
  • 可以和google的Dataflow论文结合着看。Dataflow可以指的是google的数据处理系统,也可以指的是它提出的那套模型和API,下文中我会混用。
  • 作者的用词有一点奇怪,经常会出现一些很生僻的单词。。。而且有些英文的梗实在是不太懂,大概因为我不是native english speaker吧。

话说,阿里的哥们不是说在翻译中文版么,说是年末上市,这都9102年3月了,也没看到。。。

啥是framework

看书的过程中,我一直在想,各种流处理系统之间的差别到底是啥,为什么以前storm一片繁荣,而现在面对flink败下阵来。如果直接对比功能特性,我们当然能列举出很多:window啊、exactly-once啊、event-time啊。但如果一定要较真,其他框架不能实现这些么?应该可以,但是有些side effect,而且需要用户自己去操心很多事情。如果用storm去实现event-time窗口,需要很小心的设计,很容易出错,而这实际上跟业务逻辑无关,实际的业务代码仅仅是系统代码很小一部分,不值得让用户付出心力。
前段时间看了介绍Jeff Dean的一些文章,讲了MapReduce的一些历史。无非就是以前大家在处理数据时都要从头开始写很多代码,各种烟囱式开发,而MR将这些抽象出来,让用户更能关注自己的逻辑。虽然以现在的眼光来看,MR也是很low-level的API了。
为什么SQL大行其道,很重要的原因在于,用户只是在“表意”,只关心自己的逻辑就可以了。

从这个角度来讲,framework或者说框架,其意义在于屏蔽底层(尤其是infra)的各种细节,把各种脏活累活统一做掉。最典型的脏活就是failover,让用户自己去操心节点失败后如何恢复,闹呢?
框架做的越多,应用层需要做的就越少,这个边界其实挺难划的,而且不同时代需要的框架能力也不同。flink由于支持dataflow模型,这条线划的比较高,或者说抽象程度更高一些,恰逢其时。

不只是数据处理领域,各种JavaEE框架、各种ORM、各种算法框架,甚至于各种library/utils、微服务、RPC、操作系统、编程语言,都是同样的演进过程。所以计算机科学,是不是就是一门关于“抽象”的科学?这在其他学科中好像还是比较少见的。
正如本书作者所说:Hooray for layers of abstraction!

甚至在非技术领域也是如此,比如著名的KISS原则。我们在设计自己的产品时,是否真的帮用户把所有他不需要关心的事屏蔽掉了?说来惭愧,总是有人吐槽我们的系统不好用/反人类,我觉得就是暴露给用户的、需要用户操心的东西太多了。

不过话又说回来,过度的封装也可能带来问题。比如spark/flink中user code可能被并行的执行,如果API层面不显式的让用户感知到这一点,用户的理解偏差可能带来各种奇怪的bug。很久之前看过的一篇文章《RPC之恶》也表达过类似的观点:过度的抽象看似简化,其实会增加整体的复杂性。

这些道理虽然很简单很朴素,但却很容易被忽略。

流 vs 批

一个很本真的问题:流处理和批处理到底差别在哪里?

因为我们都是搞工程的,所以我们很容易列举出一些区别:

  • 批处理的数据是已知的、不可变的,流处理的数据往往是未知的、实时输入的
  • 流处理结果更实时,但相对并不准确,所以才会有所谓的lambda-architecture
  • 流处理需要有daemon进程常驻
  • 批处理往往是无状态的,而流往往是有状态的
  • 因为没有状态,批处理的failover只要重跑就可以了,而流的错误处理会更麻烦
  • 批处理由于能获取一些“全局”状态,可以做一些针对性优化,吞吐量更高
  • 批处理的join实现已经比较完善了,而流的join一直是个大难题
  • 等等等等

但所有这些区别,更像是“结果”,是工程实现的差别,而不是本源。大家之所以觉得流处理系统不准确、吞吐量不高等等,只是因为它被实现成了那个样子。
本书最重要的一个理念,就是从更高的抽象层面,论证了批和流实际上是能统一的(作者实际上说的是stream是batch的超集,但我有点存疑,或者作者说的stream其实是更广义上的?)。我们不应该从工程实现的角度来分类,而应该从处理的问题域来划分。

在真实的场景中,大多数数据其实都是无界(unbounded)+ 无序(unordered)的。很多我们用批处理系统(比如MR)去处理的有界数据,其实是一种“劣化”。亲身经历的场景:

  • late data问题。以前用MR去计算各种报表,由于移动端的特殊性,上传必然是有延迟的。计算T+1报表也必然会少一部分数据,于是需要每天重跑过去3天的数据。为什么是3天呢,拍脑袋定的。。。日志延迟半个月都有可能,但重跑半个月之前的报表已经没啥意义了。
  • session计算问题。以前通过MR去计算T+1的session,但用户的session很可能是跨天的,甚至可能跨几天。一般会多取几个小时的日志,但我也不知道应该多取多久的。只能“一刀切”:凌晨2点之后的日志全部不算在内。

从这个角度来讲,面对这种unbounded数据,批处理只能划分成一个个小的batch来处理,结果其实是不准确的。而流处理天生就考虑到了数据的无界性,理论上准确性应该是优于批的。

那为啥大家还会留下批处理结果准确、流处理结果不准确的印象嘞?这其实是两个层面的概念:accuracy和completeness。批处理的“准确”,指的是计算的accuracy,更多是系统层面:恒定的数据+恒定的计算逻辑=恒定的结果,就像不可变函数一样。即使这个最终结果不准确,也是计算逻辑或者数据的锅,而且误差也是相对恒定可接受的;流处理的“准确”,指的是数据的completeness,理论上能计算更准确的结果,但以前的各种流处理系统由于不能保证(或是实现代价很高)exactly-once,所以实际上算出来的可能不准。即使是同样的数据+同样的处理逻辑,你跑多次结果都可能是不一样的。。。但这不是流处理语义上的锅。

即使是有界(bounded)数据,按无界数据的方式来处理,准确性(accuracy)上也没有什么问题。而且很多我们所谓的有界数据,其实都不是真正的有界。

所以stream和batch更像是处理同一个问题的两种方式,真正的区别在于latency和throughput,这是一种trade-off(有句名言怎么说的,“distributed systems are all about trade-offs”):

  • latency:用stream的方式去处理时,你可以提前观察到结果,虽然这个结果可能不准,是一个提前观测甚至预测。但流处理语义保证当数据完整时结果是正确的(所谓的refinements of results)。而用batch的方式去处理时,如果想要正确的结果,必须保证数据完整后才能开始计算。
  • throughput:batch方式吞吐量更高,因为有全局的数据,可以对shuffle做更多优化。而且failover简单,也就不需要持久化很多状态。而stream的方式需要提前计算结果,消耗更多资源,而且failover需要保存更多中间状态,所以吞吐相对没有batch高。
    • 题外话,关于shuffle:毫不夸张的说,所有的分布式计算框架,最重要的就是shuffle过程。shuffle的语义是否强大决定了框架的能力,shuffle的实现方式决定了框架的效率。

问题域统一后,流和批的API也是可以统一的(Dataflow/Beam)。这也很好理解,如果要解决的问题是相同的,而API是更多表意的(不掺杂太多底层逻辑),那表达方式应该也是相同的。

真正麻烦的在于引擎层面(或者说runner),目前还没有一个很好的解,估计很长一段时间内也都很难统一。现阶段Beam这套API,还是用于流处理更实际一点。即使flink宣称同时支持流和批,实际上批处理那套API也没啥人用。

所以上面说的各种统一更多是理论上的,我们作为工程开发人员还是要实际一点。。。仰望星空,脚踏实地嘛。不过这应该是未来可以探究的一个方向,各大公司也有很多投入(Google Cloud Dataflow/Databricks Delta: Unified Analytics Engine),嘴上说着没有one size fits all/no silver bullet,身体还是很诚实的。。。

Stream vs Table

本书认为衡量数据的特性应该从两个维度出发:cardinality和constitution(看到这两个词直接懵逼了,不查根本不知道什么意思)。

  • cardinality:数据的规模,是有界还是无界;
  • constitution:数据的组织方式,或者说对外的表现形式,有两种:stream和table;

就像波粒二象性一样,stream和table其实是针对数据的两种视角(不过应该是特指结构化数据,书中没有明确写出来)。这么说有点抽象,最直接的例子就是mysql的binlog:binlog代表了数据变化的过程,应用binlog后的table代表着数据的一份快照。另一个例子就是物化视图,视图会随着数据源的变化而更新(可以认为是接收到了一条数据变化的消息通知)。
或者这么说,在大多数database中,虽然呈现给我们的是table形式,但实际上的底层都存在一种append-only的数据结构(innodb的redo log,hbase的WAL)。呈现出来的table形式,是更上一层的“抽象”。这种机制也被称作CDC(Change Data Capture),不光在数据库领域,在其他领域也是非常常见的。

于是作者提出了stream和table的相关性(streams and tables are really just two different sides of the same coin):

  • Streams → tables:The aggregation of a stream of updates over time yields a table.
  • Tables → streams:The observation of changes to a table over time yields a stream.

另一种表述:

  • Tables are data at rest.
  • Streams are data in motion.

其实点破的话,也没那么难理解,虽然有点抽象。但是然后嘞?就算论证了stream和table的相关性,意义在哪里?这就有点纯理论研究的性质了:

  • 任何data processing system,无论流还是批,其实就3个要素:stream、table、operation
    • 作者论证了现有的各种系统(MR、Beam等)都能适配到这个理论中,具体的论证过程不详述
    • 但是各种系统中会有偏向(biased),比如Beam中,stream才是first class citizen,table的概念是隐式的,MR则反之
  • operation会导致stream和table之间的变换,共有4种情况:
    • stream → stream:nongrouping operation,或者说是对单个元素的处理(filter/map等),很好理解,storm等传统意义上的流处理都是针对的这种情况
    • stream → table:grouping operation,或者说是需要shuffle的操作,shuffle后每个operator需要保存一个internal state,其实就是这个所谓的table,典型的就是各种聚合(参考spark中的stage划分策略)
    • table → stream:ungrouping operation,典型的就是dataflow中的各种trigger
    • table → table:不存在,所有对table的修改都是通过先ungroup再group实现的
  • 更关键的一点:对于table我们已经有了很完善的处理手段(比如SQL),而这些手段都是能直接应用于stream的,至少理论上是完备的

关于StreamSQL,本书中有专门的一章来讨论,作者描述了一种比较完善的流式SQL语义。其大概思想就是在传统的关系代数(RDBMS的理论基础)中加上时间这一维度,用所谓的TVR(Time-Varying Relations)来描述各种变换,进而扩展现有的SQL语法(前提是尽量精简,保证一个最小集)以支持流式处理。感觉上,有一点像数仓理论中的拉链表?表中的每条数据,其实都是有生命周期的。
不过作者也承认,这是一个比较理想的模型,现有的各种系统没有一个真正实现了这套完备的SQL语义。好像ANSI也在讨论流式SQL的标准,准备制定规范了?不知道最终的规范会是什么样。

总的来说,这个“stream/table二象性”还是比较抽象的,更多是一种理论上的指导意义。了解下可以,别过于纠结。

P.S.:关于append-only和CDC,还有一篇经典文章:The Log

状态是万恶之源

很早以前我也这么说过,不过那时针对的是React

不只是在分布式领域中,哪怕是单机的业务系统,我们也要很小心的设计状态,包括各种状态的转移、状态的持久化、状态的恢复等。在面对复杂的业务规则时,使用状态机也往往是一个比较好的选择。完全没有状态似乎不太可能,不过合理使用shared storage可以尽量减少维护成本。

说回流处理领域,如果我没记错,storm里是没有状态的概念的,所有的一切都要自己管理,第一个完善状态管理的好像是samza?状态带来的问题主要有两方面:

  • failover:一个有状态的task/node,挂掉后如何恢复?
  • rescale:一个有状态的operator,修改parallelism后,状态如何拆分?(operator/instance的概念需要区分)

实际中讨论的更多的问题好像还是failover,rescale的问题我只在flink的文档中看到一些,动态修改并行度一直都是个大难题。

如果说的抽象一些,所谓的状态就是上文中“stream/table二象性”理论中“stream → table”变换生成的table。这个“table”往往都是各个operator自己维护在内存中,不对外暴露,待到trigger触发后才对外emit一个值。不过本书中提到,如果这个internal state可以直接对外暴露,很多情况下可以省略掉sink阶段,flink中也有一些类似的做法:Queryable State

如果从状态的分类来说,大概可以分为两种:

  • operator内部的状态,比如window的聚合、kafka的offset等;
  • transformation的中间状态,用户也可以将自己code中的状态交给系统托管;

感觉上,系统提供了一个托管状态的“容器”,所有在这个容器中的状态,其生命周期、持久化、failover都由系统管理。如果你写自己的transform逻辑时,非要自己维护状态(比如在redis中),也不是不行,谁也拦不住对吧。(“我就是要出狂战斧”,这个梗还有多少人懂。。。)
由系统维护的状态,如果要保证其可靠性,要么写到shared storage中,将可靠性交给底层存储来保证;要么就是定期checkpoint,相当于定期备份。其实很多系统的机制都差不多,比如hdfs的editlog+fsimage。至于备份的状态失败时如何恢复、如何split,那就是另外一个话题了,跟下文要讨论的exactly-once也有关。

参考flink相关文档:Working with StateCheckpointing

exactly-once

老生常谈了,为啥流处理经常被人吐槽结果不准,还有人说“storm是弱一致性的”(“一致性”这个词在个场景是很模糊的,我觉得就等价于exactly-once),就是因为很难保证exactly-once语义,数据处理过程很难做到replayable。同样的输入每次计算的结果都可能不一样(nondeterministic),让别人如何相信你(虽然这种不确定性不完全是由于框架本身导致的)。。。所以才有了Nathan Marz的lambda-architecture,但也带来了各种其他问题,更像是一个过渡方案。

为啥exactly-once这么难,根本原因还是在于failover,或者说fault tolerance(在分布式的环境下框架不可能忽略这个问题)。如果框架要处理节点失败的情况,至少也是at-least-once。我们要考虑的是如何在此基础之上达成exactly-once。
需要注意的是,exactly-once并不保证user code只运行一次,所以如果在代码内部做了一些外部操作,比如访问外部系统服务,还是可能有副作用的。数据也未必真的只是处理一次,只要保证最后的结果与“只处理一次”的场景下一致就可以了。

如果细分的话,exactly-once其实分3个层面,只有这三个层面都得到保证,才是完整的端到端exactly-once语义:

  • shuffle:every record is shuffled exactly once,或者说是系统内部的exactly-once语义。非shuffle的场景下(比如单个stage内)通常都能隐式的保证exactly-once。
  • source:every source record is processed exactly
  • sink:every sink produces accurate output

很多系统(包括flink的旧版本)宣称提供exactly-once,但实际上只能保证第一个层面,即shuffle。

shuffle的exactly-once的常用解决方法通俗点说就是去重。at-least-once是如何保证的?一般是上游直接重试,比如storm的ack机制,这种机制也被称作upstream backup,由上游保证消息被正确传递,ack丢失时就可能出现重复。如果在接收端维护一个hashmap之类的结构,给每条消息分配唯一的ID,理论上就可以避免重复。当然实际上肯定没这么简单,书中还提到了graph optimization、bloom filter等策略以提高效率。
书中还讨论了另一个有趣的case:部分transformation的结果是nondeterministic的,尤其是用户自己写的代码。比如每次去查一下redis再决定输出什么值。如果发生了失败和retry,很可能每次重跑的结果都是不一样的,虽然理论上讲这个锅和系统本身无关。为了应对这种情况Dataflow会在shuffle前将每个stage的输出的值加上unique id写到stable storage,再发送到下一个stage。发生retry时,直接使用stable storage中的值输出给下游。这种场景大多数框架都不会考虑吧,也许google内部有什么特殊需求,实现中也必然还是有一些优化的。

除了这种常见的重试(at-least-once)+去重的策略,还有个画风不太一样的就是flink。它通过自身的checkpoint机制(其实就是当前pipeline的状态的快照),当出现错误时直接回滚到上一个checkpoint,而且是全局回滚。理论上来讲这种操作代价还是比较大的?仅适用于错误概率比较小的情况吧。从这个角度来说,flink不是failover,而更像是failback。
而且通过对checkpoint机制的扩展,flink衍生出了savepoint的概念,可以从任意savepoint重启整个pipeline,还是挺nb的。看了下逻辑上似乎就是一个 < operator-id, state > 的map,只要从这个savepoint重启,就会按照id去恢复状态。甚至拓扑结构完全不一样都可以,只要id能对上。

source/sink端的exactly-once保证更多要依赖于具体的存储系统,比如kafka。
source端为了防止重复消费,常见的策略也是去重。在kafka 0.11之前,都是需要每个source端自己去做去重(应该是根据partition+offset吧);0.11版本之后引入了幂等和事务特性,理论上不再需要source端处理。
sink端似乎是最麻烦的,借用spark的文档中的说法,有两种方式:

  • Idempotent updates:sink还是可能重跑,输出多个重复值,但存储系统是幂等方式存储的,比如文件、KV这种每次覆盖;
  • Transactional updates:这个比较好理解,如果retry,上一次输出的值其实没有commit;

理论上来讲,去重策略在这里还是可以用的吧?sink在输出前先查下外部系统,判断是否已经输出,就是代价可能比较大。

flink是将kafka的事务特性与checkpoint机制结合(Kafka Producers and Fault Tolerance),引入两阶段提交才实现的exactly-once,好像还挺复杂的我也没仔细看。

总的来说,感觉exactly-once已经不是那么遥不可及的的一个东西了,之前很多人甚至说exactly-once是不可能的。当然可能他们定义的exactly-once更严格,也许是学术界与工业界的gap?

参考资料:

You Cannot Have Exactly-Once Delivery
文章比较老了,还是可以看看

Exactly once is NOT exactly the same
直接对比了Distributed snapshot和at-least-once event delivery plus deduplication

Apache Flink 端到端(end-to-end)Exactly-Once特性概览

其他

以上我只总结了自己印象深刻的几个点,也不全是书里的内容,加上了自己的一些理解。

书中还有很多其他有意思的东西,不一一赘述了,比如:

  • watermark概念也是很有意思的,还有low/high watermark的设计理念上的差别
  • session的计算、window的合并
  • 不同的Accumulation模式:discarding、accumulating、retracting
  • 在event-time时间域实现process-time处理逻辑
  • StreamJoin的各种实现策略,老大难问题
    • inner join还好,就是在join operator中要保存很多状态
    • outer join还会带来额外的retraction成本
  • 所谓的What/Where/When/How等等

作者在QCon等场合也有一些presentation,可以结合着一起看。

其实很多问题都不难想到,只要你真实使用MR、storm之类做过数据处理。比如时间漂移/状态存储/重复消费等等,dataflow只不过把这些问题抽象化、系统化了,并且在一个框架内提供了统一的解决手段。如果之前没有经验,可能很难理解为啥google要搞得这么复杂,这么多概念。

最后说说flink,这应该是最近最火热的流处理框架了吧。总结原因大概有几点:

  1. 语义上更完备,支持dataflow模型,对于无界、无序数据提供更好的支持;
  2. 全局快照机制,解决了状态问题并基于此提供强一致性保证。而且还玩出了花样,又是增量又是异步的,并衍生出了savepoint能力;
  3. SQL能力的加持;
  4. 开源社区活跃 + 大公司站台;

以后有时间还是要好好研究下。