(Blog.Core框架开发情况,着色部分为本次新增)
终于项目继续迭代更新了,在开源这两年多,也是感谢每一个支持Blog.Core项目的同学,同时也感谢每一个在生产环境中使用,并提出意见和建议的小伙伴,2,606个Star,是我们相互之间共同的努力和肯定,上边的这些都是我和各位使用者提出的需求,刚开始很快,越是到后边,开发起来越难,这里先说明几点问题:
1、增加的东西太多,有一部分使用者表示使用不到,太笨重;
2、目前基本比较全面,后期新增需求难度系数较高;
3、功能太多了,不好抄代码;🙃
不过该更新的还是需要更新的,我已经很贴心的把各部分的代码隔开了,就差每个功能建立一个类库了,这个我也考虑过,不过那要是建立起来,就是二三十个,果断放弃了。因为代码已经隔开了,如果自己不需要,可以删除掉,当然这样也方便其他不使用我框架的粘贴复制到自己项目。
今年终于在年末的时候,增加上了RabbitMQ消息队列和EventBus事件总线,之前新增过Redis的消息队列,基于Redis很方便且很简单的一个InitQ组件,具体请看《【BCVP】实现基于 Redis 的消息队列》,然后,大家应该都知道,最近我一直在录制一个系列视频教程——《eShopOnContainer微服务系列讲解》,里边最重要的就是事件总线,基于的也正好是RabbitMQ的分布式消息队列组件,当然其中的订单微服务也用到了MediatR作为进程内的订阅发布模式,这个MediatR我在DDD系列中已经讲过,就不说了,这次就重点说说RabbitMQ和EventBus吧,也正好属于俩个系列的串烧了。这里说一下,我是从eshop代码里拷贝出来做讲解的,当然做了适当修改,还是要多关注官方,支持原作者:
https://github.com/dotnet-architecture/eShopOnContainers。
此外,热烈欢迎支付组件的合作者,如果你正在开发支付相关组件,可以联系我,一起推广开发,一起造福社区,也可以入驻BCVP开发者社区。
OK,今天就先简单的给大家先说下思路,以下每一个小节其实都可以写一篇或多篇文章的,本文就当个系列文章导读吧,详细讲解以后会有,主要就是关于RabbitMQ消息队列和EventBus事件总线的。
Producer:消息生产者,负责产生和发送消息到Broker。
Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个Queue。
Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理。
最后,系统B和系统C根据自己的能够处理的请求数去消息队列中拿数据,这样即便有每秒有8000个请求,那只是把请求放在消息队列中,去拿消息队列的消息由系统自己去控制,这样就不会把整个系统给搞崩:
(图片来源于知乎/question/54152397)
当然消息队列,也有些坏处,这里就先随便列几个,其他的大家自行搜索即可:
1、高可用:如果使用消息队列,基本要配合集群的,因为如果MQ服务器崩了,那就整个服务灾难了。
2、数据安全:必须保证数据不能丢失,也就是要考虑好最终一致性,做好补偿机制。
3、合理的消费。
好啦,基本概念先说到这里,下边就简单说下代码吧,因为篇幅的问题,我们只统一讲解接口的设计,毕竟实现类是比较复杂的,当然,我会抽一个实现类的核心方法说一下。
首先说下关于RabbitMQ的连接,这个是很简单的,和平时我们使用SqlServer/Redis这种第三方组件是类似的,通过连接字符串(或者说是服务器),然后配置用户名/密码,就能连接上了,相关的接口是这样的:
/// <summary>
/// RabbitMQ持久连接
/// 接口
/// </summary>
public interface IRabbitMQPersistentConnection
: IDisposable
{
bool IsConnected { get; }
bool TryConnect();
IModel CreateModel();
}
接口一共提供了三个方法,分别是是否连接、尝试连接、创建模型。
使用的时候,首先需要连接nuget包:
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
<PackageReference Include="Polly" Version="7.2.1" />
<PackageReference Include="RabbitMQ.Client" Version="6.2.1" />
其实只需要第三个RabbitMQ.Client就行了,前边两个是辅助作用,分别是提供序列化和重试机制的,如果你有一个需求是需要重试的,比如连接数据库或者执行某个进程,如果遇到异常,重试几次,可以使用组件Polly,它还有其他的功能,自己可以多尝试下。
那说到了重试,我就说一下,TryConnect(); 这个核心的方法:
/// <summary>
/// 连接
/// </summary>
/// <returns></returns>
public bool TryConnect()
{
_logger.LogInformation("RabbitMQ Client is trying to connect");
// 加锁
lock (sync_root)
{
// 重试策略
var policy = RetryPolicy.Handle<SocketException>()
.Or<BrokerUnreachableException>()
.WaitAndRetry(_retryCount,
retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
{
_logger.LogWarning(ex, "RabbitMQ Client could not connect after {TimeOut}s ({ExceptionMessage})", $"{time.TotalSeconds:n1}", ex.Message);
}
);
// 执行策略
policy.Execute(() =>
{
// 开始连接RabbitMQ
_connection = _connectionFactory
.CreateConnection();
});
// 连接成功
if (IsConnected)
{
// 追加事件处理器,目的是为了异常重试,共3种情况
_connection.ConnectionShutdown += OnConnectionShutdown;
_connection.CallbackException += OnCallbackException;
_connection.ConnectionBlocked += OnConnectionBlocked;
_logger.LogInformation("RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events", _connection.Endpoint.HostName);
return true;
}
else
{
_logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened");
return false;
}
}
}
相应的逻辑我已经在代码中,增加了注释,过程肯定能看得懂,至于真实的底层原理,这里先不说了。
可以看到上边就用到了重试机制,可以配置策略。这样就可以连接上RabbitMQ服务器了,那如何基于这个连接做事件总线呢,别着急,咱们先说下什么是事件和事件处理器。
void btnRegister_Click(object sender, EventArgs e)
object sender
指代发出事件的对象,这里也就是button对象;EventArgs e事件参数,可以理解为对事件的描述 ,它们可以统称为事件源。其中的代码逻辑,就是对事件的处理。我们可以统称为事件处理程序。所以:事件有两部分=事件源对象+事件处理器程序。
那我们平时肯定会遇到很多很多的事件:
注册的时候,校验成功后持久化到数据库,然后发注册成功的邮件。
支付的时候,判断成功后,修改数据库订单,库存,物流,邮件,短信,等等等等,这都是一个个的事件。
那如何对这些事件进行统一的管理呢,单体下很简单,就是按照过程走就行了,分布式或者微服务中,多个服务已经隔离开,无法按照过程一步步走,那这个时候就需要一个策略,常用的就是——订阅发布模式,事件总线是对发布-订阅模式的一种实现。它是一种集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需要相互依赖,达到一种解耦的目的。
我们用代码来简单看看如何设计事件和事件处理器:
/// <summary>
/// 事件模型
/// 基类
/// </summary>
public class IntegrationEvent
{
public IntegrationEvent()
{
Id = Guid.NewGuid();
CreationDate = DateTime.UtcNow;
}
[ ]
public IntegrationEvent(Guid id, DateTime createDate)
{
Id = id;
CreationDate = createDate;
}
[ ]
public Guid Id { get; private set; }
[ ]
public DateTime CreationDate { get; private set; }
}
事件是一个对象,是一个模型,那很重要的标识,就是Id和Date这两个属性了,当然也可以适当做其他的一些处理,请注意private set; 的写法。
/// <summary>
/// 集成事件处理程序
/// 泛型接口
/// </summary>
/// <typeparam name="TIntegrationEvent"></typeparam>
public interface IIntegrationEventHandler<in TIntegrationEvent> : IIntegrationEventHandler
where TIntegrationEvent : IntegrationEvent
{
Task Handle(TIntegrationEvent );
}
/// <summary>
/// 集成事件处理程序
/// 基 接口
/// </summary>
public interface IIntegrationEventHandler
{
}
对事件的处理比较简单的,我们定义接口,只需要一个Handle方法即可,剩下的就是我们定义一个一个的具体的事件处理器,通过继承这个接口,来实现具体的业务逻辑。
比如我这里定义了一个例子,关于博客删除的,当然可能不太贴切,我只是想举个例子:
/// <summary>
/// 博客删除事件处理器
/// 删除博客后触发
/// </summary>
public class BlogDeletedIntegrationEventHandler : IIntegrationEventHandler<BlogDeletedIntegrationEvent>
{
private readonly IBlogArticleServices _blogArticleServices;
private readonly ILogger<BlogDeletedIntegrationEventHandler> _logger;
public BlogDeletedIntegrationEventHandler(
IBlogArticleServices blogArticleServices,
ILogger<BlogDeletedIntegrationEventHandler> logger)
{
_blogArticleServices = blogArticleServices;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task Handle(BlogDeletedIntegrationEvent )
{
_logger.LogInformation("----- Handling integration event: {IntegrationEventId} at {AppName} - ({@IntegrationEvent})", .Id, "Blog.Core", );
ConsoleHelper.WriteSuccessLine($"----- Handling integration event: {@event.Id} at Blog.Core - ({@event})");
await _blogArticleServices.DeleteById( .BlogId.ToString());
}
}
当我执行删除的时候,不去执行,而是放到队列里,通过订阅发布的模式,每一个订阅者来消费信息,从而实现解耦的目的。
现在明白了事件和处理器,那如何对这是事件操作,怎么发布,又是如何订阅呢?事件总线就这么出现了,请往下看。
上边我们已经连接好了RabbitMQ服务器,也明白了什么是事件和处理器,现在就是需要发布和订阅了,总线是一个很好的方案,那设计下接口,就是这样的:
/// <summary>
/// 事件总线
/// 接口
/// </summary>
public interface IEventBus
{
/// <summary>
/// 发布
/// </summary>
/// <param name="event">事件模型</param>
void Publish(IntegrationEvent @event);
/// <summary>
/// 订阅
/// </summary>
/// <typeparam name="T">约束:事件模型</typeparam>
/// <typeparam name="TH">约束:事件处理器<事件模型></typeparam>
void Subscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>;
/// <summary>
/// 取消订阅
/// </summary>
/// <typeparam name="T"></typeparam>
/// <typeparam name="TH"></typeparam>
void Unsubscribe<T, TH>()
where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent;
/// <summary>
/// 动态订阅
/// </summary>
/// <typeparam name="TH">约束:事件处理器</typeparam>
/// <param name="eventName"></param>
void SubscribeDynamic<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler;
/// <summary>
/// 动态取消订阅
/// </summary>
/// <typeparam name="TH"></typeparam>
/// <param name="eventName"></param>
void UnsubscribeDynamic<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler;
}
这里定义了基本的常见操作,如何实现这个接口,可以针对不同的方案,既然我们使用了RabbitMQ,就说说它,当然你也可以使用其他的,比如AzureService之类的。
基于RabbitMQ的事件总线实现类比较复杂,我就不多说明了,感兴趣的可以直接看我的代码,我这里就说一下构造函数,从构造函数中,可以知道,当前类的依赖项,毕竟现在都是使用依赖注入了:
/// <summary>
/// RabbitMQ事件总线
/// </summary>
/// <param name="persistentConnection">RabbitMQ持久连接</param>
/// <param name="logger">日志</param>
/// <param name="autofac">autofac容器</param>
/// <param name="subsManager">事件总线订阅管理器</param>
/// <param name="queueName">队列名称</param>
/// <param name="retryCount">重试次数</param>
public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger,
ILifetimeScope autofac,
IEventBusSubscriptionsManager subsManager,
string queueName = null,
int retryCount = 5)
{
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
_queueName = queueName;
_consumerChannel = CreateConsumerChannel();
_autofac = autofac;
_retryCount = retryCount;
_subsManager.OnEventRemoved += SubsManager_OnEventRemoved;
}
除了比较常规的日志、RabbitMQ连接、Autofac容器、Polly重试这几个比较基础和必要的,还有一个参数是很重要的——IEventBusSubscriptionsManager subsManager 。
/// <summary>
/// 事件总线订阅管理器
/// 接口
/// </summary>
public interface IEventBusSubscriptionsManager
{
bool IsEmpty { get; }
event EventHandler<string> OnEventRemoved;
void AddDynamicSubscription<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler;
void AddSubscription<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>;
void RemoveSubscription<T, TH>()
where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent;
void RemoveDynamicSubscription<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler;
bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent;
bool HasSubscriptionsForEvent(string eventName);
Type GetEventTypeByName(string eventName);
void Clear();
IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent;
IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName);
string GetEventKey<T>();
}
基本到这里就没啥问题了,核心的几个知识点也讲完了,当然,仅仅是讲完了,其中的知识点量,要远比这个多的多,剩下的可以看看效果。
public static void AddRabbitMQSetup(this IServiceCollection services)
{
if (services == null) throw new ArgumentNullException(nameof(services));
if (Appsettings.app(new string[] { "RabbitMQ", "Enabled" }).ObjToBool())
{
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<RabbitMQPersistentConnection>>();
var factory = new ConnectionFactory()
{
HostName = Appsettings.app(new string[] { "RabbitMQ", "Connection" }),
DispatchConsumersAsync = true
};
if (!string.IsNullOrEmpty(Appsettings.app(new string[] { "RabbitMQ", "UserName" })))
{
factory.UserName = Appsettings.app(new string[] { "RabbitMQ", "UserName" });
}
if (!string.IsNullOrEmpty(Appsettings.app(new string[] { "RabbitMQ", "Password" })))
{
factory.Password = Appsettings.app(new string[] { "RabbitMQ", "Password" });
}
var retryCount = 5;
if (!string.IsNullOrEmpty(Appsettings.app(new string[] { "RabbitMQ", "RetryCount" })))
{
retryCount = int.Parse(Appsettings.app(new string[] { "RabbitMQ", "RetryCount" }));
}
return new RabbitMQPersistentConnection(factory, logger, retryCount);
});
}
}
public static void AddEventBusSetup(this IServiceCollection services)
{
if (services == null) throw new ArgumentNullException(nameof(services));
if (Appsettings.app(new string[] { "RabbitMQ", "Enabled" }).ObjToBool() && Appsettings.app(new string[] { "EventBus", "Enabled" }).ObjToBool())
{
var subscriptionClientName = Appsettings.app(new string[] { "EventBus", "SubscriptionClientName" });
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
services.AddTransient<BlogDeletedIntegrationEventHandler>();
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
var retryCount = 5;
if (!string.IsNullOrEmpty(Appsettings.app(new string[] { "RabbitMQ", "RetryCount" })))
{
retryCount = int.Parse(Appsettings.app(new string[] { "RabbitMQ", "RetryCount" }));
}
return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
});
}
}
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
eventBus.Subscribe<BlogDeletedIntegrationEvent, BlogDeletedIntegrationEventHandler>();
我们尝试一下,发送一个事件到总线里:
[ ]
[ ]
public void EventBusTry([FromServices] IEventBus _eventBus, string blogId = "1")
{
var blogDeletedEvent = new BlogDeletedIntegrationEvent(blogId);
_eventBus.Publish(blogDeletedEvent);
}
动图效果如下:
是不是很简单,好啦,暂时就先到这里,打完手工。
发表评论 取消回复