Microservice Pattern: Orchestration Saga Pattern with Spring Boot + Kafka - vinsguru

On jdon 2020-11-09 00:38:42


In recent years, microservices have become very popular. Microservices are distributed systems: they are smaller, modular, and easy to deploy and scale. Developing a single microservice application can be fun! But handling a business transaction that spans multiple microservices is not. Each microservice in the architecture has a specific responsibility, so completing an application workflow or task may take several microservices working together.

In this article, let's look at how difficult it can be to handle transactions and data consistency in a distributed system.

Suppose our business rule says that when a user places an order, the order is fulfilled if the product's price is within the user's credit limit/balance and the product is in stock. Otherwise, it is not fulfilled. It looks really simple, and it is very easy to implement in a monolithic application. The whole workflow can be treated as one single transaction. When everything is in a single database, committing or rolling back is easy. In a distributed system with multiple databases, it gets very complicated! Let's first look at our architecture to see how to implement this.
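To make the monolithic case concrete, here is a minimal sketch of the business rule when balances and stock live in the same store. The class name, the in-memory maps, and the use of `synchronized` as a stand-in for a database transaction are all assumptions made for illustration; they are not part of the original project.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical monolith: balances and stock share one store,
// so a single method can check and update both together.
class MonolithOrderService {
    private final Map<String, Integer> balances = new HashMap<>();
    private final Map<String, Integer> stock = new HashMap<>();

    MonolithOrderService(Map<String, Integer> balances, Map<String, Integer> stock) {
        this.balances.putAll(balances);
        this.stock.putAll(stock);
    }

    // synchronized stands in for the single database transaction boundary.
    synchronized boolean placeOrder(String userId, String productId, int price) {
        int balance = balances.getOrDefault(userId, 0);
        int available = stock.getOrDefault(productId, 0);
        if (balance < price || available < 1) {
            return false; // rule violated: nothing was written, so there is nothing to roll back
        }
        balances.put(userId, balance - price);
        stock.put(productId, available - 1);
        return true;
    }

    int balanceOf(String userId) { return balances.getOrDefault(userId, 0); }

    int stockOf(String productId) { return stock.getOrDefault(productId, 0); }
}
```

Because both checks and both writes happen inside one boundary, a rejected order leaves no partial state behind. That is exactly the property we lose once payment and inventory sit in different databases.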

Each of the following microservices has its own database.

  • Order service
  • Payment service
  • Inventory service

When the order service receives a request for a new order, it has to check with the payment service and the inventory service. We deduct the payment, deduct the inventory, and fulfill the order! But what happens if we deduct the payment and the product turns out to be out of stock? How do we roll back? That is hard when multiple databases are involved.
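The failure case above can be sketched with two separate in-memory stores standing in for the payment and inventory databases. All class and method names here are hypothetical. The point is only that, without a shared transaction, the debit has to be undone by an explicit compensating credit.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the payment service's own database.
class PaymentStore {
    private final Map<String, Integer> balances = new HashMap<>();

    PaymentStore(String userId, int balance) { balances.put(userId, balance); }

    boolean debit(String userId, int amount) {
        int balance = balances.getOrDefault(userId, 0);
        if (balance < amount) return false;
        balances.put(userId, balance - amount);
        return true;
    }

    void credit(String userId, int amount) { balances.merge(userId, amount, Integer::sum); }

    int balanceOf(String userId) { return balances.getOrDefault(userId, 0); }
}

// Stand-in for the inventory service's own database.
class InventoryStore {
    private final Map<String, Integer> stock = new HashMap<>();

    InventoryStore(String productId, int quantity) { stock.put(productId, quantity); }

    boolean deduct(String productId) {
        int available = stock.getOrDefault(productId, 0);
        if (available < 1) return false;
        stock.put(productId, available - 1);
        return true;
    }
}

class NaiveOrderFlow {
    // Returns true if the order completed. If inventory fails after the debit,
    // the only way back is a compensating credit, since no shared rollback exists.
    static boolean placeOrder(PaymentStore payments, InventoryStore inventory,
                              String userId, String productId, int price) {
        if (!payments.debit(userId, price)) return false;
        if (!inventory.deduct(productId)) {
            payments.credit(userId, price); // compensating action
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        PaymentStore payments = new PaymentStore("user-1", 100);
        InventoryStore inventory = new InventoryStore("product-1", 0); // out of stock
        boolean ok = placeOrder(payments, inventory, "user-1", "product-1", 60);
        System.out.println(ok + " " + payments.balanceOf("user-1")); // prints "false 100"
    }
}
```

In a real system the credit call can itself fail or time out, which is part of why this logic is better kept in one dedicated place than scattered across services.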

Saga Pattern

Handling transactions and maintaining data consistency across all the microservices is difficult in general. When multiple services are involved (payment, inventory, fraud check, shipping check, etc.), it is very hard to manage such a complex multi-step workflow without a coordinator. By introducing a separate service for orchestration, the order service is freed from these extra responsibilities. We also avoid introducing any cyclic dependencies.

Check the project source code here.

Each business transaction that spans multiple microservices is split into local transactions specific to each microservice, and these are executed in sequence to complete the business workflow. This is called a saga. It can be implemented in two ways.

  • Choreography approach
  • Orchestration approach

In this article, we discuss the orchestration-based saga.

In this pattern, we have an orchestrator, a separate service, which coordinates all the transactions among all the microservices. If everything goes well, it marks the order request as complete; otherwise, it marks it as cancelled.
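Before getting to the reactive implementation, the core idea can be sketched in plain, synchronous Java. This is a simplified illustration under my own assumptions: `SagaStep` and `SimpleOrchestrator` are hypothetical names, the steps run strictly one after another, and the result is a plain string rather than the project's DTOs.

```java
import java.util.List;
import java.util.function.BooleanSupplier;

// One local transaction plus the action that undoes it.
class SagaStep {
    final String name;
    private final BooleanSupplier action;
    private final Runnable compensation;
    boolean completed = false;

    SagaStep(String name, BooleanSupplier action, Runnable compensation) {
        this.name = name;
        this.action = action;
        this.compensation = compensation;
    }

    boolean process() {
        completed = action.getAsBoolean();
        return completed;
    }

    void revert() {
        compensation.run();
        completed = false;
    }
}

class SimpleOrchestrator {
    // Runs the steps in order. On the first failure, reverts every step that
    // already completed (in reverse order) and reports the order as cancelled.
    static String run(List<SagaStep> steps) {
        for (SagaStep step : steps) {
            if (!step.process()) {
                for (int i = steps.size() - 1; i >= 0; i--) {
                    if (steps.get(i).completed) {
                        steps.get(i).revert();
                    }
                }
                return "ORDER_CANCELLED";
            }
        }
        return "ORDER_COMPLETED";
    }

    public static void main(String[] args) {
        List<SagaStep> steps = List.of(
                new SagaStep("payment", () -> true, () -> System.out.println("credit payment")),
                new SagaStep("inventory", () -> false, () -> System.out.println("restock")));
        // prints "credit payment" and then "ORDER_CANCELLED"
        System.out.println(run(steps));
    }
}
```

Only the payment step is reverted in the example, because the inventory step never completed. The same "revert only what completed" rule shows up in the real orchestrator as a filter on the step status.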

Let's see how this can be done. Our example architecture will look more or less like this!

  • In this demo, communication between the orchestrator and the other services is plain HTTP, done in a non-blocking, asynchronous way to keep it stateless.
  • We could also use Kafka topics for this communication. For that, we would have to use the scatter/gather pattern, which is more of a stateful style.
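To see why the topic-based variant is more stateful, consider what the orchestrator would have to do with replies that arrive one by one on a reply topic. It needs to remember the partial replies for each order until all of them are in. The sketch below shows only that aggregation step, with no Kafka client involved. `ReplyAggregator`, its method, and the service names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical "gather" side of scatter/gather: collects replies per order ID.
class ReplyAggregator {
    private final Set<String> expectedServices;
    // State that must be kept between messages: orderId -> (service -> reply).
    private final Map<String, Map<String, Boolean>> pending = new HashMap<>();

    ReplyAggregator(Set<String> expectedServices) {
        this.expectedServices = expectedServices;
    }

    // Records one reply. Returns the final decision once every expected service
    // has answered, or Optional.empty() while replies are still outstanding.
    synchronized Optional<Boolean> onReply(String orderId, String service, boolean ok) {
        Map<String, Boolean> replies = pending.computeIfAbsent(orderId, id -> new HashMap<>());
        replies.put(service, ok);
        if (!replies.keySet().containsAll(expectedServices)) {
            return Optional.empty();
        }
        pending.remove(orderId);
        return Optional.of(!replies.containsValue(false));
    }
}
```

With plain HTTP, each request carries its own response, so the orchestrator does not need a `pending` map like this. With topics, that map (or some durable equivalent) is the state the second bullet refers to.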

Order Orchestrator

This is a microservice responsible for coordinating all the transactions. It listens to the order-created topic. When a new order is created, it immediately builds a separate request for each service (payment service, inventory service, etc.) and validates the responses. If they are all OK, it fulfills the order. If any one of them is not, it cancels the order. It also tries to revert any local transactions that already happened in any of the microservices.

We treat all the local transactions as one single workflow. A workflow contains multiple workflow steps.

  • Workflow step

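import reactor.core.publisher.Mono;

// One local transaction of the saga plus its compensating action.
public interface WorkflowStep {
    WorkflowStepStatus getStatus();
    // Performs the local transaction; emits true on success.
    Mono<Boolean> process();
    // Undoes the local transaction (the compensating action).
    Mono<Boolean> revert();
}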

  • Workflow

import java.util.List;

// The ordered steps that together make up one business transaction.
public interface Workflow {
    List<WorkflowStep> getSteps();
}
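The article's code refers to an `OrderWorkflow` class and a `WorkflowStepStatus` enum without showing them. The following is a guess at what minimal versions might look like, not the project's actual code. To keep the sketch compilable with the JDK alone, the `WorkflowStep` interface here is a cut-down stand-in that omits the reactive `process()` and `revert()` methods. The enum constants are the three names used by the step classes in this article.

```java
import java.util.List;

// The three states referenced by the step implementations.
enum WorkflowStepStatus { PENDING, COMPLETE, FAILED }

// Simplified stand-in for the article's WorkflowStep
// (the Mono-returning methods are left out on purpose).
interface WorkflowStep {
    WorkflowStepStatus getStatus();
}

interface Workflow {
    List<WorkflowStep> getSteps();
}

// Hypothetical implementation: an immutable holder for the ordered steps.
class OrderWorkflow implements Workflow {
    private final List<WorkflowStep> steps;

    OrderWorkflow(List<WorkflowStep> steps) {
        this.steps = List.copyOf(steps); // defensive copy, preserves order
    }

    @Override
    public List<WorkflowStep> getSteps() {
        return this.steps;
    }
}
```

Keeping the workflow a plain list holder leaves all the sequencing and error handling to the orchestrator, which is where the article puts it.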

  • In this case, the order workflow has 2 steps. Each implementation should know how to perform its local transaction and how to revert it.
  • Inventory step: implements the WorkflowStep interface

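import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

// Deducts stock through the inventory service and can add it back on revert.
public class InventoryStep implements WorkflowStep {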
    private final WebClient webClient;
    private final InventoryRequestDTO requestDTO;
    private WorkflowStepStatus stepStatus = WorkflowStepStatus.PENDING;
    public InventoryStep(WebClient webClient, InventoryRequestDTO requestDTO) {
        this.webClient = webClient;
        this.requestDTO = requestDTO;
    }
    @Override
    public WorkflowStepStatus getStatus() {
        return this.stepStatus;
    }
    @Override
    public Mono<Boolean> process() {
        return this.webClient
                .post()
                .uri("/inventory/deduct")
                .body(BodyInserters.fromValue(this.requestDTO))
                .retrieve()
                .bodyToMono(InventoryResponseDTO.class)
                .map(r -> r.getStatus().equals(InventoryStatus.AVAILABLE))
                .doOnNext(b -> this.stepStatus = b ? WorkflowStepStatus.COMPLETE : WorkflowStepStatus.FAILED);
    }
    @Override
    public Mono<Boolean> revert() {
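        return this.webClient
                .post()
                .uri("/inventory/add")
                .body(BodyInserters.fromValue(this.requestDTO))
                .retrieve()
                .bodyToMono(Void.class)
                // Mono<Void> completes without emitting, so map() would never run; thenReturn emits true on completion
                .thenReturn(true)
                .onErrorReturn(false);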
    }
}

  • Payment step: likewise implements the interface's process and revert methods

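import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

// Debits the user through the payment service and can credit the amount back on revert.
public class PaymentStep implements WorkflowStep {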
    private final WebClient webClient;
    private final PaymentRequestDTO requestDTO;
    private WorkflowStepStatus stepStatus = WorkflowStepStatus.PENDING;
    public PaymentStep(WebClient webClient, PaymentRequestDTO requestDTO) {
        this.webClient = webClient;
        this.requestDTO = requestDTO;
    }
    @Override
    public WorkflowStepStatus getStatus() {
        return this.stepStatus;
    }
    @Override
    public Mono<Boolean> process() {
        return this.webClient
                .post()
                .uri("/payment/debit")
                .body(BodyInserters.fromValue(this.requestDTO))
                .retrieve()
                .bodyToMono(PaymentResponseDTO.class)
                .map(r -> r.getStatus().equals(PaymentStatus.PAYMENT_APPROVED))
                .doOnNext(b -> this.stepStatus = b ? WorkflowStepStatus.COMPLETE : WorkflowStepStatus.FAILED);
    }
    @Override
    public Mono<Boolean> revert() {
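        return this.webClient
                .post()
                .uri("/payment/credit")
                .body(BodyInserters.fromValue(this.requestDTO))
                .retrieve()
                .bodyToMono(Void.class)
                // Mono<Void> completes without emitting, so map() would never run; thenReturn emits true on completion
                .thenReturn(true)
                .onErrorReturn(false);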
    }
}

  • Service / Orchestrator

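import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service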
public class OrchestratorService {
    @Autowired
    @Qualifier("payment")
    private WebClient paymentClient;
    @Autowired
    @Qualifier("inventory")
    private WebClient inventoryClient;
    public Mono<OrchestratorResponseDTO> orderProduct(final OrchestratorRequestDTO requestDTO){
        Workflow orderWorkflow = this.getOrderWorkflow(requestDTO);
        return Flux.fromStream(() -> orderWorkflow.getSteps().stream())
                // flatMap subscribes to the steps eagerly, so their HTTP calls may run concurrently
                .flatMap(WorkflowStep::process)
                // turn a "false" result from any step into an error signal
                .handle(((aBoolean, synchronousSink) -> {
                    if(aBoolean)
                        synchronousSink.next(true);
                    else
                        synchronousSink.error(new WorkflowException("create order failed!"));
                }))
                // every step succeeded: report the order as completed
                .then(Mono.fromCallable(() -> getResponseDTO(requestDTO, OrderStatus.ORDER_COMPLETED)))
                // any failure: revert the completed steps and report the order as cancelled
                .onErrorResume(ex -> this.revertOrder(orderWorkflow, requestDTO));
    }
    private Mono<OrchestratorResponseDTO> revertOrder(final Workflow workflow, final OrchestratorRequestDTO requestDTO){
        return Flux.fromStream(() -> workflow.getSteps().stream())
                // only steps that actually completed need a compensating call
                .filter(wf -> wf.getStatus().equals(WorkflowStepStatus.COMPLETE))
                .flatMap(WorkflowStep::revert)
                .retry(3)
                .then(Mono.just(this.getResponseDTO(requestDTO, OrderStatus.ORDER_CANCELLED)));
    }
    private Workflow getOrderWorkflow(OrchestratorRequestDTO requestDTO){
        WorkflowStep paymentStep = new PaymentStep(this.paymentClient, this.getPaymentRequestDTO(requestDTO));
        WorkflowStep inventoryStep = new InventoryStep(this.inventoryClient, this.getInventoryRequestDTO(requestDTO));
        return new OrderWorkflow(List.of(paymentStep, inventoryStep));
    }
    private OrchestratorResponseDTO getResponseDTO(OrchestratorRequestDTO requestDTO, OrderStatus status){
        OrchestratorResponseDTO responseDTO = new OrchestratorResponseDTO();
        responseDTO.setOrderId(requestDTO.getOrderId());
        responseDTO.setAmount(requestDTO.getAmount());
        responseDTO.setProductId(requestDTO.getProductId());
        responseDTO.setUserId(requestDTO.getUserId());
        responseDTO.setStatus(status);
        return responseDTO;
    }
    private PaymentRequestDTO getPaymentRequestDTO(OrchestratorRequestDTO requestDTO){
        PaymentRequestDTO paymentRequestDTO = new PaymentRequestDTO();
        paymentRequestDTO.setUserId(requestDTO.getUserId());
        paymentRequestDTO.setAmount(requestDTO.getAmount());
        paymentRequestDTO.setOrderId(requestDTO.getOrderId());
        return paymentRequestDTO;
    }
    private InventoryRequestDTO getInventoryRequestDTO(OrchestratorRequestDTO requestDTO){
        InventoryRequestDTO inventoryRequestDTO = new InventoryRequestDTO();
        inventoryRequestDTO.setUserId(requestDTO.getUserId());
        inventoryRequestDTO.setProductId(requestDTO.getProductId());
        inventoryRequestDTO.setOrderId(requestDTO.getOrderId());
        return inventoryRequestDTO;
    }
}

The complete source code can be downloaded here.

 

                   
