微服务模式:Spring Boot + Kafka的业务流程Saga模式 - vinsguru

解道jdon 2020-11-09 00:38:42
spring 服务 boot 微服 模式


多年来,微服务已变得非常流行。微服务是分布式系统。它们更小,模块化,易于部署和扩展等。开发单个微服务应用程序可能会很有趣!但是处理跨越多个微服务的业务交易并不好玩!MicroService体系结构具有特定的职责。为了完成应用程序工作流程/任务,可能需要多个MicroServices一起工作。

让我们看看本文中在分布式系统中处理事务/数据一致性有多困难。

假设我们的业务规则说,当用户下订单时,如果产品的价格在用户的信用限额/余额之内并且该产品的库存可用,则订单将得到满足。否则将无法实现。看起来真的很简单。这在整体应用中非常容易实现。整个工作流程可以视为1个单事务。当所有内容都在单个数据库中时,提交/回滚很容易。对于具有多个数据库的分布式系统,这将非常复杂!首先让我们看一下我们的架构,看看如何实现它。

我们在下面的微服务中拥有自己的数据库。

  • 订购服务
  • 付款服务
  • 库存服务

当订单服务收到新订单的请求时,它必须与付款服务和库存服务进行核对。我们扣除付款,库存并最终完成订单!如果我们扣除付款但没有库存,会发生什么?如何回滚?涉及多个数据库很难。

传奇Saga模式

通常,在所有微服务之间处理事务和维护数据一致性很困难。当涉及多种服务时,例如付款,库存,欺诈检查,运输检查…..etc等,如果没有协调员,将很难通过多个步骤来管理如此复杂的工作流程。通过为协调员引入单独的服务,订单服务摆脱了这些多余责任。我们也没有引入任何循环依赖。

此处检查项目源代码。

跨越多个微服务的每个业务交易都被分成特定于微服务的本地交易,并按顺序执行它们以完成业务工作流程。它被称为佐贺。它可以通过两种方式实现。

  • 编舞Choreography 方法
  • 编排Orchestration 方法

在本文中,我们将讨论基于Orchestration的传奇Saga。

在这种模式下,我们将有一个协调器,一个单独的服务,它将协调所有微服务之间的所有事务。如果一切正常,它将使订单请求完成,否则将其标记为已取消。

让我们看看如何实现这一点。我们的示例架构将或多或少像这样!

  • 在此演示中,协调器与其他服务之间的通信将是一个简单的HTTP,以一种非阻塞的异步方式来使其无状态。
  • 我们也可以使用Kafka主题进行交流。为此,我们必须使用分散/聚集模式,该模式更像是有状态的样式。

Order协调器

这是一个微服务,负责协调所有事务。它侦听订单创建的主题。当创建新订单时,它会立即为每个服务(如付款服务/库存服务等)建立单独的请求,并验证响应。如果可以,请执行订单。如果其中之一不是,则取消定单。它还尝试重置任何微服务中发生的任何本地事务。

我们将所有本地交易视为1个单一工作流程。一个工作流程将包含多个工作流程步骤。

  • 工作流程步骤

public interface WorkflowStep {
    WorkflowStepStatus getStatus();
    Mono<Boolean> process();
    Mono<Boolean> revert();
}

  • 工作流程

public interface Workflow {
    List<WorkflowStep> getSteps();
}

  • 在本例中,对于“订购”工作流,我们有2个步骤。每个实现都应该知道如何进行本地事务以及如何重置。
  • 库存步骤需要继承实现WorkflowStep接口

public class InventoryStep implements WorkflowStep {
    private final WebClient webClient;
    private final InventoryRequestDTO requestDTO;
    private WorkflowStepStatus stepStatus = WorkflowStepStatus.PENDING;
    public InventoryStep(WebClient webClient, InventoryRequestDTO requestDTO) {
        this.webClient = webClient;
        this.requestDTO = requestDTO;
    }
    @Override
    public WorkflowStepStatus getStatus() {
        return this.stepStatus;
    }
    @Override
    public Mono<Boolean> process() {
        return this.webClient
                .post()
                .uri("/inventory/deduct")
                .body(BodyInserters.fromValue(this.requestDTO))
                .retrieve()
                .bodyToMono(InventoryResponseDTO.class)
                .map(r -> r.getStatus().equals(InventoryStatus.AVAILABLE))
                .doOnNext(b -> this.stepStatus = b ? WorkflowStepStatus.COMPLETE : WorkflowStepStatus.FAILED);
    }
    @Override
    public Mono<Boolean> revert() {
        return this.webClient
                    .post()
                    .uri("/inventory/add")
                    .body(BodyInserters.fromValue(this.requestDTO))
                    .retrieve()
                    .bodyToMono(Void.class)
                    .map(r ->true)
                    .onErrorReturn(false);
    }
}

  • 付款步骤也是要实现接口中处理和回退两个步骤

public class PaymentStep implements WorkflowStep {
    private final WebClient webClient;
    private final PaymentRequestDTO requestDTO;
    private WorkflowStepStatus stepStatus = WorkflowStepStatus.PENDING;
    public PaymentStep(WebClient webClient, PaymentRequestDTO requestDTO) {
        this.webClient = webClient;
        this.requestDTO = requestDTO;
    }
    @Override
    public WorkflowStepStatus getStatus() {
        return this.stepStatus;
    }
    @Override
    public Mono<Boolean> process() {
        return this.webClient
                    .post()
                    .uri("/payment/debit")
                    .body(BodyInserters.fromValue(this.requestDTO))
                    .retrieve()
                    .bodyToMono(PaymentResponseDTO.class)
                    .map(r -> r.getStatus().equals(PaymentStatus.PAYMENT_APPROVED))
                    .doOnNext(b -> this.stepStatus = b ? WorkflowStepStatus.COMPLETE : WorkflowStepStatus.FAILED);
    }
    @Override
    public Mono<Boolean> revert() {
        return this.webClient
                .post()
                .uri("/payment/credit")
                .body(BodyInserters.fromValue(this.requestDTO))
                .retrieve()
                .bodyToMono(Void.class)
                .map(r -> true)
                .onErrorReturn(false);
    }
}

  • 服务/协调员

@Service
public class OrchestratorService {
    @Autowired
    @Qualifier("payment")
    private WebClient paymentClient;
    @Autowired
    @Qualifier("inventory")
    private WebClient inventoryClient;
    public Mono<OrchestratorResponseDTO> orderProduct(final OrchestratorRequestDTO requestDTO){
        Workflow orderWorkflow = this.getOrderWorkflow(requestDTO);
        return Flux.fromStream(() -> orderWorkflow.getSteps().stream())
                .flatMap(WorkflowStep::process)
                .handle(((aBoolean, synchronousSink) -> {
                    if(aBoolean)
                        synchronousSink.next(true);
                    else
                        synchronousSink.error(new WorkflowException("create order failed!"));
                }))
                .then(Mono.fromCallable(() -> getResponseDTO(requestDTO, OrderStatus.ORDER_COMPLETED)))
                .onErrorResume(ex -> this.revertOrder(orderWorkflow, requestDTO));
    }
    private Mono<OrchestratorResponseDTO> revertOrder(final Workflow workflow, final OrchestratorRequestDTO requestDTO){
        return Flux.fromStream(() -> workflow.getSteps().stream())
                .filter(wf -> wf.getStatus().equals(WorkflowStepStatus.COMPLETE))
                .flatMap(WorkflowStep::revert)
                .retry(3)
                .then(Mono.just(this.getResponseDTO(requestDTO, OrderStatus.ORDER_CANCELLED)));
    }
    private Workflow getOrderWorkflow(OrchestratorRequestDTO requestDTO){
        WorkflowStep paymentStep = new PaymentStep(this.paymentClient, this.getPaymentRequestDTO(requestDTO));
        WorkflowStep inventoryStep = new InventoryStep(this.inventoryClient, this.getInventoryRequestDTO(requestDTO));
        return new OrderWorkflow(List.of(paymentStep, inventoryStep));
    }
    private OrchestratorResponseDTO getResponseDTO(OrchestratorRequestDTO requestDTO, OrderStatus status){
        OrchestratorResponseDTO responseDTO = new OrchestratorResponseDTO();
        responseDTO.setOrderId(requestDTO.getOrderId());
        responseDTO.setAmount(requestDTO.getAmount());
        responseDTO.setProductId(requestDTO.getProductId());
        responseDTO.setUserId(requestDTO.getUserId());
        responseDTO.setStatus(status);
        return responseDTO;
    }
    private PaymentRequestDTO getPaymentRequestDTO(OrchestratorRequestDTO requestDTO){
        PaymentRequestDTO paymentRequestDTO = new PaymentRequestDTO();
        paymentRequestDTO.setUserId(requestDTO.getUserId());
        paymentRequestDTO.setAmount(requestDTO.getAmount());
        paymentRequestDTO.setOrderId(requestDTO.getOrderId());
        return paymentRequestDTO;
    }
    private InventoryRequestDTO getInventoryRequestDTO(OrchestratorRequestDTO requestDTO){
        InventoryRequestDTO inventoryRequestDTO = new InventoryRequestDTO();
        inventoryRequestDTO.setUserId(requestDTO.getUserId());
        inventoryRequestDTO.setProductId(requestDTO.getProductId());
        inventoryRequestDTO.setOrderId(requestDTO.getOrderId());
        return inventoryRequestDTO;
    }
}

有关完整的源码,请在此处下载。

 

                   

版权声明
本文为[解道jdon]所创,转载请带上原文链接,感谢
https://www.jdon.com/55292

  1. Microservice mode: business process saga mode of spring boot + Kafka - vinsguru
  2. Linux / MacOS 修改 ls 显示年月日的时间格式
  3. Linux / MacOS modify the time format of LS display date
  4. 7、Spring Boot检索
  5. 7. Spring boot search
  6. 7、Spring Boot检索
  7. 7. Spring boot search
  8. 10、Spring Boot分布式
  9. 10. Spring boot distributed
  10. Did you know that artifact can also manage SUSE Linux system dependencies
  11. 3、Spring Cloud Rest工程建立(通過IDEA建立)
  12. 3. Spring cloud rest project establishment (through idea)
  13. 4、Spring Cloud Eureka
  14. 4、Spring Cloud Eureka
  15. Spring Boot+Redis主备模式源码教程 - vinsguru
  16. Did you know that artifact can also manage SUSE Linux system dependencies
  17. k8s搭建zk&pulsar集群——三节点
  18. 7、Spring Cloud Hystrix
  19. Dubbo series -- source code analysis of local service publishing
  20. Netty启动对HTTP/3的孵化器支持
  21. Netty launches incubator support for HTTP / 3
  22. Spring Webflux security configuration tutorial and source code - vinsguru
  23. Spring Boot的微服务分散聚集模式教程与源码 - vinsguru
  24. WebClient: Spring的新的HTTP反应式客户端 - spring.io
  25. Webclient: spring's new HTTP reactive client- spring.io
  26. 使用Project Reactor进行反应式数据流 - spring.io
  27. Using project reactor for reactive data flow- spring.io
  28. Spring IO 2019大会上Axon+Spring的事件驱动微服务和CQRS源码项目
  29. 使用JPA和Hibernate将查询结果映射到DTO的最佳方法 - Vlad Mihalcea
  30. Humor: k8s is not so hard to deploy on kubernetes personal blog
  31. UNIX domain socket tutorial for java16 - nipafx
  32. Axion + spring's event driven microservice and cqrs source code project at spring IO 2019
  33. Using netty to realize the source code and tutorial of http2 server / client baeldung
  34. Using spring boot reactive r2dbc to realize crud operation source code of PostgreSQL Rajesh
  35. Spring Boot Liquibase基础教程 - josdem
  36. Vlad mihalcea: the best way to map query results to dto using JPA and Hibernate
  37. Using java enumeration in spring boot to automatically serialize parameters and save database - Dario
  38. In depth study of spring cloud load balancer - Piotr
  39. How to use completable future in springboot 2
  40. Basic course of spring boot liquibase - josdem
  41. Volatile side effect in Java: not using CPU cache
  42. Using redis PubSub and spring boot to realize microservice message model vinsguru
  43. Oracle vs. PostgreSQL: PostgreSQL at least beats Oracle - Foerster in terms of installation and size
  44. Spring Webflux and reactive programming
  45. Fault tolerant and reliable messaging of Apache Kafka and spring boot – Arnold galovics
  46. Java exception and tuning one stop solution
  47. Using playwright to implement automatic visual testing for Java API - applitools
  48. Oracle database official statement does not support VMWare
  49. [function] Oracle function series (1) -- character function
  50. [function] Oracle function series (1) -- character function
  51. Oracle character set simple diagram, Chinese garbled solution
  52. Oracle lock series
  53. No.2 process kthread of Linux kernel learning
  54. [password] Oracle user password series
  55. [Linux] migrate Oracle installation directory from root directory to logical volume
  56. Oracle 12cr1 RAC installation on VMware Workstation (middle) - graphical interface installation
  57. Driving of Oracle hint_ SITE
  58. [Architecture] collation of Oracle SCN knowledge points
  59. [Architecture] collation of Oracle SCN knowledge points
  60. Linux -- nginx service, NFS service, nginx reverse proxy, three web servers