Spring Cloud Stream事件路由 - spring.io

解道jdon 2021-04-08 12:30:01
spring stream 事件 路由 Cloud


Spring Cloud Stream(SCSt)的事件路由有以下功能:a)将事件路由到特定事件订阅者,或b)将事件订阅者产生的事件路由到特定目的地。

让我们快速看一下基于注释的编程模型中的工作方式。在本文中,我们将其称为路由“ TO”和路由“ FROM”。

为了路由到事件订阅者,我们使用condition了StreamListener注释的属性,如下所示:

@StreamListener(target = Sink.INPUT, condition = "headers['type']=='order'")
public void receiveOrders(Order order) {...}

是有关此方法的更多详细信息。

而且,为了从事件订阅者进行路由,我们使用了动态绑定目标 -这种方法允许框架根据单个事件中提供的某些指令将框架绑定到目标。

具有函数的事件路由

使用函数性方法,我们可以通过一些附加函数以更简洁明了的方式完成上述所有操作。

路由“ TO”

可以通过依赖Spring Cloud Function(SCF)中可用的路由功能来实现路由“ TO”功能。您可以通过设置spring.cloud.stream.function.routing.enabled属性来显式启用路由,也可以通过设置spring.cloud.function.routing-expression属性并使用Spring Expression Language(SpEL)提供路由指令来隐式启用路由。路由指令应导致路由到“ TO”的功能的定义。

对于路由目的,路由目的地的名称是functionRouter-in-0(见RoutingFunction.FUNCTION_NAME和描述的绑定命名约定在这里)。

当一个消息被发送到该目的地,路由功能尝试确定哪些实际功能需要来处理这样的事件。它首先试图访问spring.cloud.function.routing-expression消息报头,并且如果提供,确定实际的函数调用的名称。这是最动态的方法。第二种最动态的方法是提供spring.cloud.function.definition标头,其中应包含将“ TO”路由到的函数的定义。两种方法都需要通过设置spring.cloud.stream.function.routing.enabled属性来明确启用路由功能。

至于以前版本中没有的其他功能,spring.cloud.function.routing-expression也可以用作应用程序属性。例如,请考虑无论传入事件如何,表达式都相同的情况,如本文前面显示的基于注释的示例(例如,spring.cloud.function.routing-expression=headers['type']=='order')。对于这种方法,您无需显式启用路由功能,因为spring.cloud.function.routing-expression作为应用程序属性具有相同的效果。

尽管很简单,但以下是上述方法之一的完整示例:

@SpringBootApplication
public class RoutingStreamApplication {
  public static void main(String[] args) {
      SpringApplication.run(RoutingStreamApplication.class,
      "--spring.cloud.function.routing-expression="
      + "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
  }
  @Bean
  public Consumer<Integer> even() {
    return value -> System.out.println("EVEN: " + value);
  }
  @Bean
  public Consumer<Integer> odd() {
    return value -> System.out.println("ODD: " + value);
  }
}

通过发送消息到functionRouter-in-0,这是由rabbit或kafka绑定暴露的,基于消息系统时间的nanoTime()方法返回值,消息将被路由到Consumer相应的'even'或'odd'

路由“ FROM”

和以前一样,路由“ FROM”依赖于SCSt的“动态绑定目标”功能。但是,与路由“ TO”一样,还有许多其他功能。

以下示例显示了基础知识:

@Autowired
private BinderAwareChannelResolver resolver;
public Consumer<String> send(Message message) {   
     MessageChannel destination = resolver
        .resolveDestination(message.getHeaders().get("type"))
     Message outgoingMessage = . . . // your code
     destination.send(outgoingMessage);
}

您所需要的只是BinderAwareChannelResolver的引用(在后面的示例中自动注入)。然后,您可以使用一些逻辑来确定目标名称(在本例中,我们使用“类型”标头的值)。确定目的地名称后,您可以通过使用该BinderAwareChannelResolver.resolveDestination(..)操作并向其发送消息来获取对其的引用。这就是全部。

上述方法的缺点是某些特定于框架的抽象会泄漏到您的代码中。看一下您需要了解BinderAwareChannelResolver和的MessageChannel事实。实际上,前面示例中的大多数代码都是样板代码。

一种更动态,更少泄漏的方法是依靠spring.cloud.stream.sendto.destination属性,这有效地完成了上述所有操作-但在幕后。下面的示例演示如何使用此方法:

@SpringBootApplication
public class RoutingStreamApplication {
  @Bean
  public Function<Message<String>, Message<String>> process() {
    return message -> {
      // some logic to process incoming message
      Message<String> outgoingMessage = MessageBuilder
        .withPayload("Hello")
        .setHeader("spring.cloud.stream.sendto.destination", "even")
        .build();
       return outgoingMessage;
     };
  }
}

我们不再需要注入BinderAwareChannelResolver执行解析MessageChannel。我们只需创建一个新Message,指定一个头部header:框架使用这个标头即可动态解析目标。

路由源

最后但并非最不重要的一点,让我们看一下路由“ FROM”的另一个流行用例,其中数据源起源于SCSt的上下文之外,但需要路由到适当的目的地:

@Controller
public class SourceWithDynamicDestination {
    @Autowired
    private ObjectMapper jsonMapper;
    private final EmitterProcessor<?> processor = EmitterProcessor.create();
    @RequestMapping(path = "/", method = POST, consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, 
      @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) 
      throws Exception {
        Map<String, String> payload = jsonMapper.readValue(body, Map.class);
        String destination = payload.get("id");
        Message<?> message =
          MessageBuilder.withPayload(payload)
           .setHeader("spring.cloud.stream.sendto.destination", destination)
           .build();
        processor.onNext(message);
    }
    @Bean
    public Supplier<Flux<?>> source() {
        return () -> processor;
    }
}

然后,我们可以通过运行以下curl命令来查看结果:

curl -H "Content-Type: application/json" -X POST -d '{"id":"customerId-1","bill-pay":"100"}' http://localhost:8080

在这里,我们借助Supplier<Flux<?>>bean 既使用函数性方法又使用反应式范例的混合。我们有一个简单的MVC控制器,我们希望根据内容的'id'属性值将请求路由到下游。尽管EmitterProcessor此处的详细信息及其用法是另一篇文章的主题,但重要的是它演示了一个功能齐全的应用程序,其中HTTP请求被动态路由到目标绑定程序管理的目的地。

在GitHub上查看Spring Cloud Stream

 

              

版权声明
本文为[解道jdon]所创,转载请带上原文链接,感谢
https://www.jdon.com/53375

  1. [TTS] AIX - & gt; Linux -- Based on RMAN (real environment)
  2. 为什么学编程大部分人选Java编程语言?
  3. Redis 高可用篇:你管这叫 Sentinel 哨兵集群原理
  4. redis 为什么把简单的字符串设计成 SDS?
  5. [TTS] transfer table space AIX - & gt; Linux based on RMAN
  6. Linux 网卡数据收发过程分析
  7. Redis 高可用篇:你管这叫 Sentinel 哨兵集群原
  8. Redis 6.X Cluster 集群搭建
  9. [TTS] transfer table space AIX ASM - & gt; Linux ASM
  10. [TTS] transfer table space Linux ASM - & gt; AIX ASM
  11. 高性能通讯框架——Netty
  12. Brief introduction and test of orchestrator, a high availability management tool for MySQL
  13. [TTS] transfer table space Linux - & gt; AIX based on RMAN
  14. A love diary about http
  15. [rocketmq source code analysis] in depth message storage (3)
  16. Implementation of service configuration center with spring cloud + Nacos (Hoxton version)
  17. SiCp: abstraction of construction process -- object oriented explanation
  18. springboot网上点餐系统
  19. 【SPM】oracle如何固定执行计划
  20. 用好HugePage,告别Linux性能故障
  21. 3 W word long text, java basic interview questions! It's amazing!!!
  22. Spring cloud upgrade road - 2020.0. X - 3. Accesslog configuration of undertow
  23. Win10 uninstall mysql5.7
  24. CentOS下dotnet Core使用HttpWebRequest进行HTTP通讯,系统存在大量CLOSE_WAIT连接问题的分析,已解决。
  25. MySQL batch insert, how not to insert duplicate data?
  26. K8s cronjob application example
  27. Unconventional method, easy to deal with Oracle database critical exception
  28. How to use sqlplus - prelim in Oracle hang
  29. How to search Oracle official documents in full text
  30. Install mysql8.0 on win10
  31. Oracle OCR的备份与恢复
  32. Oracle kill session相关问题
  33. 《Oracle DBA工作笔记》第二章 常用工具和问题分析
  34. Oracle回收站及flashback drop
  35. Hand in hand to teach you to write a spring IOC container
  36. Exception in Java (1) - basic concept
  37. 3w 字长文爆肝 Java 基础面试题!太顶了!!!
  38. Error 2059 when Navicat connects to win10 mysql8.0
  39. Parameter reminder causing Oracle Performance jitter
  40. 「技术分享」Java线程状态间的互相转换看这个就行了
  41. 国产控件短平快,在Java中以编程形式将 XML 转为 Excel
  42. Oracle RAC high availability failure of risk alert
  43. Process scheduling bugs in running oracle on small computers
  44. Oracle memory over consumption risk alert
  45. 【硬核】23种设计模式娓娓道来,助你优雅的编写出漂亮代码!
  46. springboot整合spring security最完整,只看这一篇就够了
  47. Oracle SQL monitor
  48. Using Bifrost to realize data synchronization of MySQL
  49. Reveal the principle of Oracle database truncate
  50. Read this article, Oracle SQL optimization article does not need to read!
  51. IntelliJ IDEA 2021.1 破解,IDEA 2021.1激活破解,激活持续更新
  52. Kafka performance: why so fast?
  53. Two high frequency design interview questions: how to design HashMap and thread pool
  54. Why most people choose Java programming language to learn programming?
  55. Redis high availability: you call this sentinel cluster principle
  56. Why does redis design simple strings as SDS?
  57. Analysis of data sending and receiving process of Linux network card
  58. Redis high availability: what do you call sentinel cluster
  59. Redis 6. X cluster construction
  60. Netty: a high performance communication framework