Jiedao jdon 2021-05-04 12:36:46
stay Apache Kafka Introduction , We studied the distributed streaming media platform Apache Kafka. This time, , We will focus on Reactor Kafka, This library can be created from Project Reactor To Kafka Topics Of Reactive Streams, vice versa .

We'll use two small sample applications ,Paymentprocessor Gateway and PaymentValidator. The code for these applications can be found in here find .

Paymentprocessor The gateway provides a small web page , You can generate a random credit card number ( It's obviously a forgery ), And the amount paid . When the user clicks the submit button , The form will be submitted to API.API Have a point at Kafka Reaction flow of unacknowledged transaction topics on the cluster , The other side of the subject of this unconfirmed transaction is PaymentValidator, Listen for incoming messages to verify . then , These messages go through the response pipeline , The verification method prints it to the command line .

adopt Reactive Streams towards Kafka Send a message

Our application is built on Spring 5 and Spring Boot 2 above , Enables us to quickly set up and use Project Reactor.

Gateway The goal of the application is to set up from Web Controller to Kafka Clustered Reactive flow . This means that we need specific dependencies to webflux and reactor-kafka.


Spring Webflux RestController Offer payment API, by paymentGateway Class doPayment Method to create a Reactive flow .

/ ** 
     * Call returned Mono Will be sent to Spring Webflux, The latter depends on multi-reactor  The cycle of events and NIO 
     * Processing requests in a non blocking manner , So as to achieve more concurrent requests . The result will be
      Through a named Server Sent Events send out .
     ** /
@PostMapping(value = "/payment")
    public Mono<Void> doPayment(@RequestBody CreatePaymentCommand payment) {
    / ** 
          When calling doPayment When the method is used , We send payment information , get Mono <Void> As a response .
          When our payment is successfully sent to Kafka The theme
         ** / 
        return paymentGateway.doPayment(payment);

paymentGateway Need one kafkaProducer, It enables us to put messages as part of the pipeline in Kafka In the theme . It can be used KafkaSender.create Method is easy to create , Pass many producer options .

 public PaymentGatewayImpl() {
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        final SenderOptions<Integer, String> producerOptions = SenderOptions.create(producerProps);
        kafkaProducer = KafkaSender.create(producerOptions);

After creating ,kafkaProducer Can be used to easily send our messages to the selected Kafka The theme , Become part of the pipeline started in the controller . Because the message is sent nonblocking to Kafka Clustered , So we can use projects Reactor And will come from Web API A large number of concurrent messages are routed to Kafka.

    public Mono<Void> doPayment(final CreatePaymentCommand createPayment) {
        final PaymentEvent payment = new PaymentEvent(createPayment.getId(), createPayment.getCreditCardNumber(), createPayment.getAmount(), gatewayName);
        String payload = toBinary(payment);
        SenderRecord<Integer, String, Integer> message = SenderRecord.create(new ProducerRecord<>("unconfirmed-transactions", payload), 1);
        return kafkaProducer.send(Mono.just(message)).next();
    private String toBinary(Object object) {
        try {
            return objectMapper.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException(e);

from Kafka Themes create reaction streams

When there is no consumer monitoring , Sending a message to a topic doesn't make much sense , So our second application will use a reaction pipeline to listen for unacknowledged transaction topics . So , Use KafkaReceiver.create Method creation kafkaReceiver object , Similar to what we created before kafkaProducer Methods .

By using kafkaReceiver.receive Method , We can get receiverRecords Of Flux. In the topic we read, every message goes into receiverRecord in . After flowing into the application , They go further through the reaction tube . then , The messaging processEvent Method , This method calls paymentValidator, This method outputs some information to the console . Last , stay receiverOffset On the call acknowledge Method , towards Kafka The cluster sends an acknowledgement that the message has been processed .

   public PaymentValidatorListenerImpl(PaymentValidator paymentValidator) {
        this.paymentValidator = paymentValidator;
        final Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "payment-validator-1");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-validator");
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        ReceiverOptions<Object, Object> consumerOptions = ReceiverOptions.create(consumerProps)
                .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
                .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
        kafkaReceiver = KafkaReceiver.create(consumerOptions);
         * We create a receiver for new unconfirmed transactions
        ((Flux<ReceiverRecord>) kafkaReceiver.receive())
                .doOnNext(r -> {
                     * Each unconfirmed payment we receive, we convert to a PaymentEvent and process it
                    final PaymentEvent paymentEvent = fromBinary((String) r.value(), PaymentEvent.class);
    private void processEvent(PaymentEvent paymentEvent) {
    private <T> T fromBinary(String object, Class<T> resultType) {
        try {
            return objectMapper.readValue(object, resultType);
        } catch (IOException e) {
            throw new IllegalArgumentException(e);

