ENode 2.0 – 全体架构介绍

哪些有限援救2个IDomain伊夫nt只会被2个I伊芙ntHandler处理3次

这一条的原由,我想我们都能精通。比如1个伊芙nt
Handler是立异读库的,或者我们会履行一条有副作用的SQL,比如update product
set price = price + 1 where id =
一千。那条SQL即使被再一次执行3遍,那price字段的值就多了1了,那不是大家所期待的结果。所以框架需求有中央的职分可以基本制止那种气象的发生。这怎么得以实现啊?思路也是用一张表,记录被执行的Domain伊芙nt的ID以及当前处理那一个Domain伊夫nt的伊夫nt
Handler的门类的多个Code,对那多个一起字段做一道唯一索引。每一趟当1个伊夫nt
Handler要处理二个Domain
伊芙nt时,先判断是不是已经处理过,假设没处理过,则处理,处理过之后把被处理的Domain
伊夫nt ID和伊夫ntHandler Type
Code添加到这么些表里即可。那借使添加的时候失利了啊?因为有大概也会有现身的情景,导致伊芙nt
Handler重复处理同2个Domain
伊芙nt。那种情状框架就不做严格的处理了,因为框架本身也不能实现。因为框架式无法知晓伊夫nt
Handler里面到底在做什么样的。有或然是出殡和埋葬邮件,也有可能是记录日志,也或许是立异读取(Read
DB)。所以,最根本的,照旧供给伊芙nt
Handler内部,也正是开发那温馨索要考虑幂等的落到实处。当然框架会提须求开发者要求的新闻来救助他们成就严峻幂等控制。比如框架会提供当前Domain
伊芙nt 的版本号给伊夫nt Handler,那样伊夫nt Handler里就能在Update
SQL的Where部分把这几个Version带上,从而达成乐观并发控制。比如上面包车型大巴代码示例:

public void Handle(IEventContext context, SectionNameChangedEvent evnt)
{
    TryUpdateRecord(connection =>
    {
        return connection.Update(
            new
            {
                Name = evnt.Name,
                UpdatedOn = evnt.Timestamp,
                Version = evnt.Version
            },
            new
            {
                Id = evnt.AggregateRootId,
                Version = evnt.Version - 1
            }, Constants.SectionTable);
    });
}

地方的代码中,当咱们立异三个论坛的版块时,大家得以在sql的where条件中,用version
= evnt.Verion –
1这样的标准化。从而保障当前你要拍卖的风云一定是上叁遍已处理的风云的版本号的下三个版本号,相当于承接保险了Query
Side的换代的逐条严谨和事件发生的依次一致。那样固然框架在有漏网之鱼的时候,伊夫nt
Handler内部也能做严俊的顺序控制。当然若是你的伊夫nt
Handler是发送邮件,那笔者还真不知道该怎么特别入保障证这一个严格的逐一恐怕出现争持了,呵呵。有趣味的爱人能够和自笔者调换。

关于重复的command的幂等处理和聚合根大概存在的产出争辨的判定

除此以外一些很重庆大学的是,因为大家的command是会发送到分布式新闻队列,然后队列中的command音信会被取出来执行;我们领略,大家很难保险二个音信不会被再度执行,也正是说,一个command可能会再度执行。由此,大家的选拔要协助对command的密等处理。而对于使用enode框架的施用,因为全数command
side的数据持久化正是持久化domain event,程序员不必关注domain
event的持久化进度。所以enode很有须要能放费用持对command的再度处理的论断。那么如何做吗?笔者认为最可靠的做法是,在持久化domain
event的时候就能相对可相信的检查和测试出来有些command是还是不是被另行执行了。这很自然就悟出将被持久化的domain
event和发生他的对应command关联起来。所以自身安插了之类的构造,用来代表1个command在操作聚合根后所发出的领域事件的音信。

/// <summary>The commandId which generate this event stream.
/// </summary>
public string CommitId { get; private set; }
/// <summary>The aggregate root id.
/// </summary>
public string AggregateRootId { get; private set; }
/// <summary>The aggregate root type code.
/// </summary>
public int AggregateRootTypeCode { get; private set; }
/// <summary>The version of the event stream.
/// </summary>
public int Version { get; private set; }
/// <summary>The occurred time of the event stream.
/// </summary>
public DateTime Timestamp { get; private set; }
/// <summary>The domain events of the event stream.
/// </summary>
public IEnumerable<IDomainEvent> Events { get; private set; }
  • CommitId:正是现阶段的CommandId;
  • AggregateRootId:当前被操作的聚合根的全局唯一ID;
  • AggregateRootTypeCode:表示聚合根的品类的一个code,通过该code大家能够知晓当前记下是哪位品种的聚合根的;
  • Version:多个本子号,表示聚合根爆发领域事件后的新本子号,是发闹事件前的版本号+1;也便是说,聚合根的本子是历次被改动壹遍,那Version就加1;
  • Timestamp:八个时光戳,用于记录发生domain event时的时光;
  • 伊芙nts:表示近日command操作聚合根后所发生的圈子事件,三遍操作能够爆发几个领域事件;

对此地点的结构体,大家得以兑现几个关键的效益:1)为AggregateRootId和Version那五个字段建立唯一索引,那样我们就能落到实处判断某些聚合根是不是被出现修改,因为只要有出现修改导致出现争执,那保存到eventstore时,它们的Version肯定是相同的;2)为AggregateRootId和CommitId三个字段建立唯一索引,那样我们就能判定有些command是或不是被重新执行,因为三个command被实例化出来后,它所要修改的聚合根ID就不容许再修改了,所以要是该command被再一次执行,那最后产生的小圈子事件(上边这一个结构体)最终被持久化到eventstore时就会背离这些唯一索引,从而框架就能明了是或不是有command被另行执行了;

除此以外,上面那几个结构体被保存到eventstore时,是以一条记下的不二法门被封存,伊芙nts集合会被系列化为一段二进制;所以,要是大家用关系型数据库来保存,这便是唯有一条insert语句即可,那样就贯彻了四个聚合根的三遍修改的事务持久化。然后因为上三个目录的存在,我们就能在保存时判断是或不是有出现争执或command是还是不是被再一次执行。

前言

ENode是2个基于音信的架构,使用ENode开发的种类,每一个环节都以拍卖新闻,处理完后发出新的音信。本篇小说小编想详细分析一下ENode框架中间是怎样兑现整个新闻处理流程的。为了更好的精晓作者背后的流水生产线的讲述,作者觉得依然应超越把ENode的架构图贴出来,好让大家在看前边的分析时,能够相比较这些架构图实行思想和掌握。

ENode框架简介

  1. 框架名称:ENode
  2. 框架特色:DDD+CQ奥迪Q7S + EDA + 伊夫nt Sourcing + In Memory
  3. 规划目的:让程序员只关怀工作代码、高品质、分布式、可水平增添
  4. 开源地址:https://github.com/tangxuehua/enode
  5. 基于enode落成的1个形成案例,1个论坛:https://github.com/tangxuehua/forum
  6. nuget包Id:ENode
  7. 三个独立的分布式音信队列EQueue,能够为ENode提供Command,Domain
    伊夫nt的发布和订阅:https://github.com/tangxuehua/equeue

在Saga Process Manager中发生的ICommand怎样能够扶助重试发送而不造成操作的重复

归根结底到最后一点了,好累。坚韧不拔就是常胜!假使现在的Saga 伊夫nt
Handler里是会爆发Command,那框架在发送这几个Command时,要力保不可能重复执行。如何是好呢?即使在Saga
Event
Handler里产生的Command的Id每趟都以新new出来的二个唯一的值,那框架就无法知道那几个Command是还是不是和后面包车型地铁再次了,框架会认为那是多少个不等的Command。那里其实有三种做法:

  1. 框架先把Saga 伊夫nt
    Handler中产生的Command保存起来,然后逐步发送到EQueue。发送成功三个,就删除八个。直到一切发送完停止。那种做法是实用的,因为那样一来,大家要发送的Command就延续从存款和储蓄那一个Command的地点去拿了,所以不会出现每一遍要发送的同二个Command的ID都是见仁见智的情状。然则那种规划性子不是太好,因为要发送的Command须要求先被保存起来,然后再发送,发送完未来还要删除。品质上一定不会太高。

2.次之种做法是,Command不存款和储蓄起来,而是直接把Saga 伊芙nt
Handler中发出的Command拿去发送。但那种规划供给:框架对那种要发送的Command的ID总是遵照某些特定的规则来发生的。那种规则要确定保证发生的CommandId首先是要唯一的,其次是明确的。上面大家看一下底下的代码:

private string BuildCommandId(ICommand command, IDomainEvent evnt, int eventHandlerTypeCode)
{
    var key = command.GetKey();
    var commandKey = key == null ? string.Empty : key.ToString();
    var commandTypeCode = _commandTypeCodeProvider.GetTypeCode(command.GetType());
    return string.Format("{0}{1}{2}{3}", evnt.Id, commandKey, eventHandlerTypeCode, commandTypeCode);
}

地方那些代码是多少个函数,用来构建要被发送的Command的Id的,大家得以看看ID是由Command的三个key+要被发送的Command的门类的code+当前被拍卖的Domain
伊芙nt的ID,以及当前的Saga 伊夫nt
Handler的项指标code那八个音信整合。对于同八个Domain 伊芙nt被同多少个伊夫nt
Handler处理,然后一旦产生的Command的花色也是平等的,那大家基本能够通过那多个音信创设一个唯一的CommandId了,不过有时那样还不够,因为我们可能在一个伊夫nt
Handler里创设多个类型完全一样的Command,但是他们修改的聚合根的ID区别。所以,笔者下面才有3个commandKey的组成都部队分。那几个key暗中同意正是以此Command要修改的聚合根的ID。那样,通过如此6个消息的整合,大家能够保障不管有个别Domain
伊芙nt被某些Saga 伊夫nt
Handler处理多少次,最终发生的Command的ID总是明确的,不变的。当然上边包车型地铁commandKey有时只是考虑聚合根ID也许还不够,即便自己还没遇到过那种状态,呵呵。所以自个儿框架设计上,就允许开发者能再一次GetKey方法,开发者要求知道几时需求重写那么些艺术。看了此地的认证应该就知晓了!

好了,大致了,该睡觉了!

至于Domain 伊芙nt大数据量的设想

在统一筹划event store时,作者设想了不少。最后认为event
store要缓解的最大的两个难点是持久化性能和可水平增加性。首先,因为每一回command
handler在处理完叁个聚合根后,都会把发生的园地事件持久化到event
store,没持久化达成则无法认为该command已处理完,所以持久化的属性对拍卖command的吞吐量至关心注重要。其它一些就是可水平扩张性,因为event
store里保存的都以domain
event,而enode又是为着兑现高质量为对象的,所以event
store里的数据一定会卓殊多,比如1s中要持久化1K个domain
event,那一天就会有8600W条记下要记录,一天就真么多,那1年就越来越多了,所以用单点存款和储蓄全数的domain
event显示不可信赖了。所以大家的event
store必须求帮忙水平增添。比如大家能够陈设玖拾玖个分区,那每一种分区一天只需求保留86W条记下,一年也只须要保留3亿多条记下即可。往日小编很追求单个存储节点的高品质,所以已经想过要用leveldb,stsdb,甚至redis那种高质量的基于key,value的nosql存款和储蓄。但后来发现那种nosql存款和储蓄固然质量很高,但因为只是key,value的仓库储存结构,所以无法扶助二级索引,那样就不能够落到实处地点第叁点中提到的command的幂等处理和聚合根并发争论的检查和测试。另三个首要的缘由是,event
store中的数据大家有时候是要被询问的。比近来后有些command境遇的出现冲突,那框架需求活动重试,可是重试在此之前必要先更新redis缓存,正是把eventstore里的风行的聚合根更新到redis缓存里,那样command在重试时才能获得最新版本的聚合根,那样重试才能不负众望。那怎么从eventstore里拿最新的聚合根呢?只好依照聚合根ID从eventstore里查询。而聚合根ID又不是key,value
nosql的key,自然就不能够落到实处这几个须求了;所以,笔者觉着理所当然的主意应该是用关系型数据库来贯彻eventstore。有人说关系型数据库的属性尤其。笔者以为要是关系型数据库帮忙水平增加,约等于将domain
event sharding(分片)到不一样的分库分表中,那平均到种种库里的domain
event的数量就非常的小了;这样一切eventstore的持久化质量就足以随着分库的数目标加码而线性扩张;比如自个儿现在单个db
insert domain event的质量是1K
tps(mysql合营ssd硬盘完全无压力,呵呵),那拾个库的tps就能完毕1W
tps了。因为我们分库会基于聚合根ID的hash
code来平均散列,这样能有限支撑每一种库中的聚合根的domain
event数量是主导雷同的;从而就能完结全体event
store的持久化质量随着分库的增多而线性增添。所以,有了分库的优势,大数据量和性情都不是题材了。且因为关系型数据库帮助二级索引和唯一索引,那查询domain
event也不是题材了。

Command的幂等处理

上边流程中的第⑩步,Command会被添加到ICommandStore。那里,实际上小编添加到ICommandStore的是八个HandleCommand对象,该对象涵盖当前的Command之外,还有当前被修改的聚合根ID。这样做的理由请看自个儿后边的演说。大家领略ICommandStore会对CommandId作为主键,这样大家就能绝对保险贰个Command不会被重新添加。假设Command添加到ICommandStore成功,那自然最棒了,间接进入持续的步子即可;可是只要出现CommandId重复的时候,大家供给做什么样的拍卖吧?

万一出现重复,则须要依据CommandId(主键),把前边早已持久化过的HandledCommand取出来;然后然后大家从HandledCommand获得被修改的聚合根ID,然后最器重的一步:大家将该聚合根ID以及CommandId作为标准从I伊夫ntStore中查询出多个大概存在的伊芙ntStream。假设存在,就表明这几个Command所爆发的Domain
伊夫nt已经被持久化了,所以我们只要再做贰遍发表事件的操作即可。即调用I伊芙ntPublisher.Publish方法来发布事件到Query
Side。那么为何要公布呢?因为固然事件被持久化了,但并不代表已经成功被披流露去了。因为理论上有恐怕Domain
伊芙nt被持久化成功了,然而在要颁发事件的时候,断电了!所以那种景况下,重启服务器就会冒出那里切磋的动静了。所以我们要求重新Publish事件。

下一场,借使没有依照CommandId和聚合根ID查找到伊芙ntStream呢?那也好办,因为那种场地就表明这几个Command尽管被持久化了,可是它所发生的伊芙ntStream却尚未被持久化到伊夫ntStore,所以我们须要将日前的伊芙ntStream调用I伊夫ntService.Commit方法实行持久化事件。

除此以外,那里其实有一个问题,为何查找伊芙ntStream不可能只是依据CommandId呢?原因是:从技术上来说,我们能够只依照CommandId来查找到1个唯一的伊芙ntStream,但这么设计的话,就供给伊芙ntStore必须求协理通过四个CommandId来全局唯一定位到二个伊夫ntStream了。可是因为考虑到伊芙ntStore的数据量是很是大的,大家未来恐怕会依据聚合根ID做水平拆分(sharding)。那样的话,大家唯有靠CommandId就无法知道到哪个分片下去查找对应的伊夫ntStream了。所以,要是查询时,能同时钦点聚合根ID,那咱们就能自在知道首先到哪些分片下去找伊夫ntStream,然后再依照CommandId就能轻轻松松定位到二个唯一的伊夫ntStream了。

既然说到此处,小编再说一下CommandStore的档次划分的陈设啊,CommandStore的数据量也是可怜大的,因为它会储存全体的Command。不过幸而,大家对此CommandStore只供给依照CommandId去摸索即可,所以大家能够依据CommandId来做Hash取模的点子来水平拆分。那样正是是分片了,大家如果知道了2个加以的CommandId,也能精晓它近期是在哪些分片下的,就很简单找到该Command了。

因此,通过下面的剖析,大家清楚了CommandStore和伊夫ntStore在统一筹划上不仅考虑了怎么存款和储蓄数据,还考虑了前途天数据量时怎样分片,以及怎么样在分片的情事下仍旧能便宜的摸索到大家的多少。

终极,上边还有一种情景没有表明,正是当出现Command添加到CommandStore时意识重复,不过尝试从CommandStore中依照CommandId查询该Command时,发现查不到,天哪!这种情景其实不该出现,要是出现,这表达CommandStore内部有标题了。因为为何添加时说有双重,而查询却差不离来啊?呵呵。那种景况就无法处理了,大家只好记录错误日志,然后开始展览接二连三的排查。

ENode物理布置结构图

澳门美高梅手机网站 1

上海体育地方是enode在实际上项目中本身如今觉得的1个大体布置协会图。

率先客户端浏览器通过互联网最后访问到大家的web服务器集群,当然web服务器前边肯定还有网关和负载均衡器,笔者那边为了优异重点就不画出来了。然后每一种web服务器接受到httprequest后会生成command,然后经过enode框架发送到分布式音讯队列服务器(message
queue
server),近来由本身付出的equeue达成。然后新闻队列服务器上的音讯会被推送到command
process servers,command process server正是进行command
handler、达成domain logic,持久化domain event,以及publish domain
event的服务器。command process server处理完之后,domain
event会由enode框架自动发送到message queue server,然后会被event process
server处理,event process server便是订阅domain event,然后依照domain
event更新query db。对于查询,web server能够直接通过sql查询query db即可。

各个服务器的集群:

  • web server:无状态,能够私自扩充服务器;
  • command process
    server:便是处理业务逻辑的服务器,也是无状态,可以自由扩大服务器;但服务器的多少最棒和command所对应的topic下的queue的多寡保持一致,那点持续在写分布式新闻队列equeue的稿龙时在事无巨细谈吧;
  • redis
    server:正是缓存聚合根的服务器,属于缓存服务器;能够按必要仓库储存的体积来统一筹划必要开多少台redis
    server;方今自我觉着最佳的redis动态扩大容积方法就是pre sharding;
  • event db server:正是储存domain
    event的服务器,依据上的辨析,大家选取的是关系型数据库,比如用mysql;mysql的分库分表技术早已很成熟,后续小说大家再详尽谈论什么分库以及怎么着做多少迁移;
  • event process server:就是订阅domain event,依据domain
    event更新query db的服务器;能够依据必要来布局多少台,和command
    process server类似;那里有几许亟供给先提一下,就是在立异query
    db时,因为老是换代都以针对有些domain event来更新query db的,而domain
    event只象征二个聚合根的改动,所以每便我们创新query
    db时,也只更新该聚合根所在范围的表;大家相对不要去立异超过该聚合根范围的表,否则就会产生并发争辩,导致event
    handler执行破产;那样就会是的cqrs的query
    db同步数据变的一点也不快。对于query side,假使大家以为直接从query
    db查询数据太慢,能够考虑设计查询缓存,也便是不走query
    db来查询数据,而是走缓存。那种缓存就和我们平昔的缓存设计类似了;利用domain
    event,大家自然就有优势可以让缓存极度及时的换代,呵呵。因为只要有domain
    event过来,我们就能一点也不慢更新大家的query side缓存,而query
    db就足以异步更新即可。那样就足以消除query side同步更新数据慢的题材。
  • message queue
    server:就是音信队列服务器,近日equeue还不帮助集群,只接济单机;那个今后有时光会考虑完成master-slave形式,类似于天猫商城的rocketmq一样。

好了,就写那一个呢,后续的再持续小说中补上,呵呵。

Domain 伊芙nt持久化时的产出争论检查和测试和拍卖

地点流程中的第八步,大家关系:假诺遇上伊芙ntStream持久化到I伊芙ntStore时境遇版本号重复(同1个聚合根ID+聚合根的Version相同,则以为有出现争辩),此时框架须求做差异的逻辑处理。具体是:

先是,大家得以先想想为啥会冒出同2个聚合根会在大概一致时刻发生七个本子号同样的小圈子事件,并持久化到伊夫ntStore。首先,作者先说一下那种景色差不多不汇合世的说辞:ENode中,在ICommandExecutor在拍卖1个Command时,会检讨当前该Command所要修改的聚合根是或不是曾经有至少三个聚合根正在被拍卖,假如有,则会将近期Command排入到这些聚合根所对应的等待队列。约等于说,它一时半刻不会被执行。然后当当前聚合根的前头的Command被实施完了后才会从那个等候队列取出下三个等候的Command举办拍卖。通过如此的安顿性,大家保险了,对1个聚合根的享有Command,不会互相被实施,只会根据顺序被实践。因为每一个ICommandExecutor会在须要的时候,为有个别聚合根自动创制那种等待队列,只要对该聚合根的Command同一时半刻刻进来二个或以上。

那么,假若集群的时候吧?你一台机械的话,通过地点的措施得以保险一个聚合根实例的有所的Command会被各种处理。可是集群的时候,也许二个聚合根会在多台机械被同时处理了。要消除那么些难题的笔触就是对Command依照聚合根ID实行路由了,因为相似若是是修改聚合根的Command总是会含有二个聚合根ID,所以大家得以遵照那一个特点,对被发送的Command遵照聚合根ID进行路由。只要CommandId相同,则总是会被路由到同叁个队列,然后因为二个队列总是只会被一台机械消费,从而大家能保险对同1个聚合根的Command总是会落得一台机械上被处理。那么您或许会说,固然走俏数据吧?比如有个别聚合根突然对他改动的Command或者很是多(扩充了一倍),而有个别则很少,那怎么做吧?没关系,我们还有信息队列的监督平台。当出现某些聚合根的Command突然非凡多的时候,大家得以借助EQueue的Topic的Queue可以随时举办追加的表征来敷衍这么些题材。比如原先这些Topic下唯有四个Queue,那未来增多到九个,然后消费者机器也从4台增添到8台。那样也就是Command的拍卖能力也增添了一倍。从而得以便宜的缓利尿点数据难题。由此,那也是本人想要本人完毕分布式音信队列EQueue的原因啊!有个别场景,假使自身平素不章程完全掌控,会很消沉,直接造成整个架构的严重缺陷,最终导致系统瘫痪,而团结却无能为了。当然你能够说我们可以利用卡夫卡,
罗克etmq那样的高品质分布式队列,确实。不过究竟这种巨大上的行列卓殊复杂,且都以非.NET平台。除了难题,维护起来肯定比自个儿付出的要难保证。当然除非你对它们卓殊掌握且有自信的运营能力。

透过上面的思路达成的,确定保障聚合根的Command总是被顺序线性处理的宏图,对伊夫ntStore有非常的大的含义。因为如此可以让伊夫ntStore不会出现并发争论,从而不会招致无谓的对伊夫ntStore的走访,也得以十分大的下滑伊芙ntStore的压力。

而是怎么时候照旧只怕会现身并发争执呢?因为:

1)当处理Command的某台机器挂了,然后这台机器所花费的Queue里的音讯就会被其它机器接着消费。别的机器大概会从那一个Queue里批量拉取一些Command音讯来耗费。然后此时一旦大家重启了那台有难题的服务器,重启完之后,因为又会开端消费这么些Queue。然后1个最重要的点是,每一回一台机器运维时,会从EQueue的Broker拉取这几个Queue最终七个被消费的消息的职责,也正是Offset,而出于那一个Offset的更新是异步的,比如5s才会更新到EQueue的Broker,所以导致这台重启后的服务器从Broker上拉取到的消费地点其实是有延期的,从而就或者会去消费在那台以前接替你的服务器已经开销过的依然正在消费的Command音讯了。当然那种气象因为条件太苛刻,所以基本不会爆发,即使会产生,一般也不会导致Command的产出执行。可是那终归也是一种大概。实际上那里不仅是某些服务器挂掉后再重启的动静会造成现身冲突,只若是处理Comand的机器的集群中有别的的机器的增添或减弱,由于都会招致Command消息的买主集群重新负载均衡。在那一个负载均衡的历程中,就会导致同叁个Topic下的同一个Queue里的有个别新闻或者会在两台服务器上被消费。原因是Queue的开销地方(offset)的创新不是实时的,而是定时的。所以,大家一般建议,尽量不要在消息很多的时候做消费者集群内机器的转移,而是尽大概在没什么信息的时候,比如凌晨4点时,做集群的扩大容积操作。那样能够尽量幸免全数恐怕带来的新闻再一次消费或许现身抵触的恐怕。呵呵,那段话或许很两人看的云里雾里,我只可以说到那几个水平了,大概要统统明白,大家还亟需对EQueue的布署性很精晓才行!

2)就算同1个机械内,其实也是有大概出现对同3个聚合根的出现修改,约等于对准同1个聚合根的三个Command被同时施行。原因是:当3个Command所对应的伊芙ntStream在被持久化时出现重复,然后自个儿就会放在3个地面包车型地铁内存队列举行重试,然后重试由于是在另贰个特意的重试线程里,该线程不是健康处理Command的线程。所以若是对该聚合根后续还有Command要被处理,那就有大概会现出一样时刻,三个聚合根被五个Command修改的场馆了。

前些天,我们在回去谈论,假设境遇争辩时,要如何做?那几个上面小编总结关联过,便是急需重试Command。但也不是那样不难的逻辑。大家需求:

a.
先检查当前的伊芙ntStream的Version是还是不是为1,假诺为1,表明有三个创制聚合根的Command被出现执行了。此时大家决不在重试了,因为纵然再重试,这最后产生的伊芙ntStream的版本号也接二连三1,因为借使是首先次创设聚合根,这这一个聚合根所发生的Domain伊夫nt的版本总是1。所以那种气象下,我们只须求一贯从伊芙ntStore拿出那几个已经存在的伊芙ntStream,然后通过I伊夫ntPublisher.Publish方法发表该伊夫ntStream即可。为啥要重新发表,上面表达Command的幂等时,也解释了原由,那里是平等的案由。那里也有二个小的点须要注意,正是假如尝试从伊芙ntStore拿出那几个伊芙ntStream时,若是没获得到呢?那一个题材实际上不该出现,原因就像是上面分析Command幂等时一致,为啥会并发拉长时提示存在,但查询时却查不到的事态呢?那种景况正是伊夫ntStore的宏图卓殊了,读写存在非强一致性的状态了。

b.
假使当前的伊芙ntStream的Version大于1,则我们须要先更新内部存款和储蓄器缓存(Redis),然后做Command的重试处理。为什么要先更新缓存呢?因为只要不立异,有可能重试时,获得的聚合根的动静依旧旧的,所以重试后可能造成版本号争辨。那干什么从缓存中获得的聚合根的景观或许如故旧的吧?因为伊芙ntStream已经存在于伊芙ntStore并不意味着这一个伊夫ntStream的修改已经更新到缓存了。因为我们是先持久化到伊芙ntStore,在立异缓存的。完全有大概你还没来得及更新缓存的时候,另三个Command正好须要重试呢!所以,最保障的做法,就是再重试的时候将缓存中的聚合根状态更新到最新值。那怎么革新呢?呵呵,很简单,正是通过事件本源(即伊夫nt
Sourcing技术)了。大家只要从伊夫nt Store获取当前聚合根的有所的伊夫nt
Stream,然后溯源这几个事件,最终就能获得聚合根的最新版本的动静了,然后更新到缓存即可。

最后,假诺急需重试的话,要怎么重试呢?很不难,只要扔到1个地点的遵照内部存款和储蓄器的重试队列即可。小编今天是用BlockingCollection的。

ENode架构图

澳门美高梅手机网站 2

纯熟CQLX570S架构的人来看那图应该就再熟知但是了,enode完结的是一个CQ中华VS架构。基本的定义就不多介绍了,若是大家对上图中的一些概念还不老子@楚,能够看一下本身的博客里的别的相关文章,我应当都有写到。下边首要介绍一下enode
2.0在达成CQPAJEROS架构时的部分差别的地方(由于篇幅的范围,先说三点吧):

ENode框架内部贯彻流程分析

  1. Controller发送ICommand到音信队列(EQueue);
  2. 澳门美高梅手机网站,【从这一步早先次拍卖卖Command】ENode.EQueue中的CommandConsumer接收到该ICommand,先创建三个ICommandContext实例,然后调用ENode中的ICommandExecutor执行当前ICommand并将ICommandContext传递给ICommandExecutor;
  3. ICommandExecutor依据当前ICommand的类别,获取到一个唯一的ICommandHandler,然后调用ICommandHandler的Handle方法处理当下ICommand,调用时传递当前的ICommandContext给ICommandHandler;
  4. ICommandHandler处理完Command后,ICommandExecutor获取当前ICommandContext中新增或改动的聚合根;
  5. 自笔者批评当前ICommandContext中是或不是唯有2个激增或修改的聚合根;假使超越一个,则报错,通过那样的反省来从框架级别保障3个Command3回只好修改二个聚合根;
  6. 假如发现眼下激增或涂改的聚合根为0个,则一向认为日前的ICommand已处理完了,就调用ICommandContext的OnCommandExecuted方法,该措施内部会打招呼EQueue发送CommandResult信息给Controller;然后Controller那边的经过,会有一个CommandResultProcessor接收到这些CommandResult的音信,然后就精通该ICommand的处理结果了;
  7. ICommandExecutor从ICommandContext获得当下唯一修改的聚合根后,取出该聚合根里发出的IDomain伊芙nt。由于贰个聚合根叁回恐怕会毕生八个IDomain伊芙nt,所以大家会营造三个伊夫ntStream对象。那么些指标涵盖了颇具当前聚合根所发出的IDomain伊芙nt。3个伊芙ntStream会包蕴众多主要的信息,蕴涵方今ICommand的ID、聚合根的ID、聚合根的Version(版本号),以及全体的IDomain伊夫nt,等等;
  8. ICommandExecutor将该Command添加到ICommandStore。因为ICommandStore是以CommandId为主键(即Key),之所以一旦CommandId重复,框架就会精晓,然后就会做重新时的逻辑处理,这一点前面再详细分析;
  9. 假使Command成功添加到ICommandStore,则接下去调用I伊夫ntService的Commit方法将方今伊夫ntStream持久化到I伊夫ntStore;
  10. I伊芙nt瑟维斯内部首要做3件事情:1)将伊夫ntStream持久化到I伊芙ntStore;2)持久化成功后调用IMemoryCache更新缓存(缓存可以布署为本土缓存也得以配备为分布式缓存Redis,倘使Command的处理是集群处理的,那大家理应用共享缓存,也等于用Redis这种分布式缓存);3)缓存更新好之后,调用I伊夫ntPublisher接口的Publish方法将伊芙ntStream公布出来,I伊夫ntPublisher的切实可行落到实处者会把当下的伊夫ntStream发送到EQueue。那3步是健康状态的流程。设若遭逢持久化到I伊芙ntStore时遇见版本号重复(同三个聚合根ID+聚合根的Version相同,则以为有出现争持),此时框架须求做不相同的逻辑处理;那点也在前边详细分析。
  11. 【从这一步开头拍卖Domain
    伊夫nt】EventStream被ENode.EQueue中的EventConsumer接收到,然后伊芙ntConsumer调用I伊夫ntProcessor处理当下的伊芙ntStream;
  12. I伊夫ntProcessor首先判断当前的伊芙ntStream是或不是足以被拍卖,那里大家需求保险的很首要的有个别是,必须确定保障事件的持久化顺序和被事件的订阅者处理的逐一要严苛平等,不然就会出现Command端的数量和Query端的Read
    DB中的数据不平等的情景。至于怎么样确认保证那么些顺序的等同,前边大家在详细分析。此地先举个简易的例子来验证为什么要逐项一致。比如即便今后有三个聚合根的2性子质,该属性的私下认可值是0,然后该属性先后爆发了八个Domain
    伊芙nt(代表的情趣分别是对这一个性子做+1,*2,-1)。那八个事件一旦依据那样的逐条爆发后,那那些天性最终的值是1;可是借使这个事件被消费者消费的次第是+1,-1,*2那最后的结果就不是1了,而是0了;所以经过这些例子,笔者想我们应该都了然了为啥要严苛管教聚合根持久化事件的次第必须和被消费的次第要完全一致了;
  13. 若是当前的伊夫ntStream允许被拍卖,则I伊夫ntProcessor对现阶段的伊芙ntStream中的各种IDomain伊夫nt做如下处理:1)依照IDomain伊夫nt的类型获得全体当前I伊夫ntProcessor节点上保有注册的I伊芙ntHandler,然后调用它们的Handle方法,达成比如Query端的Read
    DB的创新。唯独工作还没那么不难,因为我们还索要确定保障当前的IDomain伊夫nt只会被当下的I伊夫ntHandler处理3遍不然I伊夫ntHandler就会因为重新处理了IDomain伊夫nt而造成最后的多寡错乱;那里的幂等也在后边详细座谈。
  14. 稍许I伊芙ntHandler处理完IDomain伊夫nt后会爆发新的ICommand(正是Saga
    Process
    Manager)的事态。这种景况下,大家还需求把那几个发生的ICommand由框架自动发送到音讯队列(EQueue);不过工作也没那么简单,借使发送那一个ICommand退步了呢?那就要求重发,那重发如何布置才能确认保障不管重发多少次,也不会导致ICommand的再一次执行吗?那里实在最根本的一点是要确定保证你每一趟重发的ICommand的Id总是和率先次发送时要一如既往的,否则框架就不能够知道是或不是是同一个Command了。那里的切实可行规划后边再分析。

让Domain生活在In Memory中

相对而言一般的CQLacrosseS架构,enode每回在处理四个command,在取得聚合根时,不是从eventstore获取,而是从缓存获取。从地点的架构图能够见见,enode架构中有几个domain
memory
cache,最近用redis实现。那样做的益处是,将拥有的聚合根都缓存在redis缓存中,那样就能增长聚合根的读取时间;有2个标题亟需考虑,redis缓存服务器宕机了怎么办?宕机后缓存数据就没了,那怎么恢复生机那个缓存数据呢?那也是自作者采取redis的几个重聊城由,因为redis帮衬持久化,大家能够动用redis的aof或快速照相格局的持久化成效,来持久化缓存数据。从而得以在redis挂了后能最快的快慢苏醒缓存,重启redis服务器即可。那重启在此以前以及重启的历程中,因为不能从redis获取聚合根了,那只能从eventstore通过event
sourcing的格局去获取,那样的话质量肯定会相比较差,这如何做呢?答案是经过定时为聚合根成立快速照相,那也是接纳event
sourcing架构的1个便宜。我们能够定时对一些聚合跟创设快速照相(注意,小编觉着只必要考虑那个对性能供给很高的模块所关联到的聚合根创立快照即可),那怎么开创呢?可以开一个独立的经过,监听domain
event,对要求创立快速照相的domain
event做出判断,依照某种快速照相创制策略举行判断,纵然觉得需求创立快速照相,则从event
store拿出该聚合根的相干事件,通过event
sourcing还原获得某些版本的聚合根,那样就拿到了某些聚合根的某些版本的快速照相了。然后持久化起来即可。然后,enode帮忙在从event
store获取聚合根前,先反省是还是不是有快速照相,尽管有快速照相,则会先加载快照,再把快速照相之后的domain
event从event store获取,再把这几个快速照相之后的domain
event1个个apply到近期聚合根,从而获得最新事态的聚合根。那个进程比获得该聚合根的全体领域事件在二个个透过event
sourcing还原得到聚合根要快的多;越发是在八个聚合根的domain
event相比多的事态下就更有意义。因而,通过缓存的引入,我们得以升高command
handler的处理速度。

怎样确认保证事件时有爆发的顺序和被消费的相继相同

怎么要保管那一个相同的依次,在地点的流程手续介绍里曾经证实了。那里大家分析一下怎样贯彻那几个顺序的同等。基本的思绪是用2个表,存放全体聚合根当前一度处理过的最大学本科子号,假设当前已经处理过的最大版本号是10,那接下去只好处理那些聚合根版本号为11的伊芙ntStream。即使Version=12要么更前面包车型大巴先过来,也只能等着。那怎么等啊?也是近乎Command的重试队列一样,在一个本地的内部存款和储蓄器队列等就行了。比如未来最大已处理的版本号是10,然后未来12,13那四个版本号的伊芙ntStream先过来,这就先到行列等着,然后版本号是11的这几个事件还原了,就能够拍卖。处理好以往,当前最大已处理的版本号就编制程序11了,所以等候队列中的版本号为12的伊芙ntStream就能够允许被处理了。整个控制逻辑正是这么。那么那是单机的算法,尽管集群呢?实际上那不要考虑集群的情况,因为我们每台机器上都以这几个顺序控制逻辑,所以假设是集群,那最多或然出现的意况(实际上那种气象存在的大概性也是尤其的低)是,版本号为11的伊芙ntStream被冒出的拍卖。那种情状正是本人上边要分析的。

那边其实还有八个细节笔者还没说到,这些细节和EQueue的Consumer的ConsumerGroup相关,正是即便一种音讯,有许多Consumer消费,然后这么些Consumer如果分为多个ConsumerGroup,那那五个ConsumerGroup的消费是相互隔绝的。也正是说,全部这几个音信,五个ConsumerGroup内的Consumer都会开支到。这里假设不做一些别样的计划,恐怕会在用户采纳时遇见潜在的难点。那里笔者不能说的很掌握,说的太理解估算会让大家想想更混乱,且因为这么些点不是根本。所以就不举行了。有趣味的心上人能够看一下ENode中的伊芙ntPublishInfo表中的伊芙ntProcessorName字段的用意。

Command Handler二遍只处理三个Command

就算您不可能在command
handler中一次修改三个聚合根,作者觉得这应当是enode对开发人士的最大封锁,也许也是最让开发人士觉得优伤的地点。但作者认为那个不是约束,而是对数据强一致性和终极一致性的三个正确认识。在自个儿学过ddd+cqrs+event
sourcing那五个东西之后,笔者认识到,聚合内必须保障强一致性,聚合间最后一致性。古板三层开发,我们经过unit
of work形式(简称uow,比如nhibernate的session, entity
framework的dbcontext)能够轻易达成四个目的修改的强一致性事务;确实在价值观三层格局开发中,这种应用uow的措施来落到实处跨聚合的强一致性事务的措施很实用,开发起来很有利,开发人士能够不要顾虑会合世数量不相同的标题了,因为具有修改总是在三个业务内保留。

但enode的设计目的不是为着协理守旧三层开发,而是面向ddd+cqrs+eda+event
sourcing架构的框架。曾经本身也想让command
handler协助修改多少个聚合根,但诸如此类做必须求面临八个很伤脑筋的题材:command在发送到command
queue时,不能够依据聚合根ID来路由了。因为1个command会修改多少个聚合根,也等于说一个command不会和1个聚合根一一对应了。那意味同三个聚合根不能够总是被路由到同二个command
queue里,那样就招致相同ID的聚合根大概会在两台服务器被同时修改,这就会促成整个类别只怕会频仍的产生并发更新抵触。很多command就会频频的重试,整个类别的习性就会下滑。而enode设计之初便是为着高质量,所以那一点让自己觉着很难吸收。

反而,借使八个command总是只会创建或改动2个聚合根,那大家的command就能依照聚合根ID来路由到一定的音信队列,同1个聚合根ID总是会被路由到同3个queue,而三个queue的主顾服务器(command
handler所在的服务器)同暂且刻一而再只有四个,那大家就能担保多个聚合根的改动不会有出现难点。当然光这样还不够,在这么些command消费者服务器里,enode框架会用内部存款和储蓄器级别的queue对同叁个聚合根的富有command再一次实行排队(要是要求排队的话),之所以要这么是因为有时对二个聚合根的产出修改command恐怕1s内发送了不可胜道回复,所以command
handler肯定来不及在1s内整个拍卖掉那个command,所以须求在内部存款和储蓄器里再度排队(天猫商城双十一的时候,应用服务器内部也会有像样的对同二个聚合根设计四个相应的内部存款和储蓄器queue来防止对同1个聚合根的修改的产出争辩的标题)。通过如此的筹划,我们可以形成绝大部分场所下,不会再有出现争执的题材,也正是command不会再冒出重试的事态。那样结尾的坚守就是:差异ID的聚合根的处理能够相互,同三个ID的聚合根的拍卖是串行,通过两级排队达成。前面说到,那样只好做到绝当先四分之二处境下不会有出现冲突,那么怎么样时候依然会有出现顶牛吧?正是在新增command消费者服务器的时候,比如大家发现以来系统繁忙,大家意在大增command消费者服务器来加快command的拍卖,那在新增服务器后,原来修改某些聚合根的command大概会被路由到新的服务器,但是这些聚合根的有个别command只怕还在原先的服务器上还没实施完,此时就会冒出同三个聚合根在两台服务器上被同时修改的或是了;那这么些怎么消除吗?笔者未来的想法是框架层面不必解决了,大家只须要在系统最空的时候(比如凌晨4点)的时候,扩充服务器即可,因为极度时候音讯队列里的音讯是最少的,也便是不太大概会发出因为扩充command
handler服务器而造成现身抵触的难点,那样大家就能够最大限度的幸免大概带来的出现争辩。

ENode架构图

澳门美高梅手机网站 3

Event Store的设计

前言

今天是个畅快的生活,又是周末,能够轻松的写写小说了。二〇一八年,小编写了ENode
1.0本子,那时笔者也写了2个解析类别。经过了大多年的大运,小编对第二个本子做了广大架构上的创新,最主要的正是让ENode实现了分布式,通过新增一个分布式新闻队列EQueue来落到实处。之所以要设计1个分布式的新闻队列是因为在enode
1.0版本中,有些特定的音讯队列只好被有个别特定的顾客消费。这样就会导致二个题材,正是要是这么些消费者挂了,那这些消费者对应的音信队列就无法自动被其它消费者消费了。这些难题会一向导致系统不可用。而ENode
2.0中,就不会有这一个标题了,因为新闻队列被设计为独立的,被消费者所共享的;多个新闻队列能够被多少个买主集群消费或播报消费,若是三个顾客挂了,这其余的顾客会活动顶上。那里具体的底细,笔者会在前面详细介绍。

发表评论

电子邮件地址不会被公开。 必填项已用*标注