安徽股票配资网站建设,百度关键词搜索怎么收费,wordpress调用评论数,金华专业做网站公司编排式 Saga 模式#xff08;Orchestrated Saga#xff09;是指由一个中央协调者#xff08;Orchestrator#xff09;控制多个服务间的事务执行。与协作式 Saga 模式不同#xff0c;编排式 Saga 模式不依赖于事件驱动#xff0c;而是通过协调者来控制整个 Saga 流程的执行…编排式 Saga 模式Orchestrated Saga是指由一个中央协调者Orchestrator控制多个服务间的事务执行。与协作式 Saga 模式不同编排式 Saga 模式不依赖于事件驱动而是通过协调者来控制整个 Saga 流程的执行。协调者负责调用各个参与服务确保每个子事务按顺序执行并在某个子事务失败时触发补偿操作。
编排式 Saga 模式实现步骤
我们将实现一个基于编排式 Saga 模式的跨服务数据保存方案使用 Spring Boot 来开发微服务使用 RabbitMQ 或 Kafka 作为消息队列进行通信并通过一个中央协调者来管理整个 Saga 流程。
以下是如何使用编排式 Saga 模式来实现一个典型的跨服务操作订单创建和库存扣减。
1. 架构设计
我们有两个微服务
Order Service负责创建订单。Inventory Service负责管理库存。Saga Orchestrator Service协调整个 Saga 流程包括执行各服务事务并在失败时触发补偿操作。
2. 技术栈
Spring Boot用于开发微服务。Spring Cloud用于服务注册、发现和治理。Spring AMQP / Kafka用于服务间消息传递可选择 RabbitMQ 或 Kafka。Spring Data JPA用于数据库操作。Transactional Outbox Pattern用来确保跨服务操作的一致性。
3. 系统流程
Order Service接收创建订单请求调用 Saga Orchestrator Service 开始 Saga 流程。Saga Orchestrator协调 Inventory Service 扣减库存等到确认成功后继续后续操作如创建订单。Inventory Service接收扣减库存请求执行库存扣减如果成功通知 Saga Orchestrator。如果失败则触发补偿操作。补偿操作如果任何一个服务的事务失败Saga Orchestrator 会调用补偿操作回滚之前的事务确保最终一致性。
4. Spring Boot 示例实现
4.1 创建 Order Service
Order Service 负责处理订单请求并与 Saga Orchestrator 配合触发 Saga 流程。
// OrderService.java
Service
public class OrderService {Autowiredprivate SagaOrchestrator sagaOrchestrator;// 创建订单Transactionalpublic void createOrder(Order order) {// Step 1: 创建订单orderRepository.save(order);// Step 2: 调用 Saga Orchestrator 开始整个流程sagaOrchestrator.startSaga(order);}
}
4.2 创建 Saga Orchestrator Service
Saga Orchestrator Service 是整个 Saga 模式的核心它负责协调各个服务之间的事务执行。首先它会启动 Saga 事务接着协调 Inventory Service 执行库存扣减操作并处理补偿操作。
// SagaOrchestrator.java
Service
public class SagaOrchestrator {Autowiredprivate InventoryService inventoryService;Autowiredprivate OrderRepository orderRepository;// 启动 Saga 流程Transactionalpublic void startSaga(Order order) {try {// Step 1: 调用库存服务扣减库存boolean inventorySuccess inventoryService.decreaseInventory(order.getItemId(), order.getQuantity());if (!inventorySuccess) {throw new Exception(Inventory insufficient);}// Step 2: 库存扣减成功后继续创建订单order.setStatus(Created);orderRepository.save(order);} catch (Exception e) {// Step 3: 如果出错执行补偿操作compensate(order);}}// 补偿方法回滚库存操作private void compensate(Order order) {// 回滚库存增加库存inventoryService.rollbackInventory(order.getItemId(), order.getQuantity());// 回滚订单设置订单为失败状态order.setStatus(Failed);orderRepository.save(order);}
}
4.3 创建 Inventory Service
Inventory Service 负责扣减库存并通知 Saga Orchestrator 执行后续操作。
// InventoryService.java
Service
public class InventoryService {Autowiredprivate InventoryRepository inventoryRepository;// 扣减库存Transactionalpublic boolean decreaseInventory(Long itemId, int quantity) {Inventory inventory inventoryRepository.findByItemId(itemId);if (inventory.getStock() quantity) {return false; // 库存不足}inventory.setStock(inventory.getStock() - quantity);inventoryRepository.save(inventory);return true; // 库存扣减成功}// 补偿操作回滚库存Transactionalpublic void rollbackInventory(Long itemId, int quantity) {Inventory inventory inventoryRepository.findByItemId(itemId);inventory.setStock(inventory.getStock() quantity); // 恢复库存inventoryRepository.save(inventory);}
}
4.4 消息队列RabbitMQ 或 Kafka集成
为了实现 Saga 模式的跨服务通信我们可以使用消息队列来传递消息。这里我们使用 RabbitMQ 作为消息队列。
在 application.properties 中配置 RabbitMQ
spring.rabbitmq.hostlocalhost
spring.rabbitmq.port5672
spring.rabbitmq.usernameguest
spring.rabbitmq.passwordguest
spring.rabbitmq.virtual-host/
在 SagaOrchestrator 和 InventoryService 中我们可以通过 RabbitTemplate 来发送和接收消息。
// SagaOrchestrator.java
Autowired
private RabbitTemplate rabbitTemplate;// 启动 Saga 流程时发送事件
public void startSaga(Order order) {// 发送一个消息通知库存服务处理库存rabbitTemplate.convertAndSend(inventoryExchange, inventory.decrease, order);
}// 监听库存扣减消息的回调
RabbitListener(queues inventory.decrease.queue)
public void handleInventoryDecrease(Order order) {try {// 扣减库存并继续订单处理boolean inventorySuccess inventoryService.decreaseInventory(order.getItemId(), order.getQuantity());if (!inventorySuccess) {throw new Exception(Inventory insufficient);}// 订单处理继续order.setStatus(Created);orderRepository.save(order);} catch (Exception e) {// 执行补偿操作compensate(order);}
}
4.5 设置消息队列的交换机和队列
Configuration
public class RabbitMQConfig {Beanpublic TopicExchange inventoryExchange() {return new TopicExchange(inventoryExchange);}Beanpublic Queue inventoryDecreaseQueue() {return new Queue(inventory.decrease.queue);}Beanpublic Binding inventoryDecreaseBinding() {return BindingBuilder.bind(inventoryDecreaseQueue()).to(inventoryExchange()).with(inventory.decrease);}Beanpublic Jackson2JsonMessageConverter jackson2JsonMessageConverter() {return new Jackson2JsonMessageConverter();}Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,Jackson2JsonMessageConverter jackson2JsonMessageConverter) {RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);return rabbitTemplate;}
}
5. 确保最终一致性
在编排式 Saga 模式中每个服务通过本地事务来保证操作的原子性并通过协调者来确保每个子事务执行成功。当某个服务失败时协调者会触发补偿操作回滚之前的操作。关键要素是
补偿操作服务必须提供回滚或补偿机制确保在失败时能够撤销已完成的事务。 幂等性补偿操作应该是幂等的确保多次执行不会产生不一致的结果。
6. 总结
编排式 Saga 模式通过中央协调者来管理跨服务事务确保最终一致性和数据可靠性。使用 RabbitMQ 或 Kafka 进行服务间的消息通信可以将系统解耦提高扩展性。在这种模式下协调者充当了服务之间的桥梁负责事务流的管理并在必要时执行补偿操作。