Skip to main content

22. 事件总线

v2.20 以下版本说明

Furion v2.20+ 版本后采用 Jaina 事件总线替换原有的 EventBus查看旧文档

22.1 关于事件总线

事件总线是对发布-订阅模式的一种实现。它是一种集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需要相互依赖,达到一种解耦的目的。

22.2 快速入门

  1. 定义事件订阅者 ToDoEventSubscriber
// 实现 IEventSubscriber 接口public class ToDoEventSubscriber : IEventSubscriber{    private readonly ILogger<ToDoEventSubscriber> _logger;    public ToDoEventSubscriber(ILogger<ToDoEventSubscriber> logger)    {        _logger = logger;    }    [EventSubscribe("ToDo:Create")]    public async Task CreateToDo(EventHandlerExecutingContext context)    {        var todo = context.Source;        _logger.LogInformation("创建一个 ToDo:{Name}", todo.Payload);        await Task.CompletedTask;    }    // 支持多个    [EventSubscribe("ToDo:Create")]    [EventSubscribe("ToDo:Update")]    public async Task CreateOrUpdateToDo(EventHandlerExecutingContext context)    {        var todo = context.Source;        _logger.LogInformation("创建或更新一个 ToDo:{Name}", todo.Payload);        await Task.CompletedTask;    }    // 支持枚举类型,v3.4.3+ 版本支持    [EventSubscribe(YourEnum.Some)]    public async Task EnumHandler(EventHandlerExecutingContext context)    {        var eventEnum = context.Source.EventId.ParseToEnum(); // 将事件 Id 转换成枚举对象        await Task.CompletedTask;    }}
  1. 创建控制器 ToDoController,依赖注入 IEventPublisher 服务:
public class ToDoController : ControllerBase{    // 依赖注入事件发布者 IEventPublisher    private readonly IEventPublisher _eventPublisher;    public ToDoController(IEventPublisher eventPublisher)    {        _eventPublisher = eventPublisher;    }    // 发布 ToDo:Create 消息    public async Task CreateDoTo(string name)    {        await _eventPublisher.PublishAsync(new ChannelEventSource("ToDo:Create", name));    }    // v3.4.3+ 版本支持发送消息简化    public async Task CreateDoTo(string name)    {        await _eventPublisher.PublishAsync("ToDo:Create", name);        // 也支持枚举        await _eventPublisher.PublishAsync(YourEnum.Some);    }}
  1. Startup.cs 注册 EventBus 服务:
// 注册 EventBus 服务services.AddEventBus(builder =>{    // 注册 ToDo 事件订阅者    builder.AddSubscriber<ToDoEventSubscriber>();    // 批量注册事件订阅者    builder.AddSubscribers(ass1, ass2, ....);});
懒人提醒

Furion 中可以不用通过 builder.AddSubscriber<T>() 方式一一注册,只需要实现 ISingleton 接口即可,如:

public class ToDoEventSubscriber : IEventSubscriber, ISingleton{}

这样就无需写 builder.AddSubscriber<ToDoEventSubscriber>(); 代码了,只需保留 services.AddEventBus() 服务即可。

  1. 运行项目:
info: Jaina.Samples.ToDoEventSubscriber[0]      创建一个 ToDo:Jaina

22.3 自定义事件源

Furion 使用 IEventSource 作为消息载体,任何实现该接口的类都可以充当消息载体。

如需自定义,只需实现 IEventSource 接口即可:

public class ToDoEventSource : IEventSource{    public ToDoEventSource(string eventId, string todoName)    {        EventId = eventId;        ToDoName = todoName;    }    // 自定义属性    public string ToDoName { get; }    /// <summary>    /// 事件 Id    /// </summary>    public string EventId { get; }    /// <summary>    /// 事件承载(携带)数据    /// </summary>    public object Payload { get; }    /// <summary>    /// 取消任务 Token    /// </summary>    /// <remarks>用于取消本次消息处理</remarks>    public CancellationToken CancellationToken { get; }    /// <summary>    /// 事件创建时间    /// </summary>    public DateTime CreatedTime { get; } = DateTime.UtcNow;}

使用:

await _eventPublisher.PublishAsync(new ToDoEventSource ("ToDo:Create", "我的 ToDo Name"));

22.4 自定义事件源存储器

Fruion 默认采用 Channel 作为事件源 IEventSource 存储器,开发者可以使用任何消息队列组件进行替换,如 Kafka、RabbitMQ、ActiveMQ 等,也可以使用部分数据库 Redis、SQL Server、MySql 实现。

如需自定义,只需实现 IEventSourceStorer 接口即可:

public class RedisEventSourceStorer : IEventSourceStorer{    private readonly IRedisClient _redisClient;    public RedisEventSourceStorer(IRedisClient redisClient)    {        _redisClient = redisClient;    }    // 往 Redis 中写入一条    public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)    {        await _redisClient.WriteAsync(...., cancellationToken);    }    // 从 Redis 中读取一条    public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)    {       return await _redisClient.ReadAsync(...., cancellationToken);    }}

最后,在注册 EventBus 服务中替换默认 IEventSourceStorer

services.AddEventBus(builder =>{    // 替换事件源存储器    builder.ReplaceStorer(serviceProvider =>    {        var redisClient = serviceProvider.GetService<IRedisClient>();        return new RedisEventSourceStorer(redisClient);    });});

22.5 自定义事件发布者

Furion 默认内置基于 Channel 的事件发布者 ChannelEventPublisher

如需自定义,只需实现 IEventPublisher 接口即可:

public class ToDoEventPublisher : IEventPublisher{    private readonly IEventSourceStorer _eventSourceStorer;    public ChannelEventPublisher(IEventSourceStorer eventSourceStorer)    {        _eventSourceStorer = eventSourceStorer;    }    public async Task PublishAsync(IEventSource eventSource)    {        await _eventSourceStorer.WriteAsync(eventSource, eventSource.CancellationToken);    }}

最后,在注册 EventBus 服务中替换默认 IEventPublisher

services.AddEventBus(builder =>{    // 替换事件源存储器    builder.ReplacePublisher<ToDoEventPublisher>();});

22.6 添加事件执行监视器

Furion 提供了 IEventHandlerMonitor 监视器接口,实现该接口可以监视所有订阅事件,包括 执行之前、执行之后,执行异常,共享上下文数据

如添加 ToDoEventHandlerMonitor

public class ToDoEventHandlerMonitor : IEventHandlerMonitor{    private readonly ILogger<ToDoEventHandlerMonitor> _logger;    public ToDoEventHandlerMonitor(ILogger<ToDoEventHandlerMonitor> logger)    {        _logger = logger;    }    public Task OnExecutingAsync(EventHandlerExecutingContext context)    {        _logger.LogInformation("执行之前:{EventId}", context.Source.EventId);        return Task.CompletedTask;    }    public Task OnExecutedAsync(EventHandlerExecutedContext context)    {        _logger.LogInformation("执行之后:{EventId}", context.Source.EventId);        if (context.Exception != null)        {            _logger.LogError(context.Exception, "执行出错啦:{EventId}", context.Source.EventId);        }        return Task.CompletedTask;    }}

最后,在注册 EventBus 服务中注册 ToDoEventHandlerMonitor

services.AddEventBus(builder =>{    // 添加事件执行监视器    builder.AddMonitor<ToDoEventHandlerMonitor>();});

22.7 添加事件执行器

Furion 提供了 IEventHandlerExecutor 执行器接口,可以让开发者自定义事件处理函数执行策略,如 超时控制,失败重试、熔断等等

如添加 RetryEventHandlerExecutor

public class RetryEventHandlerExecutor : IEventHandlerExecutor{    public async Task ExecuteAsync(EventHandlerExecutingContext context, Func<EventHandlerExecutingContext, Task> handler)    {        // 如果执行失败,每隔 1s 重试,最多三次        await Retry.InvokeAsync(async () => {            await handler(context);        }, 3, 1000);    }}

最后,在注册 EventBus 服务中注册 RetryEventHandlerExecutor

services.AddEventBus(builder =>{    // 添加事件执行器    builder.AddExecutor<RetryEventHandlerExecutor>();});

22.8 使用有作用域的服务

Furion 中, Event Bus 所有服务均注册为单例,如需使用作用域服务(单例服务可直接注入),可通过依赖注入 IServiceProvider 实例并通过 CreateScope() 创建一个作用域,如:

public class ToDoEventSubscriber : IEventSubscriber{    private readonly ILogger<ToDoEventSubscriber> _logger;    public ToDoEventSubscriber(IServiceProvider services        , ILogger<ToDoEventSubscriber> logger)    {        Services = services;        _logger = logger;    }    public IServiceProvider Services { get; }    [EventSubscribe("ToDo:Create")]    public async Task CreateToDo(EventHandlerExecutingContext context)    {        // 创建新的作用域        using var scope = Services.CreateScope();        // 解析服务        var scopedProcessingService = scope.ServiceProvider.GetRequiredService<IScopedProcessingService>();        // ....    }}

22.9 订阅执行任务意外异常

services.AddEventBus(builder =>{    // 订阅 EventBus 未捕获异常    builder.UnobservedTaskExceptionHandler = (obj, args) =>    {        // ....    };});

22.10 EventBusOptionsBuilder 配置

EventBusOptionsBuilderAddEventBus 构建服务选项,该选项包含以下属性和方法:

  • 属性
    • ChannelCapacity:默认内存通道容量
    • UnobservedTaskExceptionHandler:订阅执行任务未察觉异常
    • UseUtcTimestamp:是否使用 UTC 事件,默认 false
  • 方法
    • AddSubscriber<TEventSubscriber>:添加订阅者
    • ReplacePublisher<TEventPublisher>:替换发布者
    • ReplaceStorer(Func<IServiceProvider, IEventSourceStorer>):替换存储器
    • AddMonitor<TEventHandlerMonitor>:添加监视器
    • AddExecutor<TEventHandlerExecutor>:添加执行器

22.11 反馈与建议

与我们交流

给 Furion 提 Issue

演练场