原文链接

从monolithic database(整体服务器)到distributed system(分布式系统)的过渡中, 我们渐渐意识到这一切的核心都有一个很简单的概念: log(日志), 也称为write-ahead log(WAL), commit log, 或transaction log. Log的历史几乎和计算机一样长, 且作为分布式数据系统和实时应用程序架构的核心. 若不理解log, 则无法完全理解数据库, NoSQL存储, key-value存储, replication(复制), paxos, hadoop, version control(版本控制), 和大多数软件系统.

1. What is a Log?

Log可能是最简单的存储抽象, 是一种append-only(仅追加), totally-ordered(全序)的record(记录)序列, 以时间排序, 如下图:
log

Record只能被追加到log的尾部, 并从左往右读取record, 每个record都有一个唯一日志条目号. 由于左边的record一定早于右边的record写入, 因此record的排序定义了一种时序. 这种顺序让其与物理时钟脱钩, 该属性在分布式系统中十分重要. Log内部的数据内容或数据形式并不重要. 需要注意的是, 如果我们一直添加新的record, log最终会消耗掉所有存储空间.
读到这里你可能会想, 我们为什么要讨论这么易懂的事情? 仅追加的record序列与数据系统有何关联? 答案是, log能够记录发生了什么事情何时发生的事情, 这是分布式系统中很多问题的核心.
在进一步讨论之前, 必须先区分另一种logging: 当应用程序遇到错误时, 使用syslog或log4j写入本地文件的非结构化错误消息. 区别在于: 这种log是供人类阅读的文本信息, 而我们讨论的log为程序化访问而构建的.

1.1 Logs in databases

Log一词的起源已经不可追溯, 或许因为log这个概念过于简单, 发明者都没意识到这是一个发明. Log最先用于IBM的System R, 当数据库崩溃时, log可让各种数据结构和index保持同步. 为了保证数据库的atomic(原子性)和durable(持久性), 在更改数据之前, 我们要将被修改的record信息写入log. 换句话说, log记录了所发生事件, 每个table或index都是log的映射. 由于log会被立即持久化, 因此其可作为崩溃后恢复其他持久性结构的依据.
随时间推移, log的应用从ACID的实现细节发展到数据库之间复制数据. 事实证明, 单个数据库中发生更改的顺序, 正是多个数据副本保持同步所需的. Oracle, MySQL, 和PostgreSQL都包含传输log的协议, 以便向slave数据库传输log.
Log作为数据订阅机制虽然处于偶然, 但这种抽象非常适合各种消息传递, 数据流, 和实时数据处理.

1.2 Logs in distributed systems

Log解决了两个问题:

  • 顺序更改
  • 分发数据

如何对更改的顺序达成一致是这些系统的核心设计问题. 分布式系统之所以采取以日志为中心的方案, 源于一个简单的观察, 这里称之为State Machine Replication原则: 若两个相同的, deterministic(确定性的)进程以相同state(状态), 以相同顺序获得相同输入, 则它们会产生相同的输出, 并转换为相同的状态.

  • Deterministic表示处理的结果与时间无关, 也不会受到外部影响. 例如, 线程执行的顺序, gettimeofday的调用, 或其他不可重复的事情, 这些事件能影响程序的输出, 则该程序是non-deterministic(非确定性的).
  • State(状态)表示进程运行结束后, 机器内的数据(内存或磁盘).

这里需要注意: 以相同顺序获得相同输入, 这就是log的来源. 换句话说, 当两个确定性代码输入相同的log, 应产生相同的结果.
在分布式计算中, 让多台机器做相同事情十分困难, 但该问题可简化为: 让这些机器获得同一份log. Log的作用在于, 让输入流不包含任何非确定性事件, 以此同步每个服务器的输入.
这种方法的优点在于, log的索引可作为服务器状态的时间戳. 换句话说, 日志中每个record的索引都可以描述一个副本, 也就是副本已处理的record数量.
根据日志中的内容, 有多种方法可以在系统中应用此原则. 例如, 我们可以记录服务的传入请求, 服务处理请求时经历的状态变化, 或执行的命令. 理论上, 我们可以记录每个副本执行的机器指令序列, 或所调用的函数名和参数. 只要两个进程以相同方式处理这些输入, 副本就能保持一致的状态.
不同的人对日志的用途有不同的描述. 数据库人员会区分physical logging(物理日志)和logical logging(逻辑日志):

  • physical logging: 记录每一行更改的内容
  • logical logging: 不记录更改的内容, 而记录导致了数据更改的SQL命令(insert, update, delete)

分布式系统的文献会将processing and replication(处理和复制)分为两种解决方案:

  • state machine model: 也称为active-active model, 日志记录输入的请求, 每个服务器各自处理请求
  • primary-backup model: 选择一个服务器作为leader, 允许leader按照请求抵达的顺序处理请求, 并输出状态变化日志, 其他服务器按照日志内的顺序应用leader的状态变化, 并在leader宕机时替代其成为新的leader
    Two broad approaches to processing and replication

为理解上述两种方案, 我们以一个玩具问题作为例子. 假设集群中有多个服务器, 各自维护一个整数作为其状态(初始为零), 并对该值进行加法和乘法运算.

  • active-active model: 输出所进行的命令的日志, 如+1, *2等, 每个服务器应用这些命令, 从而达成相同的状态
  • primary-backup model: leader执行命令, 并将运行的结果输出到日志中, 如3, 6

上述例子表明, 顺序是保持各个副本间一致性的关键: 加法和乘法的不同顺序会导致不同的结果.
分布式系统中, log可看作对consensus(共识)问题建模的数据结构. Log代表了对数据的一系列决策. 很多共识算法的目的都是为了维护分布式一致的log.
我们应将日志作为一种抽象的存在, 而不必考虑其实现, 就像我们讨论hash table时不会讨论太多实现细节. 日志作为一种接口, 可以有多种算法和实现, 以提供不同环境下的最佳性能.

1.3 Changelog 101: Tables and Events are Dual

回到数据库, 变更的logtable之间有着一种duality(二象性). Log类似于借贷清单和银行处理流水, 而table则是当前账户的余额. 只要拥有log, 就能将log中的变更应用到table中, 获得最新状态; table记录每条数据的最新状态(日志的特定时间点). 因此, log是最基础的数据结构: log除了可用来创建原本的table, 还能创建各种衍生table.
这个过程也是可逆的: 当对一个table进行更新时, 我们可以记录这些变更, 并将包含变更的changelog发布到table的状态. 因此, table和event是二象性: table支持静态数据, log记录仪变更. Log能够记录完整的变更过程, 其不仅仅包含table的最终状态, 还可以用于还原任何之前的状态. 换句话说, log可看作一系列table状态的备份.

代码的version control(版本控制)与数据库之间有密切关联. 版本控制解决了一个分布式系统中很类似的问题: 管理并发的状态变更. 版本管理系统采用的是sequence of patches(补丁序列), 也就是log. 我们与当前代码的snapshot(快照)交互, 类似于table; 更新时, 只需要下载补丁(也就是日志)并将其应用到当前快照即可.

1.4 What's next

接下来我们将介绍一些log的优势, 不限于分布式计算或抽象分布式计算模型:

  1. Data Integration: 让存储和处理系统可以轻松地访问组织中所有数据
  2. Real-time data processing: 计算生成的数据流
  3. Distributed system design: 如何通过log-centric design(集中式日志设计)来简化系统

上述场景都使用到日志的功能: 生成持久化的, 可重放的历史记录. 让多台机器以确定性方式按照不同速度重放历史记录是这些问题的核心.

2. Data Integration

首先需要定义data integration(数据集成): 让组织内的所有数据都被所有服务和系统访问. 虽然人们听得更多的是big data(大数据), 而不是数据集成. 但我认为, 数据集成其实更有价值.
数据的使用遵循着马斯洛需求体系, 金字塔的底层是如何捕获所有数据, 并将它们放到合适的处理环境(可以是实时查询系统, 或文本文件). 这些数据需要以统一方式建模, 以方便读取和处理. 只有满足了捕获数据的需求, 才能以不同方式处理数据, 如MapReduce.
若没有可靠且完整的数据流, Hadoop则很难发挥作用. 一旦有了data and processing(数据和处理), 才需要将注意力转移到高级数据模型和语义上.

2.1 Two complications

两个趋势让数据集成实现起来越来越困难:

  1. The event data firehose(事件数据管道): 第一个趋势是不断增长的event data(事件数据). 事件数据记录的是发生的事件, 而不是已存在的事件. 在Web系统中, 这意味着用户活动事件, 机器活动事件, 和统计数字, 统称为log data(日志数据). 但不要和log混淆, 这些数据是现代Web的核心. 这种类型的事件数据记录了发生的事情, 数据量往往比传统数据库应用大几个数量级, 因此对处理能力形成巨大挑战.
  2. The explosion of specialized data systems(专用数据系统的爆发): 第二个趋势是专用数据系统的爆发, 这些数据系统在最近几年流行, 用于OLAP, 搜索, 批处理, 图分析等. 大量多样化数据组合需要传递给多个系统, 这也是数据集成的一个问题.

2.2 Log-structured data flow

对于系统之间的数据流, log是最自然的数据结构, 解决方案很简单: 提取所有系统的数据, 并放到一个用于实时订阅的中心化log中.
每个逻辑数据源都可被建模为其日志, 每个数据源可看做一个输出事件日志的应用, 或是一个接受修改的数据表. 每个订阅消息的系统都尽可能快地从log读取record, 并将每条record应用到自己数据中, 同时向前滚动log文件中自己的坐标. 订阅方可以是任意一种数据系统, 如缓存, Hadoop, 另一个网站的数据库, 搜索系统等.
Log subscription

Log为每个变更提供了一个逻辑时钟, 由于每个订阅方都有一个可比较的"时间点", 这让判断两个订阅方的状态是否一致变得很轻松. 假设集群中有一个数据库, 和一组缓存服务器, log可提供一种方法同步更新到所有系统, 并推断出各个系统所处的时间点. 现在客户端对log中的record执行一个写操作, 如果客户端不想在缓存中看到旧值, 只需避免从复制操作还未跟上进度的缓存读取该record.
日志还起到缓存的作用, 让数据的生产异步于其消费. 这一点对于多个订阅方消费速度不同时很重要, 如果某个数据订阅方宕机或维护, 重新上线后其仍能继续赶上进步. 这使得订阅方可按照自己的节奏来消费数据, 批处理系统(如Hadoop)或数据仓库只能周期性消费数据, 而实时查询系统可能每秒都要消费数据, 无论添加或删除何种数据系统, 日志系统都无需改变传输管道.

2.3 At Linkedin

随着Linkedin从集中式关系数据库过渡到分布式系统, 数据集成问题也在迅速变化. 目前Linkedin的数据系统如下:

  • 搜索
  • Social Graph
  • Voldemort(键值存储)
  • Espresso(文档存储)
  • 推荐引擎
  • OLAP查询引擎
  • Hadoop
  • Terradata
  • Ingraphs(监控图表和指标服务)

每个数据系统都是一个专用的分布式系统, 在各自领域提供高级功能. 最早Linkedin开发一种称为databus的基础服务, 其已展现出一种日志缓存的抽象, 用于可伸缩地订阅数据库修改, 给social graph和搜索索引提供数据.
在发表了自己的key-value streo后, 接下来的一个项目需要将一个运行的Hadoop部署用起来, 迁移一些推荐处理上来. 由于缺乏这方面的经验, 只偶安排了几周时间来做数据导入导出, 剩下时间用于实现复杂的预测算法.
原本计划是将数据从Oracle数据仓库中剥离, 但随后发现, 从Oracle数据仓库中迅速取出数据很困难; 更糟糕的是, 数据仓库的处理过程并不适合我们为Hadoop设计的批处理过程, 因为大部分处理都是不可逆的, 并且与要生成的具体报表相关. 最终Linkedin决定, 避免使用数据仓库, 直接访问源数据库和日志文件, 最终演变成一个管道.
这种数据拷贝最终成为原先项目的主要内容之一, 糟糕的是, 只要管道出现问题, Hadoop也毫无用处: 错误的数据只会导致算法产生错误结果. 更糟糕的是, 虽然已经使用了很通用的构建方式, 但每个数据源仍需手动配置, 这也是大量错误和失败的根源.

以下是一些问题的总结:

  • 首先, 虽然管道很杂乱, 但极有价值. 之前处理这些数据十分困难, 现在这种处理变得可能. 许多新的产品和分析技术都来源于多个数据块集合在一起, 而这些数据之前被锁定在各自的系统中.
  • 第二, 可靠的数据加载需要数据管道的深度支持. 如果可以捕获到所有所需的结构, 则可以使得Hadoop数据全自动加载, 这样不需要额外操作就可以添加新的数据源, 或处理schema变更.
  • 第三, 数据覆盖率很低. 如果Linkedin所有数据在Hadoop中可用的比率不高, 相比于接入并运转一个新的数据源, 完整接入一个数据源需要付出更多精力.

曾经Linkedin会为每个数据源和目标源构建自定义的数据加载, 然而随着数据系统和数据仓库的数量越来越多, 系统和仓库之间的管道会变得错综复杂, 如下:
Data Pipeline Complex

需要注意的是, 数据是双向流动的: 一些系统(数据库, Hadoop)既是数据的来源, 也是数据的目的地. 因此我们需要为每个系统建立两个通道: 一个用于数据输入, 一个用于数据输出.
如果想让所有数据源两两连接, 需要$O(N^2)$条管道, 显然需要大量人力. 为避免该问题, 我们需要一种通用方式:
Data Pipeline Simple

尽可能将每个数据消费者与数据源隔离, 理想情况下, 它们只需与一个单独的数据源集成, 就能访问到所有数据: 通过添加一个新的数据系统, 该系统既作为数据源, 也作为数据目的地, 其他系统只需与该系统创建一个单独管道.
因此, 诞生了Kafka, 其将消息系统日志结合起来, 将其作为所有活动数据的中心管道, 并逐步拓展到其他使用方式, 如数据监控. 在相当长的时间内, Kafka是独一无二的: 其作为一个底层设施, 既不是数据库, 也不是日志收集系统, 更不是传统的消息系统.

2.4 Relationship to ETL and the Data Warehouse

Warehouse(数据仓库)是一个仓库, 其中包含用于分析的结构化数据. 数据仓库的用法为: 周期性地从数据源提取数据, 将数据转换为可理解的形式, 然后导入中心数据仓库. 对于数据密集型的分析和处理, 有一个高度集中的位置存放所有数据副本是非常有价值的资产. 无论使用Oracle, Teradata, 或Hadoop, 整体方法不会有太大变化.
数据仓库包含的集成式数据十分有价值, 但获取干净数据的方式有点落后. 对于以数据为中心的组织, 关键问题是将干净的集成数据耦合进数据仓库. 数据仓库是一个批处理查询基础设备, 非常适合多种report和ad hoc分析, 尤其是当查询设计count, aggregation, 和filter. 但批处理系统作为干净数据的唯一仓库, 意味着实时反馈的系统(如实时处理, 搜索索引, 监控系统)无法使用该仓库.
实际上, ETL分为两部分:

  • 数据的提取和清洗, 释放被锁在各类系统中的数据, 并移除特定于某个系统的无用数据
  • 按照数据仓库的查询重构数据, 例如, 使其符合关系型数据库, 强制使用star schema(星形模式)或snowflake schema(雪花模式), 或将数据打散为column format(列格式).

同时做到上述两件事很困难, 既要让集成的数据被实时处理, 还要索引到实时存储系统中. 虽然数据仓库ETL大大提升了组织层面的可伸缩性, 但数据仓库团队要负责收集和整理组织中各个团队所生成的全部数据, 两边的收益不对等: 数据的生产者并不知晓其数据在数据仓库中的使用情况, 为了将数据转换为可用的形式, 数据的提取过程很繁琐和困难, 且无法规模化. 而且, 数据仓库的团队很难跟上其他团队的发展速度, 结果就是, 数据的覆盖率参差不齐, 数据流十分脆弱, 每当发生数据变更时, 很难立即跟进.
较好的做法是创建一个中央管道, 也就是log, 其有一套良好的API来规范如何添加数据. 数据提供方负责与管道集成, 并提供干净, 结构良好的数据. 这意味着, 作为数据生产者, 其在交付数据到中央管道时, 必须考虑其输出和输入的数据是否有良好的结构形式. 添加新的存储系统对数据仓库团队没有影响, 因为只有一个集成中心. 数据仓库团队只需处理一些简单问题, 例如, 从中央日志中加载结构化的输入数据, 完成特定于某个系统的数据转换.
Pipeline Ownership

当考虑采用传统数据仓库之外的数据系统时, 组织的可拓展性变得尤为重要. 例如, 对完整数据集的搜索功能, 或对数据流的秒级监控. 传统数据仓库或Hadoop集群都无法满足上述两种需求. 更糟的是, ETL处理管道的目的是为了支持数据加载, 然而ETL无法输出到其他系统, 引导这些基础设备也变成一项繁重的工作. 这也解释了为什么大多数组织无法完全利用自己的数据. 反而, 如果组织已建立起一套标准的, 结构良好的数据, 那么任何新的系统要想使用数据, 只需要与管道集成即可.
这种架构为数据清洗和转换提供了不同的选择:

  1. 数据被放入全局日志之前, 数据生产者负责处理数据
  2. 日志上实时转换器负责处理数据, 并生成一个新的日志
  3. 数据订阅者负责转换数据

最好让数据生产者在发布到日志前规范化数据, 这样数据始终处于canonical form(规范形式), 且不需要任何代码来转换. 由于数据生产者最了解其拥有的数据, 因此它们更能处理其中的细节.
任何对数据的增值转换都应在原始日志的后期处理中执行, 包括事件数据的会话流程, 或增加大众感兴趣的衍生字段. 原始日志仍可用, 但实时处理产生的衍生日志包含一些参数数据.
最后, 只有针对特定的目标系统时, 才能将聚合作为加载流程的一部分, 其中包含将数据转化为star或snowflake schema. 和传统的ETL相比, 这种方式下的数据是干净且规整的, 因此转化过程十分简单.

2.5 Log Files and Events

这种架构还有一些其他优势: 其支持解耦和事件驱动的系统.
网页行业中, 通常会将活动数据保存为文本形式, 这些文本文件会被拆解并放入数据仓库或Hadoop, 用于聚合和查询处理, 由此产生的问题与所有批处理ETL相同: 其耦合了数据流到数据仓库流程调度.
在Linkedin中, Kafka作为中心的, 多订阅者事件日志用于事件数据处理, 并定义了数百个事件类型, 每个类型负责捕获特定属性的特定行为, 包括page view(页面视图), ad impression(广告展示), search(搜索), service invocation(服务调用), 和application exception(应用异常).
为进一步理解其优点, 假设一个场景: 在职位页面显示一个职位发布. 职位页面应仅包含显示职位所需的逻辑. 然而在动态网站中, 很容易变成与显示职位无关的额外逻辑. 例如, 我们对以下系统进行集成:

  • 发送数据到Hadoop和数据仓库中, 用于离线数据处理
  • 记录浏览计数, 确保用户没有爬取网站
  • 将展示职位加入到职位发布者的分析页面中
  • 记录用户浏览记录, 推送合适的职位(不要向用户展示相同职位)
  • 推荐系统可能需要用户浏览记录, 以跟踪每个职位的热度

由此可见, 展示一个职位变得很复杂. 并且, 职位显示还要支持不同终端, 如移动端. 当多个需求交织在一起时, 负责显示职位的工程师需要了解其他系统和功能, 并确保它们被正确集成.
Event-driven简化了这类问题, 职位显示页面只负责显示职位并记录显示的职位信息, 如职位相关属性, 页面浏览者, 以及其他信息; 推荐系统, 安全系统, 和数据仓库只需订阅数据, 并各自处理, 职位显示的代码无需关注其他系统, 也无需因为数据消费者的增加而作出修改.

2.6 Building a Scalable Log

publishersubscriber分离并不是新鲜事, 但如果想保留一个commit log(提交日志), 其充当多订阅者的实时日志, 负责记录发生的所有事, 那么scalability(可伸缩性)将成为一个挑战. 如果不能创建快速, 低成本, 可伸缩的日志来满足实际需求, 则无法将日志作为统一的集成机制.
人们普遍认为分布式日志是缓慢的, 重量级的; 但只要对大型数据流采用周到的实现, 是可以打破这种旧观念. Linkedin中, Kafka每天会写入超过600亿条不同消息, 如果算上数据中心之间的复制, 可能达到数千亿.
为支持这种数据规模, Kafka使用了一些技巧:

  1. 日志分片
  2. 通过批量读取和写入来优化吞吐量
  3. 避免无用的数据拷贝

为确保水平可拓展性, 我们对日志进行分片:
Partitioned Log

每个partition(分片)都是一个全序日志, 但不同分片之间没有一个全局顺序. 写入者决定了消息写入到哪个分片, 大部分用户以某种键值(如用户id)来决定分片. 追加日志时, 分片之间无需任何协调, 使得系统的吞吐量与Kafka集群大小成正比.
每个分片会有多个replica(副本), 副本数量由用户决定. 每个副本的内容相同, 任何时候都有一个副本作为leader; 当leader宕机时, 某个副本会接替其成为leader.
缺少跨分片的全局顺序是一个局限, 但其实问题不大: 日志的交互一般来源于成百上千个不同的进程, 因此, 全局顺序没有太大意义. 但每个分片必须是有序的, 换句话说, 消息发送者的发送顺序应与分片追加顺序相同.
日志与文件系统一样, 线性读写模式很容易优化. 日志可以把小的读写合成为大吞吐量操作. Kafka会积极地优化性能: 客户端发送数据, 磁盘写入, 服务器之间复制, 数据传递给消费者, 和数据提交确认都会做成batching(批处理).
最后, Kafka使用二进制格式维护内存日志, 磁盘日志, 和网络数据传输的数据, 使得我们能够利用许多优化, 如zero-copy data transfer(零拷贝数据传输).
这些优化使得Kafka可以以磁盘或网络支持的速率写入和读取数据, 即使数据集大小已远超内存大小.

3. Logs & Real-time Stream Processing

到目前为止, 我们只讲述了系统之间拷贝数据的理想机制; 但在存储系统之间传输数据并不是最终目的. 事实证明, "日志"是stream的另一种说法, 日志也是stream processing(流处理)的核心. 但什么是流处理?
流处理与SQL无关, 也不局限于实时流处理. 这里我们把流处理视为一个更广泛的概念: 持续数据流处理的基础设施. computational model(计算模型)可以像MapReduce或其他分布式处理框架一样通用, 但具有产生低延迟结果的能力.
处理模型的真正驱动力是数据收集方式, 批量手机的数据自然是批处理的, 当数据可以被连续收集, 也就能连续处理.
美国的人口统计是一个很好的例子. 早期会挨家挨户地统计公民信息, 在1790年时这么做没有问题, 人们会骑行到周边居民, 并在纸上写下记录, 然后将记录运送到一个中心位置, 以便人们统计. 但到了现在, 人们会保留出生和死亡记录, 这样就能算出人口的变动, 以任何所需的粒度产生人口计数.
即使到了现在, 许多数据传输过程仍依赖于定期转储, 批量传输, 和数据集成. 处理批量转储的唯一方法就是批处理. 但随着批处理被持续数据输入所取代, 人们开始向持续处理转变, 以平滑地使用处理资源并减少延迟.
Linkedin中几乎没有批量数据收集, 大部分数据要么是活动数据, 要么是数据库变更, 两者都是持续发生的. 实际上, 任何商业业务的底层机制都是不间断的处理, 正如Jack Bauer所说, 事件的发生是实时的. 当数据以成批方式收集, 几乎归咎于这些原因: 有一些人为的步骤, 缺少数字化, 有一些古董流程不能自动化. 使用邮件或人工方式传输和处理数据是非常缓慢的, 一开始转向自动化时会不自觉地保留原先的流程, 因此这种情况会持续很长时间.
每日运行的批处理作业可模仿为窗口大小为一天的连续计算. 当然, 底层数据其实总在变化着, Linkedin中, 这样的做法十分普遍, 以至于实现了一整套框架来管理Hadoop工作流.
由此看来, 对于流处理, 我们可得出不同观点: 其处理的是包含时间概念的底层数据, 而不是静态的数据快照, 因此应以用户可控的频率来生产输出, 而不是等待数据都到达后再生产输出. 从这个角度来看, 流处理是广义上的批处理, 随着实时数据的流行, 流处理会是很重要的处理方式.
那么为什么流处理之前未被广泛使用? 最大原因是缺少实时数据收集, 使得持续处理只能是一个学术概念. 换句话说, 是否拥有实时数据收集决定了流处理系统的命运. 金融领域是一个反例: 实时数据流已经标准化, 但流处理成为瓶颈.
流处理的实际应用能力是相当广阔的, 其填补了数据请求/响应服务离线批量处理之间的缺口. 对于现代互联网公司, 25%的代码可划分到这个情况.

3.1 Data flow graphs

DAG

流处理最有趣的一点为: 其与流处理系统的内部组织无关, 但需要拓展一个概念: data feed(数据输入)是什么. 我们主要讨论了primary feed(原始输入)或logs of primary data(原始数据的日志), 换句话说, 执行各种程序时产生的事件和数据. 但流处理包含某些feeds计算产生的feeds. 在消费者看来, 这些派生的feeds和原始feeds没有差别, 这些派生feeds可以任何方式封装组合.
更深入理解这个问题, 流处理作业是从日志中读取数据, 并将输出写入到日志或其他系统. 用于输入和输出的日志将这些处理系统连接成一个不同处理阶段的图表. 事实上, 使用中心化日志可将数据收集, 转换, 和工作流看作一系列写入日志和处理过程.
日志有两重目的:

  • 日志让各个数据集可以有多个订阅者, 并有序使用. 这里的有序性非常重要, 如果对同一记录的两次更新排序不同, 则会产生错误输出. 这里的有序性要强于TCP提供的有序: 即使流程处理失败或重传也能保持有序.
  • 日志提供了处理流程的缓冲. 若多个处理并行进行, 若上游的数据生成速度快于下游的数据消费速度, 会有三种选择: 阻塞上游的生产者, 缓存数据, 或丢弃数据. 丢弃数据和阻塞进程都不是很好的选择, 日志可作为一个缓冲, 即使某个进程重启, 也不会影响其他进程的处理速度. 当拓展数据流到一个更庞大的组织时, 这种隔离性十分重要, 整个流程不能因某个作业发生错误而中止.

3.2 Stateful Real-Time Processing

一些实时流处理只是无状态的单次记录转换, 但更多情况下, 流处理需要在某个时间段内进行复杂的count(计数), aggregation(聚合), 和join(关联)操作. 例如, 一个用户点击的事件流需要附加做出操作的用户信息, 实际上是将事件关联到用户的账户数据库. 这类流程最终要处理者维护一些状态信息: 比如在计算一个计数时, 需要维护一个计数器, 但处理者随时可能挂掉, 如何维护正确的状态?
最简单的方案是将状态保存在内存中, 若服务器崩溃, 则会丢失状态, 并回退到日志中的开始时间点上. 但如果时间窗口是一小时这么长, 这种方法可能不可信.
另一方案是通过网络将所有状态保存在远程存储系统中, 但问题在于, 没有数据局部性, 且产生很多网络数据往返.