是什么会迫使像 Lamoda 这样拥有简化流程和数十项相互关联的服务的大公司显着改变其方法? 动机可能完全不同:从立法到所有程序员固有的实验愿望。
但这并不意味着您不能指望额外的好处。 Sergey Zaika 将告诉您,如果您在 Kafka 上实现事件驱动的 API,您到底能赢得什么(
免责声明:本文基于 Sergey 于 2018 年 XNUMX 月在 HighLoad++ 上举办的一次聚会的材料。 Lamoda 与 Kafka 合作的现场体验吸引了听众,其程度不亚于日程上的其他报道。 我们认为这是一个很好的例子,说明您可以而且应该始终找到志同道合的人,HighLoad++ 的组织者将继续努力营造有利于这一点的氛围。
关于流程
Lamoda 是一个大型电子商务平台,拥有自己的联络中心、送货服务(以及许多附属机构)、照相馆、巨大的仓库,所有这些都运行在自己的软件上。 支付方式有数十种,B2B 合作伙伴可能会使用其中部分或全部服务,并希望了解其产品的最新信息。 此外,Lamoda 在俄罗斯联邦以外的三个国家开展业务,那里的一切都有点不同。 总共可能有一百多种配置新订单的方法,必须以自己的方式进行处理。 所有这一切都需要数十种服务的帮助,这些服务有时会以不明显的方式进行通信。 还有一个中央系统,其主要职责是订单状态。 我们叫她 BOB,我和她一起工作。
具有事件驱动 API 的退款工具
事件驱动这个词已经很陈词滥调了;稍后我们将更详细地定义它的含义。 我将从我们决定在 Kafka 中尝试事件驱动的 API 方法的背景开始。
在任何商店里,除了顾客付款的订单外,有时也会因为产品不适合顾客而被要求退货。 这是一个相对较短的过程:如有必要,我们会澄清信息并转账。
但由于立法的变化,退货变得更加复杂,我们必须为其实现单独的微服务。
我们的动机:
- 法FZ-54 - 简而言之,法律要求在几分钟的相当短的 SLA 内向税务局报告每笔货币交易,无论是申报表还是收据。 我们作为一家电子商务公司,开展了相当多的业务。 从技术上讲,这意味着新的责任(因此也意味着新的服务)和所有相关系统的改进。
- 鲍勃分裂 是公司内部项目,旨在将BOB从大量非核心职责中解放出来,降低其整体复杂性。
该图显示了主要的 Lamoda 系统。 现在大部分都比较 围绕不断缩小的整体架构的 5-10 个微服务群。 它们正在缓慢增长,但我们正在努力使它们变小,因为部署中间选择的片段是可怕的 - 我们不能让它掉落。 我们被迫保留所有交易所(箭头),并考虑到其中任何一个交易所可能无法使用的事实。
BOB也有相当多的交易所:支付系统、配送系统、通知系统等。
从技术上讲,BOB 是:
- ~150k 行代码 + ~100k 行测试;
- php7.2 + Zend 1 和 Symfony 组件 3;
- >100 个 API 和约 50 个出站集成;
- 4个国家有自己的商业逻辑。
部署 BOB 既昂贵又痛苦,其代码量和解决的问题之多,没有人能把它们全部记在脑子里。 一般来说,简化它的原因有很多。
退货流程
最初,该过程涉及两个系统:BOB 和支付。 现在又出现了两个:
- 财政化服务,将解决财政化以及与外部服务沟通的问题。
- 退款工具,仅包含新的交易所,以免导致 BOB 膨胀。
现在该过程如下所示:
- BOB 收到退款请求。
- BOB 谈论这个退款工具。
- 退款工具告诉付款:“退还钱。”
- 付款返还钱。
- 退款工具和 BOB 相互同步状态,因为目前他们都需要它。 我们还没有准备好完全切换到退款工具,因为 BOB 有用户界面、会计报告以及一般情况下无法轻松传输的大量数据。 你必须坐在两把椅子上。
- 财政化的要求消失了。
于是,我们在 Kafka 上做了一种事件总线——event-bus,一切都从它开始。 万岁,现在我们遇到了单点故障(讽刺)。
优点和缺点都非常明显。 我们制造了一辆公共汽车,这意味着现在所有的服务都依赖于它。 这简化了设计,但会在系统中引入单点故障。 Kafka 将崩溃,进程将停止。
什么是事件驱动 API
Martin Fowler 的报告对此问题给出了一个很好的答案(GOTO 2017)
简单来说我们做了什么:
- 通过以下方式封装所有异步交换 事件存储。 我们不是通过网络通知每个感兴趣的消费者有关状态更改的信息,而是将有关状态更改的事件写入集中存储,并对主题感兴趣的消费者读取从那里出现的所有内容。
- 本例中的事件是通知 (通知)某处发生了变化。 例如,订单状态发生变化。 如果消费者对通知中未包含的伴随状态变化的某些数据感兴趣,则可以自行了解其状态。
- 最大的选择是成熟的事件溯源, 状态转移,其中事件包含处理所需的所有信息:它来自哪里、它进入什么状态、数据到底如何更改等。唯一的问题是可行性以及您可以存储的信息量。
作为退款工具启动的一部分,我们使用了第三种选项。 这简化了事件处理,因为不需要提取详细信息,而且它消除了每个新事件都会生成来自消费者的大量澄清获取请求的情况。
退款工具服务 未加载,所以卡夫卡的存在更多的是笔的品味而不是必需品。 我不认为如果退款服务成为一个高负荷的项目,生意会很高兴。
异步交换按原样
对于异步交换,PHP部门通常使用RabbitMQ。 我们收集请求的数据,将其放入队列中,同一服务的使用者读取它并发送它(或不发送它)。 对于 API 本身,Lamoda 积极使用 Swagger。 我们设计一个 API,用 Swagger 描述它,并生成客户端和服务器代码。 我们还使用了稍微增强的 JSON RPC 2.0。
在某些地方使用 ESB 总线,有些地方使用 activeMQ,但是一般来说, RabbitMQ - 标准.
异步交换即将到来
当通过事件总线设计交换时,可以进行类比。 我们同样通过事件结构描述来描述未来的数据交换。 yaml 格式,我们必须自己进行代码生成,生成器根据规范创建 DTO,并指导客户端和服务器使用它们。 一代人进入两种语言- golang 和 php。 这有助于保持库的一致性。 该生成器是用 golang 编写的,这就是它被称为 gogi 的原因。
Kafka 上的事件溯源是一个典型的事情。 Kafka Confluence主力企业版有一个解决方案,有
讽刺的是,即使在这样一个令人愉快的案例中,当有一个大致相似的业务,Zalando,它提出了大致相似的解决方案时,我们却无法有效地使用它。
启动时的架构模式如下:我们直接从 Kafka 读取,但仅通过 events-bus 写入。 Kafka 中有很多内容可供阅读:经纪人、平衡器,并且它或多或少已经为水平扩展做好了准备,我想保留这一点。 我们希望通过一个网关(又名事件总线)完成录制,原因如下。
活动巴士
或者活动巴士。 这只是一个无状态的 http 网关,它承担几个重要的角色:
- 生产验证 — 我们检查活动是否符合我们的规格。
- 事件主控系统,也就是说,这是公司中主要且唯一的系统,它回答了哪些事件和哪些结构被认为是有效的问题。 验证仅涉及数据类型和枚举来严格指定内容。
- 哈希函数 对于分片 - Kafka 消息结构是键值对,并使用键的哈希值来计算将其放在哪里。
为什么
我们在一家流程精简的大公司工作。 为什么要改变什么? 这是一个实验,我们预计会获得一些好处。
1:n+1交换(一对多)
Kafka 使得将新消费者连接到 API 变得非常容易。
假设您有一个目录,需要同时在多个系统(以及一些新系统)中保持最新。 之前,我们发明了一个实现 set-API 的包,并且主系统被告知消费者地址。 现在主系统发送该主题的更新,每个感兴趣的人都可以阅读。 一个新系统出现了 - 我们为该主题注册了它。 是的,也可以捆绑,但更简单。
以refund-tool为例,它是BOB的一部分,我们可以方便地通过Kafka保持它们的同步。 Payment 说钱已退回:BOB、RT 发现了此事,更改了状态,Fiscalization Service 发现了此事并签发了支票。
我们计划创建一个统一的通知服务,通知客户有关其订单/退货的新闻。 现在这个责任分散在系统之间。 对于我们来说,教通知服务从 Kafka 捕获相关信息并对其做出响应(并在其他系统中禁用这些通知)就足够了。 不需要新的直接交换。
数据驱动
系统之间的信息变得透明——无论您拥有什么样的“血腥企业”,也无论您的积压工作有多么丰富。 Lamoda 拥有一个数据分析部门,负责从系统收集数据并将其转化为可重复使用的形式,用于业务和智能系统。 Kafka 允许您快速向他们提供大量数据并保持信息流最新。
复制日志
消息在被读取后不会消失,就像在 RabbitMQ 中一样。 当事件包含足够的信息进行处理时,我们就有了该对象最近更改的历史记录,并且如果需要,还可以应用这些更改。
复制日志的存储周期取决于该主题的写入强度;Kafka允许您灵活设置存储时间和数据量的限制。 对于密集型主题,重要的是所有消费者都有时间在信息消失之前阅读该信息,即使是在短期无法操作的情况下。 通常可以存储数据 天的单位,这足以提供支持。
接下来稍微复述一下文档,给那些不熟悉Kafka的人(图片也来自文档)
AMQP 有队列:我们将消息写入消费者的队列。 通常,一个队列由具有相同业务逻辑的一个系统处理。 如果您需要通知多个系统,您可以教应用程序写入多个队列或使用扇出机制配置交换,扇出机制会克隆它们本身。
Kafka也有类似的抽象 主题,您可以在其中写入消息,但它们在阅读后不会消失。 默认情况下,当您连接到 Kafka 时,您会收到所有消息,并可以选择保存到上次中断的位置。 也就是说,您按顺序阅读,您可能不会将消息标记为已读,而是保存 id,然后您可以从中继续阅读。 你确定的Id叫做offset,机制就是commit offset。
因此,可以实现不同的逻辑。 例如,我们在不同国家/地区的 4 个实例中拥有 BOB - Lamoda 在俄罗斯、哈萨克斯坦、乌克兰、白俄罗斯。 由于它们是单独部署的,因此它们的配置和业务逻辑略有不同。 我们在消息中指出它指的是哪个国家。 每个国家的每个 BOB 消费者都使用不同的 groupId 进行读取,如果该消息不适用于他们,他们就会跳过它,即立即提交偏移量+1。 如果我们的支付服务读取同一主题,那么它会使用单独的组来读取,因此偏移量不会相交。
活动要求:
- 数据完整性。 我希望该事件有足够的数据以便可以对其进行处理。
- 正直。 我们委托事件总线验证事件是否一致并且它可以处理它。
- 顺序很重要。 在回归的情况下,我们被迫与历史合作。 对于通知,顺序并不重要,如果它们是同类通知,则无论哪个订单先到达,电子邮件都将是相同的。 在退款的情况下,有一个明确的流程;如果我们更改订单,则会出现异常,退款将不会被创建或处理 - 我们最终会处于不同的状态。
- 一致性。 我们有一个商店,现在我们创建事件而不是 API。 我们需要一种方法来快速、廉价地将有关新事件和现有事件更改的信息传输到我们的服务。 这是通过单独的 git 存储库和代码生成器中的通用规范来实现的。 因此,不同服务中的客户端和服务器是协调的。
卡夫卡在拉莫达
我们安装了三个 Kafka:
- 日志;
- 研发;
- 事件总线。
今天我们只讨论最后一点。 在 events-bus,我们没有非常大的安装 - 3 个代理(服务器)和只有 27 个主题。 通常,一个主题就是一个过程。 但这是一个微妙的点,我们现在就谈谈它。
上面是rps图。 退款过程用绿松石线标记(是的,X轴上的那条),粉红色线是内容更新过程。
Lamoda目录包含数百万种产品,并且数据一直在更新。 有些系列已经过时了,新的系列被发布来取代它们,并且新的型号不断出现在目录中。 我们试图预测客户明天会感兴趣什么,因此我们不断购买新东西,拍摄它们并更新展示柜。
粉色峰值是产品更新,即产品的变化。 看得出来,小伙子们拍照、拍照、然后再拍照! — 加载了一组事件。
Lamoda 活动用例
我们使用构建的架构进行以下操作:
- 退货状态跟踪:来自所有相关系统的号召性用语和状态跟踪。 付款、状态、财务化、通知。 在这里,我们测试了该方法,制作了工具,收集了所有错误,编写了文档并告诉我们的同事如何使用它。
- 更新产品卡: 配置、元数据、特征。 一个系统读取(显示),多个系统写入。
- 电子邮件、推送和短信:订单已取货、订单已到、已接受退货等等,有很多。
- 库存、仓库更新 — 物品的量化更新,只是数字:到达仓库、退货。 所有与预订货物相关的系统都必须使用最新数据运行。 目前,库存更新系统相当复杂;Kafka将简化它。
- 数据分析 (研发部门)、机器学习工具、分析、统计。 我们希望信息透明——Kafka 非常适合这一点。
现在更有趣的部分是关于过去六个月中发生的重大变化和有趣的发现。
设计问题
假设我们想做一件新事情——例如,将整个交付流程转移到 Kafka。 现在,该流程的一部分已在 BOB 的订单处理中实现。 将订单转移到送货服务、移动到中间仓库等背后都有一个状态模型。 有一个完整的整体,甚至两个,加上一堆专用于交付的 API。 他们对交付了解得更多。
这些看似相似的区域,但 BOB 中的订单处理和运输系统具有不同的状态。 例如,某些快递服务不发送中间状态,而仅发送最终状态:“已送达”或“丢失”。 相反,其他人则详细报告了货物的流动情况。 每个人都有自己的验证规则:对于某些人来说,电子邮件是有效的,这意味着它将被处理;对于某些人来说,电子邮件是有效的,这意味着它将被处理; 对于其他人来说是无效的,但是订单仍然会被处理,因为有联系电话号码,有人会说这样的订单根本不会被处理。
数据流
就 Kafka 而言,出现了组织数据流的问题。 这项任务涉及根据几个要点选择策略;让我们逐一讨论一下。
在一个主题中还是在不同的主题中?
我们有一个事件规范。 在BOB中我们写下某某订单需要发货,并注明:订单号、组成、一些SKU和条形码等。 当货物到达仓库时,交货将能够接收状态、时间戳和所需的一切。 但是我们希望在 BOB 中接收此数据的更新。 我们有一个从交付接收数据的逆过程。 这是同一个事件吗? 或者这是一个值得拥有自己主题的单独交易所?
最有可能的是,它们将非常相似,并且创建一个主题的诱惑并不是没有根据的,因为一个单独的主题意味着单独的消费者,单独的配置,所有这些的单独生成。 但这不是事实。
新领域还是新事件?
但如果使用相同的事件,则会出现另一个问题。 例如,并非所有交付系统都可以生成 BOB 可以生成的那种 DTO。 我们向他们发送 id,但他们没有保存它,因为他们不需要它,并且从启动事件总线流程的角度来看,该字段是必需的。
如果我们为事件总线引入一条规则,要求该字段是必需的,那么我们就被迫在 BOB 或启动事件处理程序中设置额外的验证规则。 验证开始在整个服务中传播——这不是很方便。
另一个问题是增量开发的诱惑。 我们被告知需要在该事件中添加一些内容,如果我们考虑一下,也许它应该是一个单独的事件。 但在我们的方案中,单独的事件是单独的主题。 一个单独的主题是我上面描述的整个过程。 开发人员很想简单地向 JSON 模式添加另一个字段并重新生成它。
在退款的情况下,我们在半年内到达了活动现场。 我们有一个称为退款更新的元事件,它有一个类型字段描述此更新的实际内容。 因此,我们与验证器进行了“美妙”的切换,验证器告诉我们如何使用这种类型验证此事件。
事件版本控制
要验证 Kafka 中的消息,您可以使用
保证分区的读取顺序
Kafka 内部的主题分为多个分区。 在我们设计实体和交易所时,这并不是很重要,但在决定如何使用和扩展它时,这一点很重要。
通常情况下,您在 Kafka 中编写一个主题。 默认情况下,使用一个分区,本主题中的所有消息都会转到该分区。 因此,消费者会依次读取这些消息。 假设现在我们需要扩展系统,以便两个不同的消费者可以读取消息。 例如,如果您正在发送短信,那么您可以告诉 Kafka 进行额外的分区,Kafka 将开始将消息分成两部分 - 一半在这里,一半在这里。
卡夫卡如何划分它们? 每条消息都有一个正文(我们在其中存储 JSON)和一个密钥。 您可以将哈希函数附加到该键,该函数将确定消息将进入哪个分区。
在我们的退款案例中,这一点很重要,如果我们采用两个分区,那么并行消费者有可能会在第一个事件之前处理第二个事件,并且会出现麻烦。 哈希函数确保具有相同密钥的消息最终位于同一分区中。
事件与命令
这是我们遇到的另一个问题。 Event 是某个事件:我们说某处发生了某事(something_happened),例如,某件商品被取消或发生退款。 如果有人监听这些事件,那么根据“项目取消”,将创建退款实体,并且“退款发生”将写入设置中的某处。
但通常,当您设计事件时,您不想徒劳地编写它们 - 您依赖于有人会阅读它们的事实。 人们很容易不写“something_happened”(item_canceled、refund_refunded),而是写“something_should_be_done”。 例如,物品已准备好退货。
一方面,它表明了如何使用该事件。 另一方面,它听起来不太像一个正常的事件名称。 此外,距离 do_something 命令并不远。 但您不能保证有人阅读此事件; 如果你读了,那么你就读成功了; 如果你成功地阅读了它,那么你就做了某件事,并且某件事成功了。 一旦事件变成 do_something,反馈就变得必要,这就是一个问题。
在 RabbitMQ 的异步交换中,当您读取消息时,转到 http,您会得到一个响应 - 至少消息已收到。 当您向 Kafka 写入数据时,您会向 Kafka 写入一条消息,但您不知道它是如何处理的。
因此,在我们的例子中,我们必须引入一个响应事件并设置监视,以便如果发送了这么多事件,那么在某个时间之后,应该有相同数量的响应事件到达。 如果这没有发生,那么似乎出了什么问题。 例如,如果我们发送“item_ready_to_refund”事件,我们预计将创建退款,资金将退还给客户,并且“money_refunded”事件将发送给我们。 但这是不确定的,所以需要监控。
细微之处
有一个相当明显的问题:如果你按顺序读取一个主题,并且有一些不好的消息,那么消费者就会下降,你就不会再继续下去了。 你需要 停止所有消费者,进一步提交偏移量以继续阅读。
我们知道它,我们指望它,但它还是发生了。 发生这种情况是因为从事件总线的角度来看该事件是有效的,从应用程序验证器的角度来看该事件是有效的,但从 PostgreSQL 的角度来看它是无效的,因为在我们的一个系统中MySQL 带有 UNSIGNED INT,而在新编写的系统中,PostgreSQL 只带有 INT。 他的尺寸有点小,身份证不适合。 Symfony 例外地去世了。 当然,我们捕获了异常,因为我们依赖它,并且要提交此偏移量,但在此之前我们想要增加问题计数器,因为消息处理不成功。 这个项目中的计数器也在数据库中,并且Symfony已经关闭了与数据库的通信,第二个异常杀死了整个进程,没有机会提交偏移量。
该服务搁置了一段时间——幸运的是,对于 Kafka,情况并没有那么糟糕,因为消息仍然存在。 待工作恢复后,即可阅读完毕。 很舒服。
Kafka 能够通过工具设置任意偏移量。 但要做到这一点,您需要停止所有消费者 - 在我们的例子中,准备一个单独的版本,其中不会有消费者,重新部署。 然后在 Kafka 中,您可以通过工具移动偏移量,消息就会通过。
另一个细微差别—— 复制日志与 rdkafka.so - 与我们项目的具体情况有关。 我们使用 PHP,在 PHP 中,通常所有库都通过 rdkafka.so 存储库与 Kafka 通信,然后有某种包装器。 也许这些都是我们个人的困难,但事实证明,仅仅重读我们已经读过的一段内容并不那么容易。 一般来说,存在软件问题。
回到使用分区的细节,它在文档中写得正确 消费者 >= 主题分区。 但我发现这一点的时间比我希望的要晚得多。 如果您想要扩展并拥有两个消费者,则至少需要两个分区。 也就是说,如果您有一个分区,其中积累了 20 万条消息,并且您创建了一个新分区,则消息数量不会很快均衡。 因此,为了拥有两个并行的消费者,你需要处理分区。
监控
我认为我们监控的方式会更加清楚现有方法存在哪些问题。
例如,我们计算数据库中有多少产品最近更改了状态,相应地,根据这些更改应该发生事件,并将此数字发送到我们的监控系统。 然后从 Kafka 中我们得到第二个数字,即实际记录了多少事件。 显然,这两个数字之间的差值应该始终为零。
另外,还需要监控生产者在做什么,events-bus是否收到消息,以及消费者在做什么。 例如,在下面的图表中,退款工具表现良好,但 BOB 显然存在一些问题(蓝色峰值)。
我已经提到过消费者群体的滞后。 粗略地说,这是未读消息的数量。 一般来说,我们的消费者工作速度很快,所以滞后通常是0,但有时也会出现短暂的峰值。 Kafka 可以开箱即用地执行此操作,但您需要设置一定的间隔。
有一个项目
这就是 API 响应的样子。 这是组 bob-live-fifa,分区 returned.update.v1,状态 OK,滞后 0 - 最后一个最终偏移量等等。
监控 Updated_at SLA(卡住) 我已经提到过。 例如,产品已更改为可以退货的状态。 我们安装了 Cron,它表示如果 5 分钟内该对象还没有退款(我们通过支付系统很快返还资金),那么肯定出了问题,这绝对是需要支持的情况。 因此,我们只需采用 Cron,它会读取这些内容,如果它们大于 0,则它会发送警报。
总而言之,在以下情况下使用事件很方便:
- 多个系统需要信息;
- 处理的结果并不重要;
- 事件很少或小。
这篇文章似乎有一个非常具体的主题 - Kafka 上的异步 API,但与此相关,我想立即推荐很多内容。
首先,接下来高负荷++ 我们需要等到 XNUMX 月;XNUMX 月将有圣彼得堡版本,XNUMX 月我们将讨论新西伯利亚的高负载。
其次,报告的作者Sergei Zaika是我们新一届知识管理会议的程序委员会成员知识大会 。 这次会议为期一天,将于26月XNUMX日举行,但其议程非常紧张。
并且会在五月份PHP 俄罗斯 иRIT++ (包括 DevOpsConf) - 您还可以在那里建议您的主题,谈论您的经验并抱怨您的填充锥体。
来源: habr.com