Implementing Reactor Kafka with Spring Boot WebFlux

Jiedao jdon 2021-05-04 12:36:46
realization reactor kafka spring boot


In the introduction to Apache Kafka, we looked at the distributed streaming platform Apache Kafka. This time we focus on Reactor Kafka, a library that lets you create Reactive Streams from Project Reactor to Kafka topics and vice versa.

We will use two small sample applications, Paymentprocessor Gateway and PaymentValidator. The code for these applications can be found here.

The Paymentprocessor Gateway serves a small web page on which you can generate a random (and obviously fake) credit card number and enter a payment amount. When the user clicks the submit button, the form is posted to an API. The API has a reactive stream that points to the unconfirmed-transactions topic on the Kafka cluster. On the other side of that topic is the PaymentValidator, which listens for incoming messages to validate. These messages then pass through a reactive pipeline, where the validation method prints them to the command line.
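
The shape of this pipeline can be sketched without Kafka at all. The block below is only a stand-in that uses the JDK's java.util.concurrent.Flow API, which mirrors the Reactive Streams interfaces. It is not Reactor Kafka and not the sample project's code. The names PaymentFlowSketch, ValidatorSubscriber and run are hypothetical, and a SubmissionPublisher plays the role of the unconfirmed-transactions topic.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Stand-in sketch: a SubmissionPublisher plays the role of the
// unconfirmed-transactions topic. No Kafka broker or Reactor is involved.
final class PaymentFlowSketch {

    // Validator side: pulls one message at a time and records what it saw.
    static final class ValidatorSubscriber implements Flow.Subscriber<String> {
        final List<String> validated = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first message
        }

        @Override
        public void onNext(String payment) {
            validated.add("validated " + payment);
            subscription.request(1); // ask for the next message only when ready
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Gateway side: publishes every payment, closes the stream, then waits
    // (up to five seconds) for the validator to finish.
    static List<String> run(List<String> payments) {
        ValidatorSubscriber validator = new ValidatorSubscriber();
        try (SubmissionPublisher<String> topic = new SubmissionPublisher<>()) {
            topic.subscribe(validator);
            payments.forEach(topic::submit);
        }
        try {
            validator.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(validator.validated);
    }
}
```

Calling PaymentFlowSketch.run(List.of("p1", "p2")) pushes two payments through the stand-in topic. The subscriber requests one message at a time, which is the same demand-driven flow that the reactive pipeline provides between the gateway, the topic and the validator.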

Sending messages to Kafka through Reactive Streams

Our applications are built on top of Spring 5 and Spring Boot 2, which lets us set up and use Project Reactor quickly.

The goal of the Gateway application is to set up a reactive stream from a web controller to the Kafka cluster. This means we need dependencies on both webflux and reactor-kafka.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
    <version>1.1.0.RELEASE</version>
</dependency>

A Spring WebFlux RestController exposes the payment API and creates a reactive stream through the doPayment method of the paymentGateway class.

    /**
     * The returned Mono is handed to Spring WebFlux, which relies on a multi-reactor event loop and NIO
     * to process requests in a non-blocking manner, allowing more concurrent requests. The result is
     * sent through what is called Server-Sent Events.
     */
    @PostMapping(value = "/payment")
    public Mono<Void> doPayment(@RequestBody CreatePaymentCommand payment) {
        /*
         * Calling doPayment sends the payment information and returns a Mono<Void> as the response.
         * The Mono completes once the payment has been sent to the Kafka topic.
         */
        return paymentGateway.doPayment(payment);
    }
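
The CreatePaymentCommand class bound by @RequestBody is not shown in the article. A minimal sketch of what such a request body class could look like follows. Only the getter names getId, getCreditCardNumber and getAmount appear in the gateway code. The field types and the setters are assumptions made for illustration.

```java
import java.math.BigDecimal;

// Hypothetical request body: the field types are assumptions and are not
// taken from the sample project.
final class CreatePaymentCommand {
    private String id;
    private String creditCardNumber;
    private BigDecimal amount;

    // Plain bean shape: a no-argument constructor plus setters.
    public CreatePaymentCommand() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getCreditCardNumber() {
        return creditCardNumber;
    }

    public void setCreditCardNumber(String creditCardNumber) {
        this.creditCardNumber = creditCardNumber;
    }

    public BigDecimal getAmount() {
        return amount;
    }

    public void setAmount(BigDecimal amount) {
        this.amount = amount;
    }
}
```

BigDecimal is used for the amount here because binary floating point cannot represent most decimal fractions exactly. The real project may well have chosen a different type.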

The paymentGateway needs a kafkaProducer, which lets us put messages on a Kafka topic as part of the pipeline. It is easy to create with the KafkaSender.create method, to which we pass a number of producer options.

    public PaymentGatewayImpl() {
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        final SenderOptions<Integer, String> producerOptions = SenderOptions.create(producerProps);
        kafkaProducer = KafkaSender.create(producerOptions);
    }

Once created, the kafkaProducer can be used to send our messages to the chosen Kafka topic, becoming part of the pipeline that starts in the controller. Because messages are sent to the Kafka cluster in a non-blocking way, we can use Project Reactor to route a large number of concurrent messages from the web API to Kafka.

    @Override
    public Mono<Void> doPayment(final CreatePaymentCommand createPayment) {
        final PaymentEvent payment = new PaymentEvent(createPayment.getId(), createPayment.getCreditCardNumber(), createPayment.getAmount(), gatewayName);
        String payload = toBinary(payment);
        // The record has no key; the trailing 1 is correlation metadata that is echoed back in the send result
        SenderRecord<Integer, String, Integer> message = SenderRecord.create(new ProducerRecord<>("unconfirmed-transactions", payload), 1);
        // send(...).next() emits the send result; then() discards it so the method returns the declared Mono<Void>
        return kafkaProducer.send(Mono.just(message)).next().then();
    }
    private String toBinary(Object object) {
        try {
            return objectMapper.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException(e);
        }
    }

Creating a reactive stream from a Kafka topic

Sending messages to a topic makes little sense when no consumer is listening, so our second application uses a reactive pipeline to listen to the unconfirmed-transactions topic. To do so, we create a kafkaReceiver object with the KafkaReceiver.create method, much like the kafkaProducer we created earlier.

The kafkaReceiver.receive method gives us a Flux of receiverRecords. Every message on the topic we read is put into a receiverRecord. Once they flow into the application, the records travel further through the reactive pipeline. Each message is passed to the processEvent method, which calls the paymentValidator, which in turn prints some information to the console. Finally, the acknowledge method is called on the receiverOffset to send the Kafka cluster an acknowledgement that the message has been processed.

    public PaymentValidatorListenerImpl(PaymentValidator paymentValidator) {
        this.paymentValidator = paymentValidator;
        final Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "payment-validator-1");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-validator");
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        ReceiverOptions<Object, Object> consumerOptions = ReceiverOptions.create(consumerProps)
                .subscription(Collections.singleton("unconfirmed-transactions"))
                .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
                .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
        kafkaReceiver = KafkaReceiver.create(consumerOptions);
        /**
         * We create a receiver for new unconfirmed transactions
         */
        ((Flux<ReceiverRecord>) kafkaReceiver.receive())
                .doOnNext(r -> {
                    /**
                     * Each unconfirmed payment we receive, we convert to a PaymentEvent and process it
                     */
                    final PaymentEvent paymentEvent = fromBinary((String) r.value(), PaymentEvent.class);
                    processEvent(paymentEvent);
                    r.receiverOffset().acknowledge();
                })
                .subscribe();
    }
    private void processEvent(PaymentEvent paymentEvent) {
        paymentValidator.calculateResult(paymentEvent);
    }
    private <T> T fromBinary(String object, Class<T> resultType) {
        try {
            return objectMapper.readValue(object, resultType);
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }
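
The listener acknowledges each record only after processEvent has returned. The effect of that ordering can be illustrated with a small in-memory model. This is a sketch under simplifying assumptions, not the Reactor Kafka API: AckAfterProcessingSketch, poll and acknowledged are hypothetical names, the list stands in for a single partition, and a second call to poll stands in for a consumer that restarts from its last acknowledged position.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory model of one partition; not the Reactor Kafka API.
final class AckAfterProcessingSketch {
    private final List<String> log; // records in offset order
    private int nextOffset = 0;     // first offset that is not yet acknowledged

    AckAfterProcessingSketch(List<String> log) {
        this.log = List.copyOf(log);
    }

    // Delivers records starting at the last acknowledged position. A record
    // is acknowledged only after the handler returns normally, so a failure
    // leaves that record, and everything after it, to be delivered again.
    void poll(Consumer<String> handler) {
        while (nextOffset < log.size()) {
            try {
                handler.accept(log.get(nextOffset));
            } catch (RuntimeException e) {
                return; // stop without acknowledging the failed record
            }
            nextOffset++; // acknowledge after successful processing
        }
    }

    int acknowledged() {
        return nextOffset;
    }
}
```

If the handler fails on a record, that record is still unacknowledged and is handed out again on the next poll. Acknowledging after processing therefore trades possible duplicates for not losing messages, which is why a validator in this position should tolerate seeing the same payment twice.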

The code for this example can be found here.

Copyright notice
This article was written by [Jiedao jdon]. Please include a link to the original when reposting. Thank you.
https://javamana.com/2021/05/20210504123224092M.html
