分享一个CQRS/ES架构中基于写文件的EventStore的设计思路
最近打算用C#实现一个基于文件的EventStore。
什么是EventStore
关于什么是EventStore,如果还不清楚的朋友可以去了解下CQRS/Event Sourcing这种架构,我博客中也有大量介绍。EventStore是在Event Sourcing(下面简称ES)模式中,用于存储事件用的。从DDD的角度来说,每个聚合根在自己的状态发生变化时都会产生一个或多个领域事件,我们需要把这些事件持久化起来。然后当我们需要恢复聚合根的最新状态到内存时,可以通过ES这种技术,从EventStore获取该聚合根的所有事件,然后重演这些事件,就能将该聚合根恢复到最新状态了。这种技术和MySQL的Redo日志以及Redis的AOF日志或者leveldb的WAL日志的原理是类似的。但是区别是,redo/AOF/WAL日志是Command Sourcing,而我们这里说的是Event Sourcing。关于这两个概念的区别,我不多展开了,有兴趣的朋友可以去了解下。
为什么要写一个EventStore
目前ENode使用的EventStore,是基于关系型数据库SqlServer的。虽然功能上完全满足要求,但是性能上和数据容量上,离我的预期还有一些距离。比如:
- 关于性能,虽然可以通过SqlBulkCopy方法,实现较大的写入吞吐,但是我对EventStore的要求是,需要支持两个唯一索引:1)聚合根ID+事件版本号唯一;2)聚合根ID+命令ID唯一;当添加这两个唯一索引后,会很大影响SqlBulkCopy写入数据的性能;而且SqlBulkCopy只有SqlServer才有,其他数据库如MySQL没有,这样也无形之中限制了ENode的使用场景;
- 关于使用场景,DB是基于SQL的,他不是简单的帮我们保存数据,每次写入数据都要解析SQL,执行SQL,写入RedoLOG,等;另外,DB还要支持修改数据、通过SQL查询数据等场景。所以,这就要求DB内部在设计存储结构时,要兼顾各种场景。而我们现在要实现的EventStore,针对的场景比较简单:1)追求高吞吐的写入,没有修改和删除;2)查询非常少,不需要支持复杂的关系型查询,只需要能支持查询某个聚合根的所有事件即可;所以,针对这种特定的使用场景,如果有针对性的实现一个EventStore,我相信性能上可以有更大的提升空间;
- 关于数据量,一个EventStore可能需要存储大量的事件,百亿或千亿级别。如果采用DB,那我们只能进行分库分表,因为单表能存储的记录数是有限的,比如1000W,超过这个数量,对写入性能也会有一定的影响。假设我们现在要存储100亿事件记录,单表存储1000W,那就需要1000个表,如果单个物理库中分100个表,那就需要10个物理库;如果将来数据量再增加,则需要进一步扩容,那就需要牵涉到数据库的数据迁移(全量同步、增量同步)这种麻烦的事情。而如果是基于文件版本的EventStore,由于没有表的概念了,所以单机只要硬盘够大,就能存储非常多的数据。并且,最重要的,性能不会因为数据量的增加而下降。当然,EventStore也同样需要支持扩容,但是由于EventStore中的数据只会Append写入,不会修改,也不会删除,所以扩容方案相对于DB来说,要容易做很多。
- 那为何不使用NoSQL?NoSQL一般都是为大数据、可伸缩、高性能而设计的。因为通常NoSQL不支持上面第一点中所说的二级索引,当然一些文档型数据库如MongoDB是支持的,但是对我来说是一个黑盒,我无法驾驭,也没有使用经验,所以没有考虑。
- 从长远来看,如果能够自己根据自己的场景实现一个有针对性的EventStore,那未来如果出现性能瓶颈的问题,自己就有足够的能力去解决。另外,对自己的技术能力的提高也是一个很大的锻炼机会。而且这个做好了,说不定又是自己的一个很好的作品,呵呵。所以,为何不尝试一下呢?
EventStore的设计目标
- 要求高性能顺序写入事件;
- 要求严格判断聚合根的事件是否按版本号顺序递增写入;
- 支持命令ID的唯一性判断;
- 支持大量事件的存储;
- 支持按照聚合根ID查询该聚合根的所有事件;
- 支持动态扩容;
- 高可用(HA),需要支持集群和主备,二期再做;
EventStore核心问题描述、问题分析、设计思路
核心问题描述
一个EventStore需要解决的核心问题有两点:1)持久化事件;2)持久化事件之前判断事件版本号是否合法、事件对应的命令是否重复。一个事件包含的信息如下:
- 聚合根ID
- 事件版本号
- 命令ID
- 事件内容
- 事件发生时间
为什么是这些信息?
本文所提到的事件是CQRS架构中,由C端的某个命令操作某个聚合根后,导致该聚合根的状态发生变化,然后每次变化都会产生一个对应的事件。所以,针对聚合根的每个事件,我们关注的信息就是:哪个命令操作哪个聚合根,产生了什么版本号的一个事件,事件的内容和产生的时间分别是什么。
事件的版本号是什么意思?
由于一个聚合根在生命周期内经常会被修改,也就是说经常会有命令去修改聚合根的状态,而每次状态的变化都会产生一个对应的事件,也就是说一个聚合根在生命周期内会产生多个事件。聚合根是领域驱动设计(DDD)中的一个概念,聚合根是一个具有全局唯一ID的实体,具有独立的生命周期,是数据强一致性的最小边界。为了保证聚合根内的数据的强一致性,针对单个聚合根的任何修改都必须是线性的,因为只有线性的操作,才能保证当前的操作所基于的聚合根的状态是最新的,这样才能保证聚合根内数据的完整性,总是满足业务规则的不变性。关于线性操作这点,就像对DB的一张表中的某一条记录的修改也必须是线性的一样,数据库中的同一条记录不可能同时被两个线程同时修改。所以,分析到这里,我们知道同一个聚合根的多个事件的产生必定是有先后顺序的。那如何保证这个先后顺序呢?答案是,在聚合根上设计一个版本号,通过版本号的顺序递增来保证对同一个聚合根的修改也总是线性依次的。这个思路其实就是一种乐观并发控制的思路。聚合根的第一个事件的版本号为1,第二个事件的版本号为2,第N个事件的版本号为N。当第N个事件产生时,它所基于的聚合根的状态必须是N-1。当某个版本号为N的事件尝试持久化到EventStore时,如果EventStore中已经存在了一个版本号为N的事件,则认为出现并发冲突,需要告诉上层应用当前事件持久化遇到并发冲突了,然后上层应用需要获取该聚合根的最新状态,然后再重试当前命令,然后再产生新的版本号的事件,再持久化到EventStore。
希望能自动检测命令是否重复处理
CQRS架构,任何聚合根的修改都是通过命令来完成的。命令就是一个DTO,当我们要修改一个聚合根的状态时,就发送一个命令到分布式MQ即可,然后MQ的消费者处理该命令。但是大家都知道任何分布式MQ一般都只能做到至少投递一次(At Least Once)的消息投递语义。也就是说,一个命令可能会被消费者重复处理。在有些情况下,某个聚合根如果重复处理某个命令,会导致聚合根的最终状态不正确,比如重复扣款会导致账号余额不正确。所以,我们希望在框架层面能支持命令的重复处理的检测。那最理想的检测位置在哪里呢?如果是传统的DB,我们会在数据库层面通过建立唯一索引保证命令绝对不会重复执行。那对应到我们的EventStore,自然也应该在EventStore内部检测。
核心问题分析
通过上面的问题描述,我们知道,其实一个EventStore需要解决的问题就两点:1)以文件的形式持久化事件;2)持久化之前判断事件的版本号是否冲突、事件的命令是否重复。
关于第一点,自然是通过顺序写文件来实现,机械硬盘在顺序写文件的情况下,性能也是非常高的。写文件的思路非常简单,我们可以固定单个文件的大小,比如512MB。然后先写第一个文件,写满后新建一个文件,再写第二个,后面以此类推。
关于第二点,本质上是两个索引的需求:a. 聚合根ID+事件版本号唯一(当然,这里不仅要保证唯一,还要判断是否是连续递增);b. 聚合根ID + 命令ID唯一,即针对同一个聚合根的命令不能重复处理;那如何实现这两个索引的需求呢?第一个索引的实现成本相对较低,我们只需要在内存维护每个聚合根的当前版本号,然后当一个事件过来时,判断事件的版本号是否是当前版本号的下一个版本号即可,如果不是,则认为版本号非法;第二个索引的事件成本比较高,我们必须维护每个聚合根的所有产生的事件对应的命令的ID,然后在某个事件过来时,判断该事件对应的命令ID是否和已经产生的任何一个事件的命令ID重复,如果有,则认为出现重复。所以,归根结底,当需要持久化某个聚合根的事件时,我们需要加载该聚合根的所有已产生的事件的版本号以及事件对应的命令ID到内存,然后在内存进行判断,从而检查当前事件是否满足这两个索引需求。
好了,上面是基本的也是最直接的解决问题的思路了。但是我们不难发现,要实现上面这两个问题并不容易。因为:首先我们的机器的内存大小是有限的,也就是说,无法把所有的聚合根的事件的索引信息都放在内存。那么当某个聚合根的事件要持久化时,发现内存中并无这个聚合根的事件索引时,必然要从磁盘中加载该聚合根的事件索引。但问题是,我们的事件由于为了追求高性能的写入到文件,总是只是简单的Append追加到最后一个文件的末尾。这样必然导致某个聚合根的事件可能分散在多个文件中,这样就给我们查找这个聚合根的所有相关事件带来了极大的困难。那该如何权衡的去设计这两个需求呢?
我觉得设计是一种权衡,我们总是应该根据我们的实际业务场景去有侧重点的进行设计,优先解决主要问题,然后次要问题尽量去解决。就像leveldb在设计时,也是侧重于写入时非常简单快速,而读取时,可能会比较迂回曲折。EventStore,是非常典型的高频写入但很少读取的系统。但写入时需要保证上述的两个索引需求,所以,应该说这个写入的要求比leveldb的写入要求还要高一些。那我们该如何去权衡呢?
EventStore核心设计思路
- 在内存中维护每个聚合根的版本索引eventVersion,eventVersion中维护了当前聚合根的所有的版本、每个版本对应的cmdId,以及每个版本的事件在event文件中的物理位置;当一个事件过来时,通过这个eventVersion来判断version,cmdId是否合法(version必须是currentVersion+1,cmdId必须唯一);
- 当写入一个事件时,只写入一个文件,event.file文件;假设一个文件的大小为512MB,一个事件的大小为1KB,则一个文件大概存储52W个事件;
- 一个event.file文件写满后:
- 完成当前event.file文件,然后新建一个新的event.file文件,接下来的事件写入新的event.file文件;
- 启动一个后台线程,在内存中对当前完成的event.file文件中的event按照聚合根ID和事件版本号进行排序;
- 排序完成后,我们就知道了该文件中的事件涉及到哪些聚合根,他们的顺序,以及最大最小聚合根ID分别是什么;
- 新建一个和event.file文件一样大小的临时文件;
- 在临时文件的header中记录当前event.file已排序过;
- 在临时文件的数据区域将排好序的事件顺序写入文件;
- 临时文件写入完成后,将临时文件替换当前已完成的event.file文件;
- 为event.file文件新建一个对应的事件索引文件eventIndex.file;
- 将event.file文件中的最大和最小聚合根ID写入到eventIndex.file索引文件的header;每个event.file的最大最小的聚合根ID的关系,会在EventStore启动时自动加载并缓存到内存中,这样可以方便我们快速知道某个聚合根在某个event.file中是否存在事件,因为直接在内存中判断即可。这个缓存我暂时命名为aggregateIdRangeCache吧,以便下面更方便的进一步说明如何使用它。
- 将event.file文件中的每个聚合根的每个事件的索引信息写入eventIndex.file文件,事件索引信息包括:聚合根ID+事件版本号+事件的命令ID+事件在event.file文件中的物理位置这4个信息;有了这些索引信息,我们就可以只需要访问事件索引文件就能获取某个聚合根的所有版本信息(就是上面说的eventVersion)了;
- 但仅仅在事件索引文件中记录最大最小聚合根ID以及每个事件的索引信息还不是不够的。原因是,当我们要查找某个聚合根的所有版本信息时,虽然可以先根据内存中缓存的每个event.file文件的最大最小聚合根ID快速定位该聚合根在哪些event.file中存在事件(也就是明确了在哪些对应的事件索引文件中存在版本信息),但是当我们要从这些事件索引文件中找出该聚合根的事件索引到底具体在文件的哪个位置时,只能从文件的起始位置顺序扫描文件才能知道,这样的顺序扫描无疑是不高效的。假设一个event.file文件的大小固定为512MB,一个事件的大小为1KB,则一个event.file文件大概存储52W个事件,每个事件索引的大小大概为:24 + 4 + 24 + 8 = 60个字节。所以,这52W个事件的索引信息大概占用30MB,也就是最终一个事件索引文件的大小大概为30MB多一点。当我们要获取某个聚合根的所有版本信息时,如果每次访问某个事件索引文件时,总是要顺序扫描30MB的文件数据,那无疑效率不高。所以,我还需要进一步想办法优化,因为事件索引文件里的事件索引信息都是按照聚合根ID和事件版本号排序的,假设现在有52W个事件索引,则我们可以将这52W个事件索引记录均等切分为100个点,然后把每个点对应的事件索引的聚合根ID都记录到事件索引文件的header中,一个聚合根ID的长度为24个字节,则100个也就2.4KB左右。这样一来,当我们想要知道某个聚合根的事件索引大概在事件索引文件的哪个位置时,我们可以先通过访问header里的信息,快速知道应该从哪个位置去扫描。这样一来,本来对于一个事件索引文件我们要扫描30MB的数据,现在变为只需要扫描百分之一的数据,即300KB,这样扫描的速度就快很多了。这一段写的有点啰嗦,但一切都是为了尽量详细的描述我的设计思路,不知道各位看官是否看懂了。
- 除了记录记录最大最小聚合根ID以及记录100个等分的切割点外,还有一点可以优化来提高获取聚合根的版本信息的性能,就是:如果内存足够,当某个eventIndex.file被读取一次后,EventStore可以自动将这个eventIndex.file文件缓存到非托管内存中;这样下次就可以直接在非托管内存访问这个eventIndex.file了,减少了磁盘IO的读取;
- 因为内存大小有限,所以eventVersion不可能全部缓存在内存;所以,当某个聚合根的eventVersion不在内存中时,需要从磁盘加载。加载的思路是:扫描aggregateIdRangeCache,快速找出该聚合根的事件在哪些event.file文件中存在;然后通过上面提到的查找算法快速查找这些event.file文件对应的eventIndex.file文件,这样就能快速获取该聚合根的eventVersion信息了;
- 另外,EventStore启动时,最好需要预加载一些热门聚合根的eventVersion信息到缓存。那该预加载哪些聚合根呢?我们可以在内存中维护一个固定大小(N)的环形数组,环形数组中维护了最近修改的聚合根的ID;当某个聚合根有事件产生,则将该聚合根ID的hashcode取摸N得到环形数组的下标,然后将该聚合根ID放入该下标;定时将该环形数组中的聚合根ID dump到文件preloadAggregateId.file进行存储;这样当EventStore启动时,就可以从preloadAggregateId.file加载指定聚合根的eventVersion;
思路总结:
上面的设计的主要思路是:
- 写入一个事件前先内存中判断是否允许写入,如果允许,则顺序写入event.file文件;
- 对一个已经写入完成的event.file文件,则用一个后台异步线程对文件中的事件按照聚合根ID和事件版本号进行排序,然后将排序后的临时event.file文件替换原event.file文件,同时将排序后得到的事件索引信息写入eventIndex.file文件;
- 写入一个事件时,如果当前聚合根的版本信息不在内存,则需要从相关的eventIndex.file文件加载到内存;
- 由于加载版本信息可能需要访问多个eventIndex.file文件,会有多次读磁盘的IO,对性能影响较大,所以,我们总是应该尽量在内存缓存聚合根的版本信息;
- 整个EventStore的性能瓶颈在于内存中能缓存多少聚合根版本信息,如果能够缓存百分百的聚合根版本信息,且能做到没有GC的问题(尽量避免),那我们就可以做到写入事件非常快速;所以,如何设计一个支持大容量缓存(比如缓存几十个GB的数据),且没有GC问题的高性能缓存服务,就变得很关键了;
- 由于有了事件索引信息以及这么多的缓存机制,所以,当要查询某个聚合根的所有事件,也就非常简单了;
如何解决多线程并发写的时候的CPU占用高的问题?
到这里,我们分析了如何存储数据,如何写入数据,还有如何查询聚合根的所有事件,应该说核心功能的实现思路已经想好了。如果现在是单线程访问EventStore,我相信性能应该不会很低了。但是,实际的情况是N多客户端会同时并发的访问EventStore。这个时候就会导致EventStore服务器会有很多线程要求同时写入事件到数据文件,但是大家知道写文件必须是单线程的,如果是多线程,那也要用锁的机制,保证同一个时刻只能有一个线程在写文件。最简单的办法就是写文件时用一个lock搞定。但是经过测试发现简单的使用lock,在多线程的情况下,会导致CPU很高。因为每个线程在处理当前事件时,由于要写文件或读文件,都是IO操作,所以锁的占用时间比较长,导致很多线程都在阻塞等待。
为了解决这个问题,我做了一些调研,最后决定使用双缓冲队列的技术来解决。大致思路是:
设计两个队列,将要写入的事件先放入队列1,然后当前要真正处理的事件放在队列2。这样就做到了把接收数据和处理数据这两个过程在物理上分离,先快速接收数据并放在队列1,然后处理时把队列1里的数据放入队列2,然后队列2里的数据单线程线性处理。这里的一个关键问题是,如何把队列1里的数据传给队列2呢?是一个个拷贝吗?不是。这种做法太低效。更好的办法是用交换两个队列的引用的方式。具体思路这里我不展开了,大家可以网上找一下双缓冲队列的概念。这个设计我觉得最大的好处是,可以有效的降低多线程写入数据时对锁的占用时间,本来一次锁占用后要直接处理当前事件的,而现在只需要把事件放入队列即可。双缓冲队列可以在很多场景下被使用,我认为,只要是多个消息生产者并发产生消息,然后单个消费者单线程消费消息的场景,都可以使用。而且这个设计还有一个好处,就是我们可以有机会单线程批量处理队列2里的数据,进一步提高处理数据的吞吐能力。
如何缓存大量事件索引信息?
最简单的办法是使用支持并发访问的字典,如ConcurrentDictionary<T,K>,Java中就是ConcurrentHashmap。但是经过测试发现ConcurrentDictionary在key增加到3000多万的时候就会非常慢,所以我自己实现了一个简单的缓存服务,初步测试下来,基本满足要求。具体的设计思路本文先不介绍了,总之我们希望实现一个进程内的,支持缓存大量key/value的一个字典,支持并发操作,不要因为内存占用越多而导致缓存能力的下降,尽量不要有GC的问题,能满足这些需求就OK。
如何扩容?
我们再来看一下最后一个我认为比较重要的问题,就是如何扩容。
虽然我们单台EventStore机器只要硬盘够大,就可以存储相当多的事件。但是硬盘再大也有上限,所以扩容的需求总是有的。所以如何扩容(将数据迁移到其他服务器上)呢?通过上面的设计我们了解到,EventStore中最核心的文件就是event.file,其余文件都可以通过event.file文件来生成。所以,我们扩容时只需要迁移event.file文件即可。
那如何扩容呢?假设现在有4台EventStore机器,要扩容到8台。
有两个办法:
- 土豪的做法:准备8台全新的机器,然后把原来4台机器的全部数据分散到新准备的8台机器上,然后再把老机器上的数据全部删除;
- 屌丝的做法:准备4台全新的机器,然后把原来4台机器的一半数据分散到新准备的4台机器上,然后再把老机器上的那一半数据删除;
对比之下,可以很容易发现土豪的做法比较简单,因为只需要考虑如何迁移数据到新机器即可,不需要考虑迁移后把已经迁移过去的数据还要删除。大体的思路是:
- 采用拉的方式,新的8台目标机器都在向老的4台源机器拖事件数据;目标机器记录当前拖到哪里了,以便如果遇到意外中断停止后,下次重启能继续从该位置继续拖;
- 每台源机器都扫描所有的事件数据文件,一个个事件进行扫描,扫描的起始位置由当前要拖数据的目标机器给出;
- 每台目标机器该拖哪些事件数据?预先在源机器上配置好这次扩容的目标机器的所有唯一标识,如IP;然后当某一台目标机器过来拖数据时,告知自己的机器的IP。然后源机器根据IP就能知道该目标机器在所有目标机器中排第几,然后源机器就能知道应该把哪些事件数据同步给该目标机器了。举个例子:假设当前目标机器的IP在所有IP中排名第3,则针对每个事件,获取事件的聚合根ID,然后将聚合根ID hashcode取摸8,如果余数为3,则认为该事件需要同步给该目标机器,否则就跳过该事件;通过这样的思路,我们可以保证同一个聚合根的所有事件都最终同步到了同一台新的目标机器。只要我们的聚合根ID够均匀,那最终一定是均匀的把所有聚合根的事件均匀的同步到目标机器上。
- 当目标机器上同步完整了一个event.file后,就自动异步生成其对应的eventIndex.file文件;
扩容过程的数据同步迁移的思路差不多了。但是扩容过程不仅仅只有数据迁移,还有客户端路由切换等。那如客户端何动态切换路由信息呢?或者说如何做到不停机动态扩容呢?呵呵。这个其实是一个外围的技术。只要数据迁移的速度跟得上数据写入的速度,然后再配合动态推送新的路由配置信息到所有的客户端。最终就能实现动态库容了。这个问题我这里先不深入了,搞过数据库动态扩容的朋友应该都了解原理。无非就是一个全量数据迁移、增量数据迁移、数据校验、短暂停止写服务,切换路由配置信息这几个关键的步骤。我上面介绍的是最核心的数据迁移的思路。
结束语
本文介绍了我之前一直想做的一个基于文件版本的EventStore的关键设计思路,希望通过这篇文章把自己的思路系统地整理出来。一方面通过写文章可以进一步确信自己的思路是否OK,因为如果你文章写不出来,其实思路一定是哪里有问题,写文章的过程就是大脑整理思绪的过程。所以,写文章也是检查自己设计的一种好方法。另一方面,也可以通过自己的原创分享,希望和大家交流,希望大家能给我一些意见或建议。这样也许可以在我动手写代码前能及时纠正一些设计上的错误。最后再补充一点,语言不重要,重要的是架构设计、数据结构,以及算法。谁说C#语言做不出好东西呢?呵呵。