logo头像

Always believe youself.

DeltaLake入门

Delta Lake,让你从复杂的 Lambda 架构中解放出来


方案升级过程

方案一

建设一个基本的 Data Pipeline [数据处理流水线],按照项目经理的说法,那就很简单。我们把 Kafka、Kinesis、各种各样数据湖的格式用 Spark 读出来,再用 Spark 做一些数据清理和数据转换,然后再把结果存到一个数据湖,再用另一个 Spark job 把数据湖的内容分析一下,做训练或者做各种各样的分析,最后产生一个报告给终端用户。

这是一个非常简单的 Pipeline。但是这个 Pipeline 有个头痛的问题。如果仅仅用 Spark 的批处理,那么延迟可能不达标,而且也不是在做增量处理。

image

方案二

用 Spark Structured Streaming。Structured Streaming 有 Trigger Once,可以帮你记录上次处理到什么地方,这样的话可以把延迟降低,只处理增量,你也不需要去记录和管理上次处理到哪里了。可是我们又遇到了一个新的问题,就是你如果用 Structured Streaming,每个小的 Batch 都会产生多个小的 Spark 的结果文件。小文件越来越多,整个 Pipeline 就越来越慢,延迟往往到了最后就无法接受了。

方案三

既然有小文件,就得定期去做压缩。但是在做压缩的过程中整个作业线会下线。为什么?由于缺乏原子性读写的能力,没办法在写你的压缩的时候同时读数据。压缩的周期太长也会影响到你的生产最后报表的时效性。比如说,业务是不能接受半小时或者一个小时这种延迟的。那么,这个时候,大家自然而然会选择最经典的架构,Lambda 架构。就是说,你同时可以部署一个批处理的和一个流处理的,批可以慢一点,但是结果全面准确,而流处理就是用最快的时间对最新增量产生结果。然后将批和流的结果汇总,产生一个全局的结果。

方案四

Lambda 架构需要同时运营两个不同的 pipeline,并且额外资源消耗也大幅增多,运营的人力和资源成本都大幅提高。

image

方案五

对这两个 pipeline 都需要做验证。尤其是当数据来源于非结构数据的数据源,数据不是特别干净和一致。

image

方案六

对于验证发现的错误,我们又不希望将 Pipeline 给宕下来,而是希望它自动去修复。那么,一种解决方案就是避免对全表做修正,而是对某些分区重新处理。数据的重新处理一般都会影响你整个 pipeline 的延迟,而且还进一步增加硬件资源的负荷和 pipeline 的复杂度。

image

方案七

image

也许会有一些业务上的调整,或者是诸多原因,你可能想把数据湖做一些 update 和 merge。由于当前数据湖不支持 update 和 delete,那么你可能需要自己实现 update 和 merge。我们发现不同用户的实现方法都不太一样,简直就是各显神通,这些方案不但容易出错,复杂度和延迟也很高,而且大多数情况还不通用

方案八

image

最开始第一个方案实际上是很简单,很优美。那它到底哪里错了?是什么原因导致它最后变得这么复杂?我们缺了什么?如何可以简化来产生一个简单易维护的架构?

这里我们列出了五点原因:

  • 第一,要支持同时读写,就意味着你写的时候还可以读,不应该读到一个错误的结果。同时还可以支持多个写,且能保证数据的一致性;

  • 第二,可以高吞吐地从大表读取数据。大数据方案不能有诸多限制,比如,我听说有些方案里最多只可以支持几个并发读,或者读的文件太多了就不让你提交作业了。如果这样,对业务方来说,你的整个设计是不满足他的需求的;

  • 第三,错误是无可避免,你要可以支持回滚,可以重做,或者可以删改这个结果,不能为了支持删改而要求业务方去做业务逻辑的调整;

  • 第四,在重新改变业务逻辑的时候要对数据做重新处理,这个时候,业务是不能下线的。在数据被重新处理完成之前,数据湖的数据是要一直可被访问的;

  • 第五,因为有诸多原因,数据可能会有晚到的情况,你要能处理迟到数据而不推迟下阶段的数据处理。

基于以上五点,我们基于 Delta Lake 和 Structured Streaming 产生了一个新的架构,叫 Delta 架构,它是对 Lambda 架构的一种颠覆,或者称为一种提升。

image

在 Delta 架构下,批流是合并的,并且要持续的进行数据处理,按需来重新处理历史数据,并且利用公有或私有云的特性来对计算或者存储资源按需分别做弹性扩展。

Delta Lake

基本原理

image

Delta Lake 的基本原理其实很简单,简单得令人发指。作为一个普通的 Partquet 一般就是 Partition Directories 再加一些 Data Files。Delta Lake 也是基于这个结构的,唯一的区别就是它有一个 Transaction Log 记录你的 Table Version 和变更历史。

image

现在,让我们来重新看待什么构成了一张表。表实际上是一堆操作的结果,比如说改变元数据,改变名字,改变 Schema,增加或删除一些 Partitioning,还有另外一种操作是添加或者移除文件。所有表的当前状态或者是结果,都是这一系列 Action 产生的结果。这个结果包含了当前的 元数据,文件列表,transaction 的历史,还有版本信息。

image

那怎么去实现这个原子性?也很简单,只要保证 Commit File 的顺序和原子性就可以了。

比如说表的第一个版本,它是增加两个文件,第二个版本就是把这两个文件删掉,增加一个新的文件,作为 Reader 来说,每次只能看到当前已经 Commit 的结果。

怎么实现多个写入的并发?Spark 的 Pipeline 一般都是高并发读,低并发写。在这种情况下,乐观并发就更加合适了。它实际上很简单,就说你多个用户读的时候,先记录一下当前读用的 data 版本是什么,如果同时有两个人都在 commit,只有一方可以成功,而另一方就需要去看一下成功方之前的 commit 里有没有碰他读的文件。如果没有改,他就改一下文件名就行了,如果改了,那就得重做。这个可以是 Delta Lake 自动去重试,也可以是事务提交方 / 业务方,去重做。

Delta Lake 需要解决的另一个经典问题就是大规模元数据的处理。你发现你有大量的 commit log file,因为每次 commit 都会产生一个文件,这其实也是一个经典的小文件处理。如何解决这种元数据处理?标准答案就是使用 Spark。Delta Lake 便是使用 Spark 去处理它的元数据。比如刚才说了一个例子,加了两个文件,减了两个文件,之后加了一个 parquet,之后 Spark 会把这些 commit 全部读下来,产生一个新的,我们称之为叫 Checkpoint。

这就是 Delta Lake,就是这么简单。