⭐️ Furion v4 版本支持【所有历史版本】无缝升级,一套代码兼容 .NET 5+ ⭐️
Skip to main content

22. 事件总线

v2.20 以下版本说明

Furion v2.20+ 版本后采用 Jaina 事件总线替换原有的 EventBus查看旧文档

22.1 关于事件总线

事件总线是对发布-订阅模式的一种实现。它是一种集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需要相互依赖,达到一种解耦的目的。

22.2 快速入门

  1. 定义事件订阅者 ToDoEventSubscriber
// 实现 IEventSubscriber 接口public class ToDoEventSubscriber : IEventSubscriber{    private readonly ILogger<ToDoEventSubscriber> _logger;    public ToDoEventSubscriber(ILogger<ToDoEventSubscriber> logger)    {        _logger = logger;    }    [EventSubscribe("ToDo:Create")]    public async Task CreateToDo(EventHandlerExecutingContext context)    {        var todo = context.Source;        _logger.LogInformation("创建一个 ToDo:{Name}", todo.Payload);        await Task.CompletedTask;    }    // 支持多个    [EventSubscribe("ToDo:Create")]    [EventSubscribe("ToDo:Update")]    public async Task CreateOrUpdateToDo(EventHandlerExecutingContext context)    {        var todo = context.Source;        _logger.LogInformation("创建或更新一个 ToDo:{Name}", todo.Payload);        await Task.CompletedTask;    }    // 支持枚举类型,v3.4.3+ 版本支持    [EventSubscribe(YourEnum.Some)]    public async Task EnumHandler(EventHandlerExecutingContext context)    {        var eventEnum = context.Source.EventId.ParseToEnum(); // 将事件 Id 转换成枚举对象        await Task.CompletedTask;    }    // 支持正则表达式匹配,4.2.10+ 版本支持    [EventSubscribe("(^1[3456789][0-9]{9}$)|((^[0-9]{3,4}\\-[0-9]{3,8}$)|(^[0-9]{3,8}$)|(^\\([0-9]{3,4}\\)[0-9]{3,8}$)|(^0{0,1}13[0-9]{9}$))", FuzzyMatch = true)]    public async Task RegexHandler(EventHandlerExecutingContext context)    {        var eventId = context.Source.EventId;        await Task.CompletedTask;    }    // 支持多种异常重试配置,4.2.10+ 版本支持    [EventSubscribe("test:error", NumRetries = 3)]    [EventSubscribe("test:error", NumRetries = 3, RetryTimeout = 1000)] // 重试间隔时间    [EventSubscribe("test:error", NumRetries = 3, ExceptionTypes = new[] { typeof(ArgumentException) })]    // 特定类型异常才重试    public async Task ExceptionHandler(EventHandlerExecutingContext context)    {        var eventId = context.Source.EventId;        await Task.CompletedTask;    }}
  1. 创建控制器 ToDoController,依赖注入 IEventPublisher 服务:
public class ToDoController : ControllerBase{    // 依赖注入事件发布者 IEventPublisher    private readonly IEventPublisher _eventPublisher;    public ToDoController(IEventPublisher eventPublisher)    {        _eventPublisher = eventPublisher;    }    // 发布 ToDo:Create 消息    public async Task CreateDoTo(string name)    {        await _eventPublisher.PublishAsync(new ChannelEventSource("ToDo:Create", name));    }    // v3.4.3+ 版本支持发送消息简化    public async Task CreateDoTo(string name)    {        await _eventPublisher.PublishAsync("ToDo:Create", name);        // 也支持枚举        await _eventPublisher.PublishAsync(YourEnum.Some);    }}
  1. Startup.cs 注册 EventBus 服务:
// 注册 EventBus 服务services.AddEventBus(builder =>{    // 注册 ToDo 事件订阅者    builder.AddSubscriber<ToDoEventSubscriber>();    // 通过类型注册,Furion 4.2.1+ 版本    builder.AddSubscriber(typeof(ToDoEventSubscriber));    // 批量注册事件订阅者    builder.AddSubscribers(ass1, ass2, ....);});
懒人提醒

Furion 中可以不用通过 builder.AddSubscriber<T>() 方式一一注册,只需要实现 ISingleton 接口即可,如:

public class ToDoEventSubscriber : IEventSubscriber, ISingleton{}

这样就无需写 builder.AddSubscriber<ToDoEventSubscriber>(); 代码了,只需保留 services.AddEventBus() 服务即可。

  1. 运行项目:
info: Jaina.Samples.ToDoEventSubscriber[0]      创建一个 ToDo:Jaina

22.3 自定义事件源

Furion 使用 IEventSource 作为消息载体,任何实现该接口的类都可以充当消息载体。

如需自定义,只需实现 IEventSource 接口即可:

public class ToDoEventSource : IEventSource{    public ToDoEventSource()    {    }    public ToDoEventSource(string eventId, string todoName)    {        EventId = eventId;        ToDoName = todoName;    }    // 自定义属性    public string ToDoName { get; set; }    /// <summary>    /// 事件 Id    /// </summary>    public string EventId { get; set; }    /// <summary>    /// 事件承载(携带)数据    /// </summary>    public object Payload { get; set; }    /// <summary>    /// 事件创建时间    /// </summary>    public DateTime CreatedTime { get; set; } = DateTime.UtcNow;    /// <summary>    /// 取消任务 Token    /// </summary>    /// <remarks>用于取消本次消息处理</remarks>    [Newtonsoft.Json.JsonIgnore]    [System.Text.Json.Serialization.JsonIgnore]    public CancellationToken CancellationToken { get; set; }}

使用:

await _eventPublisher.PublishAsync(new ToDoEventSource ("ToDo:Create", "我的 ToDo Name"));

22.4 自定义事件源存储器

Fruion 默认采用 Channel 作为事件源 IEventSource 存储器,开发者可以使用任何消息队列组件进行替换,如 Kafka、RabbitMQ、ActiveMQ 等,也可以使用部分数据库 Redis、SQL Server、MySql 实现。

如需自定义,只需实现 IEventSourceStorer 接口即可:

public class RedisEventSourceStorer : IEventSourceStorer{    private readonly IRedisClient _redisClient;    public RedisEventSourceStorer(IRedisClient redisClient)    {        _redisClient = redisClient;    }    // 往 Redis 中写入一条    public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)    {        await _redisClient.WriteAsync(...., cancellationToken);    }    // 从 Redis 中读取一条    public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)    {       return await _redisClient.ReadAsync(...., cancellationToken);    }}

最后,在注册 EventBus 服务中替换默认 IEventSourceStorer

services.AddEventBus(builder =>{    // 替换事件源存储器    builder.ReplaceStorer(serviceProvider =>    {        var redisClient = serviceProvider.GetService<IRedisClient>();        return new RedisEventSourceStorer(redisClient);    });});

22.4.1 RabbitMQ 自定义指南

版本说明

以下内容仅限 Furion 4.3.4 + 版本使用。

由于使用 RabbitMQ 作为事件总线存储器的比较多,所以这里提供了完整的使用例子。

1. 安装 RabbitMQ.Client 拓展包

Install-Package RabbitMQ.Client -Version 6.4.0

2. 创建 RabbitMQEventSourceStorer 自定义存储器

using Furion.EventBus;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System;using System.Text;using System.Text.Json;using System.Threading;using System.Threading.Channels;using System.Threading.Tasks;namespace Furion.Core;public sealed class RabbitMQEventSourceStorer : IEventSourceStorer, IDisposable{    /// <summary>    /// 内存通道事件源存储器    /// </summary>    private readonly Channel<IEventSource> _channel;    /// <summary>    /// 通道对象    /// </summary>    private readonly IModel _model;    /// <summary>    /// 连接对象    /// </summary>    private readonly IConnection _connection;    /// <summary>    /// 路由键    /// </summary>    private readonly string _routeKey;    /// <summary>    /// 构造函数    /// </summary>    /// <param name="factory">连接工厂</param>    /// <param name="routeKey">路由键</param>    /// <param name="capacity">存储器最多能够处理多少消息,超过该容量进入等待写入</param>    public RabbitMQEventSourceStorer(ConnectionFactory factory, string routeKey, int capacity)    {        // 配置通道,设置超出默认容量后进入等待        var boundedChannelOptions = new BoundedChannelOptions(capacity)        {            FullMode = BoundedChannelFullMode.Wait        };        // 创建有限容量通道        _channel = Channel.CreateBounded<IEventSource>(boundedChannelOptions);        // 创建连接        _connection = factory.CreateConnection();        _routeKey = routeKey;        // 创建通道        _model = _connection.CreateModel();        // 声明路由队列        _model.QueueDeclare(routeKey, false, false, false, null);        // 创建消息订阅者        var consumer = new EventingBasicConsumer(_model);        // 订阅消息并写入内存 Channel        consumer.Received += (ch, ea) =>        {            // 读取原始消息            var stringEventSource = Encoding.UTF8.GetString(ea.Body.ToArray());            // 转换为 IEventSource,这里可以选择自己喜欢的序列化工具,如果自定义了 EventSource,注意属性是可读可写            var eventSource = JsonSerializer.Deserialize<ChannelEventSource>(stringEventSource);            // 写入内存管道存储器            _channel.Writer.TryWrite(eventSource);            // 确认该消息已被消费            _model.BasicAck(ea.DeliveryTag, false);        };        // 启动消费者 设置为手动应答消息        _model.BasicConsume(routeKey, false, consumer);    }    /// <summary>    /// 将事件源写入存储器    /// </summary>    /// <param name="eventSource">事件源对象</param>    /// <param name="cancellationToken">取消任务 Token</param>    /// <returns><see cref="ValueTask"/></returns>    public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)    {        // 空检查        if (eventSource == default)        {            throw new ArgumentNullException(nameof(eventSource));        }        // 这里判断是否是 ChannelEventSource 或者 自定义的 EventSource        if (eventSource is ChannelEventSource source)        {            // 序列化,这里可以选择自己喜欢的序列化工具            var data = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(source));            // 发布            _model.BasicPublish("", _routeKey, null, data);        }        else        {            // 这里处理动态订阅问题            await _channel.Writer.WriteAsync(eventSource, cancellationToken);        }    }    /// <summary>    /// 从存储器中读取一条事件源    /// </summary>    /// <param name="cancellationToken">取消任务 Token</param>    /// <returns>事件源对象</returns>    public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)    {        // 读取一条事件源        var eventSource = await _channel.Reader.ReadAsync(cancellationToken);        return eventSource;    }    /// <summary>    /// 释放非托管资源    /// </summary>    public void Dispose()    {        _model.Dispose();        _connection.Dispose();    }}

3. 替换默认事件存储器

services.AddEventBus(options =>{    // 创建连接工厂    var factory = new ConnectionFactory    {        UserName = "admin",        Password = "q1w2e3",    };    // 创建默认内存通道事件源对象,可自定义队列路由key,比如这里是 eventbus    var rbmqEventSourceStorer = new RabbitMQEventSourceStorer(factory, "eventbus", 3000);    // 替换默认事件总线存储器    options.ReplaceStorer(serviceProvider =>    {        return rbmqEventSourceStorer;    });});

22.4.2 Kafka 自定义指南

1. 安装 Confluent.Kafka 拓展包

Install-Package Confluent.Kafka -Version 1.9.3

2. 创建 EventConsumer 订阅类

using Confluent.Kafka;namespace Furion.Core;/// <summary>/// Kafka 消息扩展/// </summary>/// <typeparam name="TKey"></typeparam>/// <typeparam name="TValue"></typeparam>public class EventConsumer<TKey, TValue> : IDisposable{    private Task _consumerTask;    private CancellationTokenSource _consumerCts;    /// <summary>    /// 消费者    /// </summary>    public IConsumer<TKey, TValue> Consumer { get; }    /// <summary>    /// ConsumerBuilder    /// </summary>    public ConsumerBuilder<TKey, TValue> Builder { get; set; }    /// <summary>    /// 消息回调    /// </summary>    public event EventHandler<ConsumeResult<TKey, TValue>> Received;    /// <summary>    /// 异常回调    /// </summary>    public event EventHandler<ConsumeException> OnConsumeException;    /// <summary>    /// 构造函数    /// </summary>    /// <param name="config"></param>    public EventConsumer(IEnumerable<KeyValuePair<string, string>> config)    {        Builder = new ConsumerBuilder<TKey, TValue>(config);        Consumer = Builder.Build();    }    /// <summary>    /// 启动    /// </summary>    /// <exception cref="InvalidOperationException"></exception>    public void Start()    {        if (Consumer.Subscription?.Any() != true)        {            throw new InvalidOperationException("Subscribe first using the Consumer.Subscribe() function");        }        if (_consumerTask != null)        {            return;        }        _consumerCts = new CancellationTokenSource();        var ct = _consumerCts.Token;        _consumerTask = Task.Factory.StartNew(() =>        {            while (!ct.IsCancellationRequested)            {                try                {                    var cr = Consumer.Consume(TimeSpan.FromSeconds(1));                    if (cr == null) continue;                    Received?.Invoke(this, cr);                }                catch (ConsumeException e)                {                    OnConsumeException?.Invoke(this, e);                }            }        }, ct, TaskCreationOptions.LongRunning, System.Threading.Tasks.TaskScheduler.Default);    }    /// <summary>    /// 停止    /// </summary>    /// <returns></returns>    public async Task Stop()    {        if (_consumerCts == null || _consumerTask == null) return;        _consumerCts.Cancel();        try        {            await _consumerTask;        }        finally        {            _consumerTask = null;            _consumerCts = null;        }    }    /// <summary>    /// 释放    /// </summary>    public void Dispose()    {        Dispose(true);        GC.SuppressFinalize(this);    }    /// <summary>    /// 释放    /// </summary>    /// <param name="disposing"></param>    protected virtual void Dispose(bool disposing)    {        if (disposing)        {            if (_consumerTask != null)            {                Stop().Wait();            }            Consumer?.Dispose();        }    }}

3. 创建 KafkaEventSourceStore 自定义存储器

using Confluent.Kafka;using Furion.EventBus;using Newtonsoft.Json;using System.Threading.Channels;namespace Furion.Core;/// <summary>/// Kafka 存储源/// </summary>public class KafkaEventSourceStore : IEventSourceStorer, IDisposable{    /// <summary>    /// 内存通道事件源存储器    /// </summary>    private readonly Channel<IEventSource> _channel;    /// <summary>    /// 主题    /// </summary>    private readonly string _topic;    /// <summary>    /// 消费者    /// </summary>    private readonly EventConsumer<Null, string> _eventConsumer;    /// <summary>    /// 生产者    /// </summary>    private readonly IProducer<Null, string> _producer;    /// <summary>    /// 构造函数    /// </summary>    /// <param name="consumerConf">消费者配置</param>    /// <param name="producerConf">生产者配置</param>    /// <param name="topic">主题</param>    /// <param name="capacity">存储器最多能够处理多少消息,超过该容量进入等待写入</param>    public KafkaEventSourceStore(ConsumerConfig consumerConf, ProducerConfig producerConf, string topic, int capacity)    {        // 配置通道,设置超出默认容量后进入等待        var boundedChannelOptions = new BoundedChannelOptions(capacity)        {            FullMode = BoundedChannelFullMode.Wait        };        // 创建有限容量通道        _channel = Channel.CreateBounded<IEventSource>(boundedChannelOptions);        // 主题        _topic = topic;        // 创建消息订阅者        _eventConsumer = new EventConsumer<Null, string>(consumerConf);        _eventConsumer.Consumer.Subscribe(new[] { topic });        // 订阅消息写入 Channel        _eventConsumer.Received += (send, cr) =>        {            // 反序列化消息            var eventSource = JsonConvert.DeserializeObject<ChannelEventSource>(cr.Message.Value);            // 写入内存管道存储器            _channel.Writer.TryWrite(eventSource);        };        // 启动消费者        _eventConsumer.Start();        // 创建生产者        _producer = new ProducerBuilder<Null, string>(producerConf).Build();    }    /// <summary>    /// 将事件源写入存储器    /// </summary>    /// <param name="eventSource">事件源对象</param>    /// <param name="cancellationToken">取消任务 Token</param>    /// <returns><see cref="ValueTask"/></returns>    public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)    {        if (eventSource == default)        {            throw new ArgumentNullException(nameof(eventSource));        }        // 这里判断是否是 ChannelEventSource 或者 自定义的 EventSource        if (eventSource is ChannelEventSource source)        {            // 序列化            var data = JsonConvert.SerializeObject(source);            // 异步发布            await _producer.ProduceAsync(_topic, new Message<Null, string>            {                Value = data            }, cancellationToken);        }        else        {            // 这里处理动态订阅问题            await _channel.Writer.WriteAsync(eventSource, cancellationToken);        }    }    /// <summary>    /// 从存储器中读取一条事件源    /// </summary>    /// <param name="cancellationToken">取消任务 Token</param>    /// <returns>事件源对象</returns>    public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)    {        // 读取一条事件源        var eventSource = await _channel.Reader.ReadAsync(cancellationToken);        return eventSource;    }    /// <summary>    /// 释放非托管资源    /// </summary>    public async void Dispose()    {        await _eventConsumer.Stop();        GC.SuppressFinalize(this);    }}

4. 替换默认事件存储器

services.AddEventBus(options =>{    var consumerConf = new ConsumerConfig    {        BootstrapServers = "xxx.xxx.xxx.xxx:9092",        GroupId = "Consumer",        AutoOffsetReset = AutoOffsetReset.Earliest // 从最早的开始消费起    };    var producerConf = new ProducerConfig    {        BootstrapServers = "xxx.xxx.xxx.xxx:9092",        BatchSize = 16384, // 修改批次大小为16K        LingerMs = 20 // 修改等待时间为20ms    };    // 创建默认内存通道事件源对象,可自定义队列路由key,比如这里是 eventbus    var kafkaEventSourceStorer = new KafkaEventSourceStore(consumerConf, producerConf, "testTopic", 30000);    // 替换默认事件总线存储器    options.ReplaceStorer(serviceProvider =>    {        return kafkaEventSourceStorer;    });});

22.5 自定义事件发布者

Furion 默认内置基于 Channel 的事件发布者 ChannelEventPublisher

如需自定义,只需实现 IEventPublisher 接口即可:

public class ToDoEventPublisher : IEventPublisher{    private readonly IEventSourceStorer _eventSourceStorer;    public ChannelEventPublisher(IEventSourceStorer eventSourceStorer)    {        _eventSourceStorer = eventSourceStorer;    }    public async Task PublishAsync(IEventSource eventSource)    {        await _eventSourceStorer.WriteAsync(eventSource, eventSource.CancellationToken);    }}

最后,在注册 EventBus 服务中替换默认 IEventPublisher

services.AddEventBus(builder =>{    // 替换事件源存储器    builder.ReplacePublisher<ToDoEventPublisher>();});

22.6 添加事件执行监视器

Furion 提供了 IEventHandlerMonitor 监视器接口,实现该接口可以监视所有订阅事件,包括 执行之前、执行之后,执行异常,共享上下文数据

如添加 ToDoEventHandlerMonitor

public class ToDoEventHandlerMonitor : IEventHandlerMonitor{    private readonly ILogger<ToDoEventHandlerMonitor> _logger;    public ToDoEventHandlerMonitor(ILogger<ToDoEventHandlerMonitor> logger)    {        _logger = logger;    }    public Task OnExecutingAsync(EventHandlerExecutingContext context)    {        _logger.LogInformation("执行之前:{EventId}", context.Source.EventId);        return Task.CompletedTask;    }    public Task OnExecutedAsync(EventHandlerExecutedContext context)    {        _logger.LogInformation("执行之后:{EventId}", context.Source.EventId);        if (context.Exception != null)        {            _logger.LogError(context.Exception, "执行出错啦:{EventId}", context.Source.EventId);        }        return Task.CompletedTask;    }}

最后,在注册 EventBus 服务中注册 ToDoEventHandlerMonitor

services.AddEventBus(builder =>{    // 添加事件执行监视器    builder.AddMonitor<ToDoEventHandlerMonitor>();});

22.7 添加事件执行器

Furion 提供了 IEventHandlerExecutor 执行器接口,可以让开发者自定义事件处理函数执行策略,如 超时控制,失败重试、熔断等等

如添加 RetryEventHandlerExecutor

public class RetryEventHandlerExecutor : IEventHandlerExecutor{    public async Task ExecuteAsync(EventHandlerExecutingContext context, Func<EventHandlerExecutingContext, Task> handler)    {        // 如果执行失败,每隔 1s 重试,最多三次        await Retry.InvokeAsync(async () => {            await handler(context);        }, 3, 1000);    }}

最后,在注册 EventBus 服务中注册 RetryEventHandlerExecutor

services.AddEventBus(builder =>{    // 添加事件执行器    builder.AddExecutor<RetryEventHandlerExecutor>();});

22.8 使用有作用域的服务

Furion 中, Event Bus 所有服务均注册为单例,如需使用作用域服务(单例服务可直接注入),可通过依赖注入 IServiceProvider 实例并通过 CreateScope() 创建一个作用域,如:

public class ToDoEventSubscriber : IEventSubscriber{    private readonly ILogger<ToDoEventSubscriber> _logger;    public ToDoEventSubscriber(IServiceProvider services        , ILogger<ToDoEventSubscriber> logger)    {        Services = services;        _logger = logger;    }    public IServiceProvider Services { get; }    [EventSubscribe("ToDo:Create")]    public async Task CreateToDo(EventHandlerExecutingContext context)    {        // 创建新的作用域        using var scope = Services.CreateScope();        // 解析服务        var scopedProcessingService = scope.ServiceProvider.GetRequiredService<IScopedProcessingService>();        // ....    }}

22.9 订阅执行任务意外异常

services.AddEventBus(builder =>{    // 订阅 EventBus 未捕获异常    builder.UnobservedTaskExceptionHandler = (obj, args) =>    {        // ....    };});

22.10 事件总线工厂

版本说明

以下内容仅限 Furion 4.2.10 + 版本使用。

在该版本中,Furion 提供了 IEventBusFactory 工厂服务,可在运行时动态新增或删除订阅,如:

public class TestEventBus : IDynamicApiController{    private readonly IEventPublisher _eventPublisher;    private readonly IEventBusFactory _eventBusFactory;    public TestEventBus(IEventPublisher eventPublisher, IEventBusFactory eventBusFactory)    {        _eventPublisher = eventPublisher;        _eventBusFactory = eventBusFactory;    }    // 运行时动态添加一个订阅器    public async Task AddSubscriber()    {        await _eventBusFactory.Subscribe("xxx", async (ctx) =>        {            Console.WriteLine("我是动态的");            await Task.CompletedTask;        });    }    // 运行时动态删除一个订阅器    public async Task RemoveDynamic(string eventId)    {        await _eventBusFactory.Unsubscribe(eventId);    }}

22.11 MessageCenter 静态类

版本说明

以下内容仅限 Furion 4.3.3 + 版本使用。

Furion 4.3.3 版本新增了 MessageCenter 静态类,可在任何地方发送事件消息或订阅消息。

// 发送消息(含诸多重载)await MessageCenter.PublishAsync("messageId", new {});// 动态订阅消息MessageCenter.Subscribe("messageId", async (ctx) => {    Console.WriteLine("我是动态的");    await Task.CompletedTask;});// 取消订阅MessageCenter.Unsubscribe("messageId");

22.12 EventBusOptionsBuilder 配置

EventBusOptionsBuilderAddEventBus 构建服务选项,该选项包含以下属性和方法:

  • 属性
    • ChannelCapacity:默认内存通道容量
    • UnobservedTaskExceptionHandler:订阅执行任务未察觉异常
    • UseUtcTimestamp:是否使用 UTC 事件,bool 类型,默认 false
    • FuzzyMatch:是否开启全局模糊匹配(正则表达式)事件 Idbool 类型,默认 false
    • LogEnabled:是否启用日志输出,bool 类型,默认 true
  • 方法
    • AddSubscriber<TEventSubscriber>:添加订阅者
    • ReplacePublisher<TEventPublisher>:替换发布者
    • ReplaceStorer(Func<IServiceProvider, IEventSourceStorer>):替换存储器
    • AddMonitor<TEventHandlerMonitor>:添加监视器
    • AddExecutor<TEventHandlerExecutor>:添加执行器

22.13 关于高频消息处理方式

在一些高频发送消息的场景如 IoT、日志记录、数据采集,为避免频繁解析服务和创建作用域,可使用 类全局作用域 和所有服务都采取单例的方式:

public class ToDoEventSubscriber : IEventSubscriber, IDisposable{    private readonly ILogger<ToDoEventSubscriber> _logger;    private readonly IServiceScope _serviceScope;    public ToDoEventSubscriber(IServiceProvider serviceProvider        , ILogger<ToDoEventSubscriber> logger)    {        _serviceScope = serviceProvider.CreateScope();        _logger = logger;    }    [EventSubscribe("iot:log")]    public async Task LogFromIoT(EventHandlerExecutingContext context)    {        // 解析服务        var scopedProcessingService = _serviceScope.ServiceProvider.GetRequiredService<IScopedProcessingService>();        // ....    }    /// <summary>    /// 释放服务作用域    /// </summary>    public void Dispose()    {        _serviceScope.Dispose();    }}

22.14 使用第三方事件总线 CAP 示例

builder.Services.AddCap(options =>{    options.UseInMemoryStorage();    options.UseInMemoryMessageQueue();}).AddSubscriberAssembly(App.Assemblies.ToArray());

22.15 反馈与建议

与我们交流

给 Furion 提 Issue

演练场