外卖 CPS 佣金结算系统:Java 分布式事务处理与数据一致性保障

张开发
2026/4/3 18:14:18 15 分钟阅读
外卖 CPS 佣金结算系统:Java 分布式事务处理与数据一致性保障
外卖 CPS 佣金结算系统Java 分布式事务处理与数据一致性保障在外卖 CPS按销售付费与霸王餐业务中资金安全是系统的生命线。当用户完成核销并产生佣金时系统往往需要同时操作多个微服务更新订单状态服务Order-Service、增加用户余额服务Account-Service以及生成财务流水服务Finance-Service。如果在更新订单状态为“已结算”后由于网络抖动或服务宕机导致余额增加失败就会出现资损——即平台付出了商品成本却未扣除推广佣金或者用户拿到了佣金但订单状态异常。对于baodanbao.com.cn这类涉及资金流转的平台必须通过严谨的分布式事务方案来保证数据的最终一致性。本文将基于Seata AT 模式与本地消息表Local Message Table策略结合 Java Spring Boot 实战解决 CPS 结算中的分布式事务难题。一、 CPS 结算场景的事务分析在霸王餐佣金结算流程中典型的业务逻辑如下核销监听美团/饿了么回调核销 Webhook。计算佣金根据订单金额计算 CPS 佣金。更新订单将t_order表的状态更新为“已结算”并记录结算金额。发放佣金调用Account-Service增加用户的balance并记录t_account_log。其中步骤 3 和步骤 4 必须保持原子性。如果步骤 3 成功而步骤 4 失败或者反之都会导致账务不平。传统的数据库本地事务无法跨服务生效因此必须引入分布式事务解决方案。二、 基于 Seata AT 模式的强一致性方案Seata 是阿里巴巴开源的高性能微服务分布式事务解决方案。AT 模式Automatic Transaction Mode对业务代码侵入性极低它通过解析 SQL 自动生成反向的UNDO_LOG在全局事务回滚时利用这些日志进行补偿。数据库准备在每个参与事务的微服务数据库中必须创建undo_log表这是 Seata AT 模式工作的基石。-- 各微服务数据库通用 (MySQL)CREATETABLEundo_log(idbigint(20)NOTNULLAUTO_INCREMENT,branch_idbigint(20)NOTNULL,xidvarchar(100)NOTNULL,contextvarchar(128)NOTNULL,rollback_infolongblobNOTNULL,log_statusint(11)NOTNULL,log_createddatetimeNOTNULL,log_modifieddatetimeNOTNULL,extvarchar(100)DEFAULTNULL,PRIMARYKEY(id),UNIQUEKEYux_undo_log(xid,branch_id))ENGINEInnoDBAUTO_INCREMENT1DEFAULTCHARSETutf8;全局事务入口结算服务在结算服务的入口方法上添加GlobalTransactional注解。该注解会开启一个全局事务并生成全局唯一的 XID。packagebaodanbao.com.cn.settlement.service;importbaodanbao.com.cn.settlement.dto.SettlementDTO;importbaodanbao.com.cn.settlement.feign.AccountClient;importio.seata.spring.annotation.GlobalTransactional;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importorg.springframework.transaction.annotation.Transactional;/** * 结算服务 * 处理霸王餐订单的佣金结算逻辑 * author baodanbao.com.cn */ServicepublicclassSettlementService{AutowiredprivateOrderMapperorderMapper;// 订单MapperAutowiredprivateAccountClientaccountClient;// 调用账户服务的Feign Client/** * 执行结算任务 * GlobalTransactional 注解开启Seata分布式事务 */GlobalTransactionalpublicvoiddoSettlement(SettlementDTOdto){try{// 1. 更新本地订单状态 (Branch Transaction 1)// Seata 会拦截此SQL记录Before Image和After Image到UNDO_LOGorderMapper.updateStatusToSettled(dto.getOrderId(),dto.getCommission());// 2. 远程调用账户服务增加余额 (Branch Transaction 2)// 假设此处发生网络超时或异常accountClient.addBalance(dto.getUserId(),dto.getCommission(),霸王餐结算);}catch(Exceptione){// 3. 如果发生异常Seata TCTransaction Coordinator会通知所有分支事务回滚// 利用UNDO_LOG将订单状态恢复并撤销账户服务的更改thrownewRuntimeException(结算失败触发全局回滚,e);}}}三、 基于本地消息表的最终一致性方案异步解耦虽然 Seata AT 模式很好但在高并发的 CPS 结算场景下强一致性可能会锁住数据库行记录影响吞吐量。对于允许短暂延迟的场景如结算后几分钟到账我们采用本地消息表Local Message Table方案利用 RocketMQ 或 RabbitMQ 实现最终一致性。核心思想是将分布式事务拆分为本地事务 消息发送利用本地数据库的事务特性保证“业务操作与消息发送”的原子性。本地消息表结构在业务数据库中创建一张消息表用于暂存待发送的消息。-- 本地消息表CREATETABLElocal_message(idbigint(20)NOTNULLAUTO_INCREMENT,msg_idvarchar(64)NOTNULLCOMMENT消息唯一ID,msg_bodytextNOTNULLCOMMENT消息内容(JSON),topicvarchar(64)NOTNULLCOMMENTMQ Topic,statustinyint(4)DEFAULT0COMMENT状态: 0-待发送, 1-已发送,create_timedatetimeDEFAULTCURRENT_TIMESTAMP,update_timedatetimeDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP,PRIMARYKEY(id),UNIQUEKEYuk_msg_id(msg_id))ENGINEInnoDBDEFAULTCHARSETutf8mb4;生产者本地事务落库在同一个本地数据库事务中更新订单状态并插入一条消息记录。如果插入消息失败整个事务回滚保证了数据一致性。packagebaodanbao.com.cn.settlement.producer;importbaodanbao.com.cn.settlement.entity.LocalMessage;importbaodanbao.com.cn.settlement.mapper.LocalMessageMapper;importbaodanbao.com.cn.settlement.mapper.OrderMapper;importcom.alibaba.fastjson.JSON;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.stereotype.Service;importorg.springframework.transaction.annotation.Transactional;importjava.util.Map;/** * 结算消息生产者 * 使用本地消息表保证事务 * author baodanbao.com.cn */ServicepublicclassSettlementProducer{AutowiredprivateOrderMapperorderMapper;AutowiredprivateLocalMessageMappermessageMapper;AutowiredprivateKafkaTemplateString,StringkafkaTemplate;/** * 提交结算任务 * 在同一个事务中操作业务表和消息表 */Transactional(rollbackForException.class)publicvoidsubmitSettlement(LongorderId,LonguserId,BigDecimalcommission){// 1. 更新订单状态为“待结算”orderMapper.updateById(newOrder(orderId,PENDING_SETTLE));// 2. 构建消息体MapString,ObjectmsgBodyMap.of(userId,userId,amount,commission,orderId,orderId,bizType,CPS_SETTLE);// 3. 插入本地消息表 (关键与更新订单在同一个事务)LocalMessagemessagenewLocalMessage();message.setMsgId(SETTLE_orderId);message.setMsgBody(JSON.toJSONString(msgBody));message.setTopic(TOPIC_CPS_SETTLE);message.setStatus(0);// 待发送messageMapper.insert(message);// 如果到这里没有异常事务提交。消息表记录成功写入。}}定时任务补偿与消费由于网络原因消息可能发送失败。我们需要一个定时任务扫描local_message表中状态为“待发送”的记录进行重试。packagebaodanbao.com.cn.settlement.task;importbaodanbao.com.cn.settlement.entity.LocalMessage;importbaodanbao.com.cn.settlement.mapper.LocalMessageMapper;importcom.alibaba.fastjson.JSON;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.scheduling.annotation.Scheduled;importorg.springframework.stereotype.Component;importjava.util.List;/** * 本地消息表扫描任务 * 定时重试发送失败的消息 * author baodanbao.com.cn */ComponentpublicclassLocalMessageTask{AutowiredprivateLocalMessageMappermessageMapper;AutowiredprivateKafkaTemplateString,StringkafkaTemplate;/** * 每隔5分钟扫描一次待发送的消息 */Scheduled(fixedRate300000)publicvoidresendFailedMessages(){// 查询状态为0待发送的消息ListLocalMessagemessagesmessageMapper.selectByStatus(0);for(LocalMessagemsg:messages){try{// 发送消息到MQkafkaTemplate.send(msg.getTopic(),msg.getMsgBody());// 2. 发送成功更新本地消息状态为1已发送msg.setStatus(1);messageMapper.updateById(msg);}catch(Exceptione){// 发送失败记录日志下次继续重试// 注意这里不更新状态或者更新为重试次数1防止死循环System.err.println(消息发送失败稍后重试: msg.getMsgId());}}}}四、 消费者端的幂等性处理在 CPS 结算系统中由于网络抖动或 MQ 重试机制消费者可能会收到重复的消息。如果不做处理会导致用户佣金被重复发放Double Spending。解决方案是在消费者端建立一个t_settlement_record表记录已经处理过的msg_id。packagebaodanbao.com.cn.settlement.consumer;importbaodanbao.com.cn.settlement.entity.SettlementRecord;importbaodanbao.com.cn.settlement.mapper.SettlementRecordMapper;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;importorg.springframework.transaction.annotation.Transactional;/** * 结算消息消费者 * 处理佣金发放逻辑 * author baodanbao.com.cn */ComponentpublicclassSettlementConsumer{AutowiredprivateSettlementRecordMapperrecordMapper;// 记录已处理的消息IDAutowiredprivateAccountServiceaccountService;// 账户服务KafkaListener(topicsTOPIC_CPS_SETTLE)Transactional(rollbackForException.class)publicvoidhandleSettlement(Stringmessage){// 1. 解析消息// JSONObject json JSON.parseObject(message);// String msgId json.getString(msgId);// 2. 幂等性校验查询是否已处理SettlementRecordrecordrecordMapper.selectById(msgId);if(record!null){// 如果已存在直接返回防止重复消费System.out.println(该结算消息已处理忽略重复消息: msgId);return;}// 3. 执行业务逻辑增加余额// accountService.addBalance(...);// 4. 记录本次处理的MsgId (关键与业务操作在同一个事务)recordMapper.insert(newSettlementRecord(msgId,SUCCESS));// 如果业务逻辑抛出异常整个事务回滚MsgId也不会被记录下次MQ重试时会再次尝试}}本文著作权归 俱美开放平台 转载请注明出处

更多文章