Post

DDD 中的事务与集成事件处理

eShopOnContainers 是微软官方提供的一个基于微服务架构的参照项目,它全面展示了如何运用领域驱动设计(DDD)原则来构建复杂系统。在本文中,我们将深入探讨 Ordering (订单) 微服务,特别是其如何通过手动管理事务和 MediatR 管道来确保数据一致性,以及领域事件和集成事件在其中扮演的关键角色。

为何需要手动管理事务?

在标准的 ASP.NET Core 应用中,Entity Framework Core (EF Core) 的 SaveChanges 方法会自动包裹在一个事务中。这个默认行为极大地简化了开发,只要实体跟踪不出问题,数据库的更新要么全部成功,要么全部回滚。

然而,在 Ordering 微服务中,我们看到了不同的做法。OrderingContext 选择了手动管理事务。为什么呢?

答案在于集成事件(Integration Events)的持久化。在 DDD 中,当一个聚合根(Aggregate Root)的状态发生重要变化时,我们通常会发布一个事件。如果这个事件需要通知其他微服务(即其他领域),它就成了一个集成事件。为了保证原子性,对聚合根的业务操作和集成事件的持久化必须在同一个事务中完成。

OrderingContext 不仅管理着订单相关的业务实体(如 Order, OrderItem),还负责持久化集成事件日志。这一点可以从 OnModelCreating 方法的配置中得到证实:

1
2
3
4
5
6
7
// src/Ordering.Infrastructure/OrderingContext.cs

protected override void OnModelCreating(ModelBuilder modelBuilder)
{
    // ... 其他配置
    modelBuilder.UseIntegrationEventLogs();
}

UseIntegrationEventLogs() 是一个扩展方法,它将 IntegrationEventLogEntry 实体添加到 DbContext 中,用于记录待发布的集成事件。由于事件的发布逻辑(位于 MediatR 管道中)和业务逻辑(位于 Command Handler 中)在代码层面是分离的,我们需要一个共享的事务来跨越这两个阶段,因此必须手动开启和提交事务。

MediatR 管道:事务的编排者

既然需要手动管理事务,那么事务的生命周期是在哪里被控制的呢?答案就在 MediatR 的管道(Pipeline)行为中。

在 Ordering.API 的服务配置中,我们可以看到一系列的 IPipelineBehavior 被注册了进来,其中就包括 TransactionBehavior

1
2
3
4
5
6
7
8
9
10
11
// src/Ordering.API/Extensions/ApplicationServicesExtensions.cs

// ...
services.AddMediatR(cfg =>
{
    cfg.RegisterServicesFromAssemblyContaining(typeof(Program));
    cfg.AddOpenBehavior(typeof(LoggingBehavior<,>));
    cfg.AddOpenBehavior(typeof(ValidatorBehavior<,>));
    cfg.AddOpenBehavior(typeof(TransactionBehavior<,>));
});
// ...

这些 Behavior 类似于 ASP.NET Core 的中间件,它们会按注册顺序包裹住真正的命令处理器(Command Handler)。TransactionBehavior 的职责正是在处理核心业务逻辑之前开启事务,并在处理完成后提交事务。

让我们看看 TransactionBehavior 的简化逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// src/Ordering.API/Application/Behaviors/TransactionBehavior.cs

public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
{
    // ...
    var strategy = _dbContext.Database.CreateExecutionStrategy();

    await strategy.ExecuteAsync(async () =>
    {
        await using var transaction = await _dbContext.BeginTransactionAsync();
        // 1. 开启事务

        response = await next(); // 2. 执行下一个 Behavior 或 Command Handler
                                 //    这其中包含了业务逻辑、领域事件分发和 SaveChanges

        await _dbContext.CommitTransactionAsync(transaction); // 3. 提交事务

        // 4. 发布集成事件到事件总线
        await _orderingIntegrationEventService.PublishEventsThroughEventBusAsync(transaction.TransactionId);
    });
    // ...
}

通过这种方式,MediatR 管道巧妙地将事务管理逻辑与业务逻辑解耦,实现了关注点分离。

逻辑流向:一次“创建订单”之旅

为了将所有知识点串联起来,我们以“创建订单”(CreateOrder) 这个用例为线索,完整地走一遍代码的逻辑流。

第 1 步: API 端点接收请求

请求首先到达 OrdersApi,CreateOrderAsync 方法接收到 HTTP POST 请求。它将请求包装成一个 CreateOrderCommand,并通过 MediatR 发送出去。

1
2
3
4
5
6
7
8
9
10
11
12
13
// src/Ordering.API/Apis/OrdersApi.cs

public static async Task<Results<Ok, BadRequest<string>>> CreateOrderAsync(
    [FromHeader(Name = "x-requestid")] Guid requestId,
    CreateOrderRequest request,
    [AsParameters] OrderServices services)
{
    // ...
    var createOrderCommand = new CreateOrderCommand(/* ... */);
    var requestCreateOrder = new IdentifiedCommand<CreateOrderCommand, bool>(createOrderCommand, requestId);
    var result = await services.Mediator.Send(requestCreateOrder);
    // ...
}

第 2 步: TransactionBehavior 启动事务

MediatR 接收到命令后,TransactionBehavior 作为管道的第一层(在日志和验证之后),开始执行。它调用 _dbContext.BeginTransactionAsync(),开启一个新的数据库事务。

第 3 步: CreateOrderCommandHandler 处理业务逻辑

接下来,控制权交给了 CreateOrderCommandHandler。在这里,核心的业务逻辑被执行:

  1. 创建一个 OrderStartedIntegrationEvent 集成事件,并暂存起来,等待事务成功后发布。
  2. 创建 Order 聚合根,并添加 OrderItem。
  3. 调用 _orderRepository.UnitOfWork.SaveEntitiesAsync(cancellationToken)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// src/Ordering.API/Application/Commands/CreateOrderCommandHandler.cs

public async Task<bool> Handle(CreateOrderCommand message, CancellationToken cancellationToken)
{
    // 添加集成事件,此时仅保存到数据库的事件日志表中
    var orderStartedIntegrationEvent = new OrderStartedIntegrationEvent(message.UserId);
    await _orderingIntegrationEventService.AddAndSaveEventAsync(orderStartedIntegrationEvent);

    // 创建聚合根和值对象
    var address = new Address(...);
    var order = new Order(...);

    // ... 添加订单项

    _orderRepository.Add(order);

    // 保存实体变更和领域事件
    return await _orderRepository.UnitOfWork.SaveEntitiesAsync(cancellationToken);
}

第 4 步: SaveEntitiesAsync 与领域事件

SaveEntitiesAsync 是工作单元(Unit of Work)模式的实现,它在 OrderingContext 中。这一步非常关键:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// src/Ordering.Infrastructure/OrderingContext.cs

public async Task<bool> SaveEntitiesAsync(CancellationToken cancellationToken = default)
{
    // 1. 分发领域事件 (Domain Events)
    // 这些事件在同一个领域内部进行处理,例如更新其他聚合的状态
    await _mediator.DispatchDomainEventsAsync(this);

    // 2. 调用 EF Core 的 SaveChangesAsync
    // 将所有被跟踪的实体变更(包括领域事件处理器可能引起的变更)和
    // 之前暂存的集成事件日志,一并写入数据库。
    // 注意:此时事务尚未提交!
    _ = await base.SaveChangesAsync(cancellationToken);

    return true;
}

这里需要区分领域事件和集成事件:

领域事件 (Domain Events): 在聚合内部或领域内部广播,用于处理副作用,通常是同步执行的。DispatchDomainEventsAsync 负责的正是这个。 集成事件 (Integration Events): 用于跨微服务通信,是异步的。它们在 CreateOrderCommandHandler 中被创建并保存到数据库,但直到事务提交后才会被发布到事件总线。 SaveChangesAsync 执行后,所有的数据变更都已准备就绪,等待事务的最终裁决。

第 5 步: TransactionBehavior 提交事务并发布事件

CreateOrderCommandHandler 执行完毕,控制权回到 TransactionBehavior

它调用 _dbContext.CommitTransactionAsync(transaction),将之前 SaveChangesAsync 产生的所有数据库变更原子性地提交。 事务成功后,调用 _orderingIntegrationEventService.PublishEventsThroughEventBusAsync(),从事件日志表中读取刚刚持久化的集成事件,并将它们发布到 RabbitMQ 等事件总线上。 至此,一个订单被成功创建,并且通知其他微服务(如 Basket.API 清理购物车)的集成事件也被可靠地发了出去。如果过程中任何一步失败,TransactionBehavior 的 catch 块会捕获异常并回滚整个事务,数据库将恢复到操作前的状态。

总结

eShopOnContainers 的 Ordering 服务为我们提供了一个 DDD 实践的绝佳范例。通过对 MediatR 管道、手动事务管理、领域事件和集成事件的精妙组合,它解决了在复杂业务场景下保证数据一致性的核心难题。这种模式虽然比 EF Core 的默认行为要复杂,但它提供了更强的灵活性和控制力,是构建健壮、可扩展的分布式系统的关键。

This post is licensed under CC BY 4.0 by the author.