当前位置:技术分享 > 技术参考 > 正文

为了使用好 Apache Flink,Yelp 实现了一个连接算法2019-01-18 11:22:30 | 编辑:hely | 查看: | 评论:0

摘要 在 Yelp,大家生成了大量高吞吐量的数据流,包括日志、业务数据和应用程序数据。大家需要对这些数据流进行连接、过滤、聚合,有时候甚至需要进行快速转换。

 

摘要

在 Yelp,大家生成了大量高吞吐量的数据流,包括日志、业务数据和应用程序数据。大家需要对这些数据流进行连接、过滤、聚合,有时候甚至需要进行快速转换。为了实现这一过程,工程团队投入了大量时间来分析多个流式处理框架,最终确定 Apache Flink 是最佳选择。大家现在使用 Flink 实现了一个连接算法,大家称之为“Joinery”。它可以针对两个或两个以上基于键的数据流实行非时间窗口的一对一、一对多和多对多内部连接。

那么它的工作原理是什么?简单地说,就是开发人员提供用于描述所需连接的配置文件,Joinery 服务负责实行并输出连接过的基于键的结果流。

背景:大家要解决什么问题?

自从流式管道出现以来,流和表之间的差距已经大大缩小。流式管道允许对高吞吐量数据流实行计算密集型的数据操作,如连接、过滤和聚合。虽然大多数流式管道支撑基于时间窗口的连接,但在很多情况下也需要进行非时间窗口的连接。

Salesforce 就有这样的需求。Salesforce 是大家在 Yelp 使用的一个下游数据存储,为销售团队提供支撑。它包含了平台的业务数据,例如购买的广告包和业务所有者的资料。数据被存储在关系数据库中,但这些数据是非规范化的,以便在销售人员需要即时访问数据时(例如在向客户推销时)可以避免耗时的实时联接操作。

为了支撑这个用例,大家实现了一个实时流连接器,用于连接多个数据流,并将关系数据库中的规范化表呈现为流,存入 Salesforce 的非规范化表中。在下图中,每个入站流表示关系数据库中的一张表。流连接器消费这些入站流中的消息,并基于消息键创建完全连接的消息,再将结果写到出站流中。例如,在下面的流连接器中,用于连接消息的消息键是 business-id,它是业务和广告表的主键以及业务所有者表的外键。

 

 

之前的方法

从历史上看,Yelp 工程团队已经构建了 Paastorm 来解决类似的问题。但是,当数据集增长到数十 GB 时,Paastorm 带来了更高的维护成本。另一个问题是它们不是为有状态应用程序而设计的,因此使用 Paastorm 作为有状态解决方案意味着必须从头开始实施状态管理。例如,一个将结果上传到 Salesforce 的 spolt 保存了数千万条消息,一旦发生崩溃,需要花费几个小时来恢复!这将导致整个管道出现严重的延迟,并需要人工干预,最终导致工程生产力下降。

这种场景要求任何用于连接无界流的方法都必须具备可扩展性和容错能力。

一个连接算法?

基于大家过去在构建数据管道和聚合方面的经验,大家开发出了以下的连接算法:

 

 

算法:

根据消息键将消息打散或排列到等值连接(equi-join)分区中。

将消息插入到相应的 multi-map 哈希表中。

通过获取所有 multi-map 的笛卡尔积来构造输出。

过滤、投射并输出结果。

上述算法可归纳为三个关键部分:

更新阶段;
连接阶段;
投射(Projection)阶段。

让大家更详细地先容这些阶段。

更新阶段

对于每个输入,算法会创建一个哈希表,然后将消息与键映射起来。对于每个新传入的消息,大家会检查消息类型(类似于 MySQL LogType——log、create、update、delete),并将 create/update/delete 消息分别加入到对应的哈希表中。

连接阶段

接下来,大家会探测上述的哈希表,以便生成所有消息的连接结果。这将生成所有可能的排列。然后,经过连接的消息被发布到目标流中。请注意,只有当入站消息具有相同的键时,连接的消息才会被发布到目标流中。这个算法的连接阶段实行的是内连接。

投射阶段

在创建输出消息期间,可以使用别名来投射输出流中的字段,以防止命名冲突。如果下游消费者不需要字段,也可以完全将字段删除。

这个算法仅适用于基于键的压缩日志型数据流。使用日志压缩型数据流可防止出现无限制的增长,并确保消费者应用程序至少可以保留 Kafka 分区中每个消息的最后一个已知值。这些约束意味着这个算法适用于数据变更日志流,而不是常规日志流。

在下图中,左侧表示输入流,消息来自不同的输入源。这张图描绘了输入流的笛卡尔积。在连接阶段,大家实行流聚合,当检测到输入源中具有相同键(在此示例中为 id)的记录时,聚合操作会生成一个元组。换句话说,算法会检查输入流中的键是否在所有哈希表(流)中具有映射,如果有,就进入到投射阶段。

 

 

下图说明了算法是如何生成记录的:

 

 

这很酷,但内存占用是怎样的?

由于 Joinery 实行的是无界流的连接,因此其内部状态可能会变得非常大。维护巨大的内存状态是很昂贵的,而且无法进行快速的恢复。为了缓解这种情况,Joinery 为数据流中的数据分配了键,这样有助于跨节点分配内存,但仍然无法阻止状态大小超出节点的总可用堆内存(这可能会导致 OOM 错误)。因此,大家需要一种方法将数据写到磁盘上,同时保持相对较低的内存占用。

通过利用 Flink 的增量检查点,大家可以将应用程序状态保存到外部存储。这样可以减少内存占用量,并且可以在几分钟内实现更快的恢复(与大家的 spolt 相比)。

一个端到端的例子

 

 

大家通过一个假设的场景来演示 Joinery 是如何连接两个流的:用户评论(user review)和业务(business)。

user review:
- biz_id
- content
- review_id
- user_id

用户评论流

business:
- business_id
- name
- address
- state

业务流

大家想要根据业务 ID 连接上述两个流,并生成一个输出流。Joinery 配置如下:

join:
- schema_id: 12345
join_keys: [biz_id]
exclude_fields: [content, review_id]
- schema_id: 23143
join_keys: [business_id]
aliases:
- from: business_id
to: biz_id
exclude_fields: [address, name]
output:
namespace: joinery_example
source: business_review_join
Doc: Join of business table and review table
pkey:
- business_id

Joinery 配置

上面的配置要求 Joinery 根据 biz_id 键来连接两个流。这里需要注意的是,即使两个流中都没有相同的键,大家也可以使用别名来映射键(类似于传统的 SQL 别名)。

 

 

未来的工作

大家现在面临的并希翼在未来解决的主要挑战之一是在升级和状态迁移期间保持数据的完整性。部署在生产环境中的流式应用程序应该具备强壮的弹性,并且能够快速进行状态恢复。

对 Joinery 这样的应用程序进行黑盒测试和审计是很难的。Yelp 已经开发了像 pqctl(自定义 docker compose 环境)这样的工具,可以帮助基础设施团队进行可重复的简单单元测试。借助这个工具和大量的验收测试套件,大家希翼能够测试到更多的端到端连接场景。其中一些正在进行中,但仍有很多工作要做,以确保大家可以在应用程序重启后验证状态,特别是在升级 Joinery 版本时。

附件:MJoin 算法(http://www.vldb.org/conf/2003/papers/S10P01.pdf)

英文原文:https://engineeringblog.yelp.com/2018/12/joinery-a-tale-of-unwindowed-joins.html

分享到:0收藏

上一篇:Spark+Alluxio性能调优十大技巧 Spark SQL | Spark,从入门到精通下一篇:

公众平台

搜索"raincent"或扫描下面的二维码

新浪微博 Tencent微博 订阅中心
?