架构师之路--事件驱动架构设计与实现(05)

张开发
2026/4/10 13:53:54 15 分钟阅读

分享文章

架构师之路--事件驱动架构设计与实现(05)
系列导读本篇将深入讲解事件驱动架构的核心概念、设计模式与最佳实践。文章目录一、事件驱动概述1.1 什么是事件驱动架构1.2 架构对比二、核心概念2.1 事件定义2.2 事件流三、事件模式3.1 事件通知3.2 事件溯源3.3 CQRS四、技术选型4.1 消息中间件对比4.2 选型建议五、代码实战5.1 Spring Event5.2 Kafka 集成总结一、事件驱动概述1.1 什么是事件驱动架构事件驱动架构是一种以事件为核心进行系统通信的架构模式。┌─────────────────────────────────────────────────────────────┐ │ 事件驱动特征 │ ├─────────────────────────────────────────────────────────────┤ │ 松耦合生产者和消费者互不依赖 │ │ ⚡ 异步处理提高系统响应速度 │ │ 可扩展易于添加新的事件消费者 │ │ 可追溯事件日志便于审计和回放 │ └─────────────────────────────────────────────────────────────┘1.2 架构对比模式说明同步调用请求-响应模式强耦合异步消息生产者-消费者模式松耦合事件驱动发布-订阅模式完全解耦二、核心概念2.1 事件定义// 事件基类publicabstractclassDomainEvent{privatefinalStringeventId;privatefinalLocalDateTimeoccurredOn;protectedDomainEvent(){this.eventIdUUID.randomUUID().toString();this.occurredOnLocalDateTime.now();}}// 订单创建事件publicclassOrderCreatedEventextendsDomainEvent{privatefinalStringorderId;privatefinalStringcustomerId;privatefinalBigDecimaltotalAmount;publicOrderCreatedEvent(StringorderId,StringcustomerId,BigDecimaltotalAmount){super();this.orderIdorderId;this.customerIdcustomerId;this.totalAmounttotalAmount;}}2.2 事件流┌─────────┐ 事件 ┌─────────┐ 事件 ┌─────────┐ │ 生产者 │ ────────► │ 通道 │ ────────► │ 消费者 │ └─────────┘ └─────────┘ └─────────┘ │ ▼ ┌─────────┐ │ 存储 │ └─────────┘三、事件模式3.1 事件通知// 简单事件通知ServicepublicclassOrderService{AutowiredprivateApplicationEventPublishereventPublisher;publicvoidcreateOrder(Orderorder){// 业务逻辑orderRepository.save(order);// 发布事件eventPublisher.publishEvent(newOrderCreatedEvent(order));}}// 事件监听ComponentpublicclassNotificationListener{EventListenerpublicvoidonOrderCreated(OrderCreatedEventevent){// 发送通知notificationService.send(event.getCustomerId(),订单创建成功);}}3.2 事件溯源// 事件存储publicclassEventStore{publicvoidappend(StringaggregateId,ListDomainEventevents){events.forEach(event-{EventEntryentrynewEventEntry(aggregateId,event.getClass().getName(),JSON.toJSONString(event),LocalDateTime.now());eventRepository.save(entry);});}publicListDomainEventloadEvents(StringaggregateId){returneventRepository.findByAggregateId(aggregateId).stream().map(this::deserialize).collect(Collectors.toList());}}// 聚合重建publicclassOrder{publicstaticOrderfromEvents(ListDomainEventevents){OrderordernewOrder();events.forEach(order::apply);returnorder;}privatevoidapply(DomainEventevent){if(eventinstanceofOrderCreatedEvent){applyOrderCreated((OrderCreatedEvent)event);}elseif(eventinstanceofOrderItemAddedEvent){applyOrderItemAdded((OrderItemAddedEvent)event);}}}3.3 CQRS┌─────────────────────────────────────────────────────────────┐ │ CQRS 架构 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Command Side (写) Query Side (读) │ │ ┌─────────┐ ┌─────────┐ │ │ │ Command │ │ Query │ │ │ │ Handler │ │ Handler │ │ │ └────┬────┘ └────┬────┘ │ │ │ │ │ │ ▼ ▼ │ │ ┌─────────┐ ┌─────────┐ │ │ │ Write │ │ Read │ │ │ │ Model │ │ Model │ │ │ └────┬────┘ └────┬────┘ │ │ │ │ │ │ ▼ ▼ │ │ ┌─────────┐ ┌─────────┐ │ │ │ Write DB│ │ Read DB │ │ │ └─────────┘ └─────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘四、技术选型4.1 消息中间件对比中间件吞吐量延迟适用场景Kafka百万级ms级日志、大数据RabbitMQ万级μs级业务消息RocketMQ十万级ms级金融、交易Pulsar百万级ms级云原生4.2 选型建议高吞吐、大数据 → Kafka 低延迟、可靠性 → RabbitMQ 金融级、事务 → RocketMQ 云原生、多租户 → Pulsar五、代码实战5.1 Spring Event// 事件定义publicclassOrderPaidEventextendsApplicationEvent{privatefinalStringorderId;privatefinalBigDecimalamount;publicOrderPaidEvent(Objectsource,StringorderId,BigDecimalamount){super(source);this.orderIdorderId;this.amountamount;}}// 事件发布ServicepublicclassOrderService{AutowiredprivateApplicationEventPublishereventPublisher;publicvoidpayOrder(StringorderId,BigDecimalamount){// 业务逻辑orderRepository.updateStatus(orderId,OrderStatus.PAID);// 发布事件eventPublisher.publishEvent(newOrderPaidEvent(this,orderId,amount));}}// 事件监听ComponentpublicclassOrderEventListener{EventListenerAsyncpublicvoidonOrderPaid(OrderPaidEventevent){// 更新积分pointsService.addPoints(event.getOrderId(),event.getAmount());// 发送通知notificationService.sendOrderPaidNotification(event.getOrderId());}}5.2 Kafka 集成// 生产者配置ConfigurationpublicclassKafkaProducerConfig{BeanpublicProducerFactoryString,StringproducerFactory(){MapString,ObjectconfignewHashMap();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,localhost:9092);config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);returnnewDefaultKafkaProducerFactory(config);}BeanpublicKafkaTemplateString,StringkafkaTemplate(){returnnewKafkaTemplate(producerFactory());}}// 消费者配置ConfigurationEnableKafkapublicclassKafkaConsumerConfig{BeanpublicConsumerFactoryString,StringconsumerFactory(){MapString,ObjectconfignewHashMap();config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,localhost:9092);config.put(ConsumerConfig.GROUP_ID_CONFIG,order-group);config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);returnnewDefaultKafkaConsumerFactory(config);}}// 消息发送ServicepublicclassOrderEventPublisher{AutowiredprivateKafkaTemplateString,StringkafkaTemplate;publicvoidpublishOrderCreated(Orderorder){StringmessageJSON.toJSONString(order);kafkaTemplate.send(order-created,order.getId(),message);}}// 消息消费ComponentpublicclassOrderEventConsumer{KafkaListener(topicsorder-created,groupIdinventory-group)publicvoidhandleOrderCreated(Stringmessage){OrderorderJSON.parseObject(message,Order.class);inventoryService.reserveStock(order);}}总结✅事件驱动概述定义、特征、优势✅核心概念事件、通道、存储✅事件模式通知、溯源、CQRS✅技术选型Kafka、RabbitMQ、RocketMQ✅代码实战Spring Event、Kafka 集成本系列完结作者刘~浪地球系列架构设计五更新时间2026-04-10

更多文章