通过Spring Boot Webflux实现Reactor Kafka

解道jdon 2021-05-04 12:34:53
spring 实现 boot reactor webflux


Apache Kafka简介中,我们研究了分布式流媒体平台Apache Kafka。这一次,我们将关注Reactor Kafka,这个库可以创建从Project Reactor到Kafka Topics的Reactive Streams,反之亦然。

我们将使用两个小型示例应用程序,Paymentprocessor Gateway和PaymentValidator。这些应用程序的代码可以在这里找到。

Paymentprocessor网关提供了一个小网页,可以生成一个随机的信用卡号码(显然是伪造的),以及支付金额。当用户单击提交按钮时,表单将提交给网关的API。API具有针对Kafka群集上的未确认事务主题的反应流,这个未确认事务的主题的另外一边消费者是PaymentValidator,监听要验证的传入消息。然后,这些消息通过响应管道,验证方法将其打印到命令行。

通过Reactive Streams向Kafka发送消息

我们的应用程序构建在Spring 5和Spring Boot 2之上,使我们能够快速设置和使用Project Reactor。

Gateway应用程序的目标是设置从Web控制器到Kafka集群的Reactive流。这意味着我们需要特定的依赖关系来弹簧webflux和reactor-kafka。

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.1.0.RELEASE</version>
</dependency>

Spring Webflux RestController提供支付API,为paymentGateway类的doPayment方法创建一个Reactive流。

/ ** 
     *调用返回的Mono将被发送到Spring Webflux,后者依赖于multi-reactor 事件循环和NIO 
     *以非阻塞方式处理请求,从而实现更多的并发请求。结果将
     通过一个名为Server Sent Events 发送。
     ** /
@PostMapping(value = "/payment")
    public Mono<Void> doPayment(@RequestBody CreatePaymentCommand payment) {
    / ** 
         当调用doPayment方法时,我们发送付款信息,获得Mono <Void>作为响应。
         当我们的付款成功发送事件到Kafka主题
         ** / 
        return paymentGateway.doPayment(payment);
    }

paymentGateway需要一个kafkaProducer,它使我们能够将消息作为管道的一部分放在Kafka主题中。它可以使用KafkaSender.create方法轻松创建,传递许多生产者选项。

 public PaymentGatewayImpl() {
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        final SenderOptions<Integer, String> producerOptions = SenderOptions.create(producerProps);
        kafkaProducer = KafkaSender.create(producerOptions);
    }

创建之后,kafkaProducer可以用来轻松地将我们的消息发送到选择的Kafka主题,成为控制器中启动的管道的一部分。因为消息是以非阻塞方式发送到Kafka集群的,所以我们可以使用项目Reactor的事件循环接收并将来自Web API的大量并发消息路由到Kafka。

 @Override
    public Mono<Void> doPayment(final CreatePaymentCommand createPayment) {
        final PaymentEvent payment = new PaymentEvent(createPayment.getId(), createPayment.getCreditCardNumber(), createPayment.getAmount(), gatewayName);
        String payload = toBinary(payment);
        SenderRecord<Integer, String, Integer> message = SenderRecord.create(new ProducerRecord<>("unconfirmed-transactions", payload), 1);
        return kafkaProducer.send(Mono.just(message)).next();
    }
    private String toBinary(Object object) {
        try {
            return objectMapper.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException(e);
        }
    }

从Kafka主题创建反应流

当没有消费者监听时,向主题发送消息没有多大意义,因此我们的第二个应用程序将使用一个反应管道来监听未确认的事务主题。为此,使用KafkaReceiver.create方法创建kafkaReceiver对象,类似于我们之前创建kafkaProducer的方法。

通过使用kafkaReceiver.receive方法,我们可以获得receiverRecords的Flux。进入我们读取的主题中每条消息都放入receiverRecord中。流入应用程序后,它们会进一步通过反应管道。然后,这些消息传递processEvent方法,该方法调用paymentValidator,该方法将一些信息输出到控制台。最后,在receiverOffset上调用acknowledge方法,向Kafka集群发送一条消息已被处理的确认。

   public PaymentValidatorListenerImpl(PaymentValidator paymentValidator) {
        this.paymentValidator = paymentValidator;
        final Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "payment-validator-1");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-validator");
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        ReceiverOptions<Object, Object> consumerOptions = ReceiverOptions.create(consumerProps)
                .subscription(Collections.singleton("unconfirmed-transactions"))
                .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
                .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
        kafkaReceiver = KafkaReceiver.create(consumerOptions);
        /**
         * We create a receiver for new unconfirmed transactions
         */
        ((Flux<ReceiverRecord>) kafkaReceiver.receive())
                .doOnNext(r -> {
                    /**
                     * Each unconfirmed payment we receive, we convert to a PaymentEvent and process it
                     */
                    final PaymentEvent paymentEvent = fromBinary((String) r.value(), PaymentEvent.class);
                    processEvent(paymentEvent);
                    r.receiverOffset().acknowledge();
                })
                .subscribe();
    }
    private void processEvent(PaymentEvent paymentEvent) {
        paymentValidator.calculateResult(paymentEvent);
    }
    private <T> T fromBinary(String object, Class<T> resultType) {
        try {
            return objectMapper.readValue(object, resultType);
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

可以在此处找到此示例的代码

版权声明
本文为[解道jdon]所创,转载请带上原文链接,感谢
https://www.jdon.com/52083

  1. Spring 3中异步方法调用
  2. AOP相关讨论
  3. jf能支持的表现层目前只有struts 1.x么?
  4. 在j2ee中实现一般java对象数据库的方法。
  5. FTP connecting windows and Linux
  6. Decorator design pattern - gene zeiniss
  7. Asynchronous method call in spring 3
  8. Discussion on AOP
  9. Is struts 1. X the only presentation layer supported by JF?
  10. The method of realizing general Java object database in J2EE.
  11. PDF转HTML工具——用springboot包装pdf2htmlEX命令行工具
  12. Pdf to HTML tool -- Wrapping pdf2htmlex command line tool with springboot
  13. MySQL 的 in 查询不走索引?我拿什么拯救你!
  14. MySQL in query does not go index? What can I do to save you!
  15. PDF转HTML工具——用springboot包装pdf2htmlEX命令行工具
  16. Pdf to HTML tool -- Wrapping pdf2htmlex command line tool with springboot
  17. Java小白入门必学!最全数据类型和运算符笔记,附实例
  18. Java Xiaobai introduction must learn! Notes on the most complete data types and operators, with examples
  19. Spring MVC请求与响应
  20. Spring MVC request and response
  21. Java 11已经不再完全免费,不要陷入Oracle的Java 11陷阱
  22. Vue.js比jQuery更容易学习
  23. 启动/删除Docker容器时出现问题 - 如何修复
  24. eclipse run on server时出现了错误信息.求急!!
  25. 请教高手一个关于lunce的问题:java.io.IOException: Cannot rename ...\segments.new
  26. Java 11 is no longer completely free. Don't fall into the Java 11 trap of Oracle
  27. Vue. JS is easier to learn than jQuery
  28. Problem starting / deleting docker container - how to fix it
  29. There is an error message in eclipse run on server!!
  30. Ask a question about lunce: java.io.ioexception: cannot rename... \ segments.new
  31. 从零搭建Spring Boot脚手架(2):集成mybatis
  32. 从零搭建Spring Boot脚手架(4):手写Mybatis通用Mapper
  33. 只知道java反射,宁知道内省吗?
  34. Build spring boot scaffold from scratch (2): integrate mybatis
  35. Build spring boot scaffold from scratch (4): handwritten mybatis general mapper
  36. Do you prefer introspection to reflection?
  37. ASP调用SDK微信分享好友、朋友圈
  38. ASP calls SDK wechat to share friends and circle of friends
  39. BAT 必问的 MySQL 面试题你都会吗?
  40. Do you know all the MySQL interview questions that bat must ask?
  41. ASP调用SDK微信分享好友、朋友圈
  42. ASP calls SDK wechat to share friends and circle of friends
  43. SpringCloud(六)Bus消息总线
  44. 详解JavaScript中的正则表达式
  45. Springcloud (6) bus message bus
  46. Explain regular expressions in JavaScript
  47. Java 响应式关系数据库连接了解一下
  48. Java14它真的来了, 真是尾气都吃不到了
  49. 视频:使用Docker搭建RabbitMQ环境
  50. Java responsive relational database connection
  51. Java14 it's really coming. I can't eat the exhaust
  52. Video: building rabbitmq environment with docker
  53. SpringCloud(六)Bus消息总线
  54. 详解JavaScript中的正则表达式
  55. Springcloud (6) bus message bus
  56. Explain regular expressions in JavaScript
  57. Docker实战:用docker-compose搭建Laravel开发环境
  58. Docker: building laravel development environment with docker compose
  59. 求助,JAVA如何获取系统当前所有进程
  60. 有人用过JMeter或用HttpUnit写过测试吗????