使用Redis实现简单的事件驱动架构 「DDD、事件溯源和一致性哈希」

Apache Kafka 已成为大多数技术栈中的主流组件 。使用 Kafka 的好处包括确保事件中的因果顺序,同时保持并行性,通过在服务器之间快速复制分区来恢复故障,等等 。
然而,运行 Kafka 也面临着一系列挑战 。虽然许多工程团队都希望将 Kafka 添加到他们的堆栈中并与“真正的”工程师一起赢得一席之地,但运营开销构成了强大的进入障碍 。
在这篇文章中,我们将重点介绍如何构建一个看起来像传统单体应用程序但又是松散耦合的事件驱动系统的系统 。为此,我们依赖于从领域驱动设计、事件溯源和一致性哈希等概念中学习 。
 
有序事件
大多数系统关心事件的顺序 。大多数系统中的排序仅限于所考虑的域 。例如,当我们查看帖子到一个线程时,我们关心的是相对于帖子的排序 。当我们查看金融系统时,排序主要限于账户 。大型系统中事件的全局排序很少有用,但可能是相关的 。

使用Redis实现简单的事件驱动架构 「DDD、事件溯源和一致性哈希」

文章插图
 
场景:帖子被添加到一个线程Thread中
假设我们对每个添加的帖子都有相当多的后期处理,这反过来会更新线程的某些属性 。
这创建了一个相当好的场景来说明分区的使用 。
在这种情况下,默认方法是将所有帖子发送到队列中,并让一群工作人员(或消费者)完成工作 。这为我们提供了系统所需的并行性,但在我们与多个消费者打交道的那一刻,顺序就会丢失 。
我们保留顺序的唯一方法是确保我们一次处理一个任务,从而才能反映该线程上发生的事情的真实顺序 。
下一个明显的想法是使用每个线程的专用队列来处理相同的问题,但如果我们知道我们将生成大量线程,那立即感觉像是矫枉过正 。
 
分区
分区只是将我们的排队系统分解为专门的分区 。因此,如果我们从一个天真的估计开始,即8个工人每分钟能够处理1600个事件,那么我们的设计就从16个分区开始 。
你可能需要做更多的工作来确保你的估计是好的,但在这个例子中,我们将以假设它是好的来工作 。我们还为一个分区分配一个worker,因为我们希望每个分区都能始终保持因果排序 。
现在我们需要确保一个特定线程的帖子都被路由到同一个分区 。每个分区都由一个消费者管理,所以我们的排序不会被打乱 。
重要的是要记住,"队列分区 "或 "专用分区 "是一个抽象的结构 。它实际上只是一个队列 。我们使用分区这个术语,因为它使我们很容易与该领域广泛使用的术语保持一致 。
 
一致性哈希
我们将使用一致哈希散列作为一种手段,将属于特定线程的所有帖子路由到同一队列分区(或队列) 。
在我们的例子中,我们将使用Murmurhash和一个由名为uHashRing的库管理这个持续体 。
 
将我们的队列视为一个连续体
现在,如果我们简单地将所有的8个队列放在一个圆圈中,我们会得到这样的结果 。让我们把这个称为连续体,因为第7个队列后面是第一个队列,即第0个队列 。
使用Redis实现简单的事件驱动架构 「DDD、事件溯源和一致性哈希」

文章插图
 
现在,一致性散列允许我们使用threadId将一个给定的任务/工作映射到一个特定的队列 。因此,在这种情况下,我们使用threadId作为分区的关键 。
这里需要注意的一个重要方面是,我们没有把我们的队列称为后处理队列 。它们不是专用队列 。你可以把一个Transaction事务事件扔到这里,并期望相应的消费者(和事件处理程序)来处理它 。
 
事件
在前面的几段话中,我们已经说了很多关于事件的内容,但我们还没有真正定义事件的含义 。
我们的系统会把事件看成是发生在我们系统中的事实 。
事实通常是指以某种方式改变了系统状态的事情(或者是失败的事情) 。
例如,PostCreatedEvent发生在一个新帖子被创建时 。同样地,当帖子被更新时,PostUpdatedEvent也会发生 。
你可以将一个事件映射到你系统中的大多数CRUD操作 。
如果将你的系统设计成领域,你会惊讶地发现一个应用服务所触发的事件的数量 。
一个事件也映射了系统的周围状态 。
让我们设计一个创建帖子的应用服务:
 
 
from typing import Listfrom .services.base import ServiceBasefrom sqlalchemy.session import Sessionclass PostService(ServiceBase)def __init__(self, thread_id: UUID, params:PostCreateAPIParams, db_session: Session):self.thread_id = thread_idself.params = paramsself.user: Union[User, None] = Noneself.post: Union[Post, None] =Noneself.db_session = db_sessionself.errors: List[str] = []self.error_code: Union[str, None] = Noneasync def __call__(self)return await self.invoke()async def invoke(self):await self.find_thread()await self.verify_author()await self.create_post()await self.build_response_dao()await self.trigger_events()return selfasync def find_thread(self):# truncated for brevitypassasync def trigger_events(self):user_dao: UserDAO = UserDAO.from_orm(self.user) if self.user else Nonepost_dao: PostDAO = PostDAO.from_orm(self.post) if self.post else Nonethread_dao: ThreadDAO = ThreadDAO.from_orm(self.thread) if self.thread else Noneif await self.has_errors:event_dao = PostCreatedEventDAO(user=user_dao,thread=thread_dao,post=post_dao,params=self.params,errors=self.errors,error_code= self.error_code)else:event_dao = PostCreationFailedEventDAO(user=user_dao,thread=thread_dao,params=self.params,post=post_daoerrors=self.errors,error_code= self.error_code)partition_key = (str(self.thread.id) if self.thread else "PostCreationFailedEvent")await SystemEventService.trigger(partition_key=partition_key, event_dao=event_dao, db_session=self.db_session)return self


推荐阅读