logo头像

Always believe youself.

DeltaLake架构

Delta 架构

Delta 架构介绍

Delta 架构代替经典的 Lambda 架构

image

  • 第一,同时读写,并且要保证数据的一致性

    • 就是刚才我们提出的第一个需求,就是要支持 transcation,就是说你只要能实现读写之间的 Snapshot isolation 就行了,这样你可以集中在你的 data flow,而不用担心会不会读到部分结果,不用担心 FileNotFound 的这类错误,这些事情 Delta Lake 都可以帮你处理。

    • Delta Lake 提供了流,就是 streaming 和 batch 的读入和写入,标准 API,很容易实现,很容易去用。你可以在文档里面找到具体的 API。

  • 可以高吞吐从大表读取数据

可能处理过大数据的同学们就遇到过这个经典痛点,我也处理过客户的这种问题好多次,在 没有 Delta Lake 的时候,简直痛不欲生。

- 如果没有 Delta Lake,读取百万级的 patition 的 location path 是需要用 Hive metastore 一行行地读的,要取一百万行简直是奇慢无比。然后,在每个 patition 的 地址里还需要通过文件系统 列里面包含的所有文件。这在对象存储的系统里,这种操作也是又贵又慢。

- 其实这个问题不又是一个典型的大数据问题吗?大数据系统都解决不了大数据问题,那不是贻笑大方?

- 当然,这里的解决方案很简单,就是标准的 Spark,用 parquet 去存 file path,并且用 Spark 的分布式的向量化的读入去读,这就是 Delta Lake 怎么去解决之前的痛点。我们客户因为这个性能轻松地提高了几百倍甚至几千倍。其实也就是因为 Hive metastore 和文件系统的 list file 操作实在太慢了。
  • 支持回滚和删改

    • 数据这么脏,回滚和删改需求难以避免。Delta Lake 提供了 Time travel,因为 transaction log 实际能看到整个历史变化的结果,所以 Delta Lake 实现这个很方便。我们提供了两条 API,你可以基于 Timestamp 去做, 也可以基于 version number。Time travel 是一个特别好的功能,它可以做很多事情,不单单是纠错,你还可以 Debug,重建过往报告,查账,审计,复杂的 temporal query,对快速更新数据的表做版本查询……

    • Delta Lake 还支持删改(update/delete/merge),不过目前 Delta 还没有自己的 SQL 语法,当然我们可以把 Spark 的语法完全复制过来,但是维护成本也很高。但 Spark 3.0 来了之后这个问题就迎刃而解了。当然,如果要支持 Spark 2.4 的话,Delta 需要加上自己的 SQL parser,我们还在讨论要不要这样干。

  • 在线业务不下线的同时可以重新处理历史数据

    • 你只要对 Delta Lake 做相关结果的删除,重新改一下业务逻辑,历史数据再做批处理,你就可以得到你的最新结果了。与此同时,因为 Delta Lake 支持 ACID,数据的下游适用方还可以同时访问之前版本的数据。
  • 处理迟到数据而无需推迟下阶段的数据处理

    • 处理迟到数据也不是什么问题,只要你能支持 merge,如果存在就 update,不存在就 insert,不影响你现有的 Delta Lake 重写。

Delta Lake 完美解决了我们的需求,让大家的 Data pipeline 重新变得简单而优雅,而不需要用那么复杂的 Lambda 架构了。

image

怎么最好地使用 Delta 架构 ?基于跟客户的各种的讨论经验,我们总结出了下面几点。

你需要有多个 stage 的 Delta Lake。

我们的基本 idea 是这样的:

  • 第一个 stage 就是你要保证没有原始数据损失。它保存在 Delta Lake 里,万一哪天发现之前的一些数据清理导致丢失了很重要的信息,你还可以轻松恢复。
  • 第二个 stage 就是做数据清理,做一些清理、转换、filter。然后才真正达到一个可以被数据分析的第三个 stage。这是基于数据质量分成多个级别,多个状态。至于实际生产线上需要多少个 stage,这个取决于业务的复杂度,SLA,和对延迟的要求。

Delta 架构的特性

持续数据流

image

批流合并。Streaming 和 batch 用同一个 engine,不用维护多个;同一套 API,甚至都不用 batch 的 API,就用 streaming 的 API 就能解决问题;同样的 user code,无需用到 Lambda 架构,纯粹就是一条 pipeline 解决所有问题。高效增量数据载入。如果不断有新数据进来就直接用 Structured Streaming 的 Trigger.Once 去记录上一次你处理到哪,你只需要重启这个 Trigger.Once,就处理了上次之后的新数据, 特别方便。快速无延迟的流处理,你可以选择不同的 Trigger 的模式,当然 Trigger.Once 最省钱,当然你也可以低延迟,比如多长时间 Trigger 一次,也可以低延迟用持续 Trigger。你可以把批处理变成一个持续流处理,简单易用。而且 Delta Lake 因为支持原子性,所以它能保证 exactly once,这一点很重要,其他的数据源基本没办法保证。

物化中间结果

这一点就有点颠覆传统模式了。我们建议多次物化你的中间结果,也就是之前说的多个 stage。每个 stage 就是把中间结果落地存在文件里,它有以下好处。

容错恢复,出问题后可以回到某一个版本,从那个时候再开始,你不需要从最原始的数据开始,这点在 pipeline 里是很重要的事情。方便故障排查,你知道哪一步出错了,要是不存,业务方报告出错的时候你也不知道问题出在哪儿,连 debug 都没法 debug,回溯都没办法回溯。一写多读,当你的 pipeline 很多很复杂的时候,可能重用中间的一些结果,这真的很方便。这里面比如说图例的两个 pipeline ,其实到 T3 之前,都是一样的。我们就可以复用。

image

如果你的转换很复杂的时候,可以物化多次。到底物化多少次,取决于你对 Reliability/SLA 和 end-2-end latency 的取舍,你要是 Reliability/SLA 好,你就必须要物化多几次,但是写肯定有代价,所以 end-2-end latency 就慢,具体就要看你的需求了。

image

费用和延迟的取舍

流处理,持续的数据流入和处理,无需作业调度管理,需要永远在线的 cluster。频繁的批处理,分钟级数据流入和处理,不需要低延迟,比如半小时就可以了,需要 warm pool of machine,无事关机,按需启动。可使用 Spark structured streaming 的 Trigger.Once 模式。非频繁批处理,若干小时或若干天的数据批流入和处理,无事关机,按需启动,也可使用 structured streaming 的 Trigger.Once 模式。这样一来,就可以节省很多资源了。

优化数据的物理存储

根据常用查询的 predicate,为改善读取速度,可优化数据的物理存储。比如,用 partitioning 和 z-ordering。Partitioning 大家都应该很清楚了,low cardinality 的 column 比较合适,就是每个 partition 不要超过 1 GB,一般比如说用 date 这是一种经常被使用的 partition column,每个 date 里面要给予不同的 eventType。这样,每个 partition 不会太大,也不会产生太多 partition。反之如果用 timestamp 做 partition column,产生的 partition value 就是无数个,简直奇葩无比,可以轻松把 Hive metastore 给撑爆。在 Delta Lake 里面我们也不建议,即使我们不用 metastore。第二就是 Z-Ordering,这个还没到开源的版本,但是这个是可以解决什么问题呢,就是是针对那种 high cardinality,就是 column 里有大量的不一样的 value,这种就适合做 z-ordering index。

image

重新处理历史数据

每次 keep 住上一个 stage 的好处是什么?你把结果一删,重新用 Tigger.Once 再做一次就好了,结果就出来了。如果你系统部署在云上,那对你来说也很简单,你如果要快速回填,你就再多加几台机器,结果就更快地出来了。比如,从原来的十台机器扩张到一百台。

数据质量的调整

image

保证数据完整性。schema 可以选择自动合并,就可以避免数据的丢失。到了最后阶段,我们就需要去强制 schema 不能变,data type 不能变,data expectation 也不能。比如,不能有 NULL。数据质量对于数据分析的准确度是至关重要的。

关键特性

image

Delta 架构的优点

1)减少端到端的 pipeline SLA 多个使用单位(客户)把 data pipeline 的 SLA 从几小时减少到几分钟。

2)减少 pipeline 的维护成本原来的 Lambda 架构简直就是费时费力。要同样达到分钟级的用例延迟,Delta Lake 架构并不需要这么复杂。

3)更容易的处理数据更新和删除简化了 Change data capture,GDPR,Sessionization,数据去冗。这些都可以用 Delta Lake 去实现,方便很多。

4)通过计算和存储的分离和可弹缩而降低了 infrastructure 的费用多个使用单位将 infrastructure 的费用降低了超过十倍。

Delta 架构的经典案例