2.Spring Cloud Stream-灵析社区

没晒干的咸鱼

一、Spring Cloud Stream 是一套基于消息的事件驱动开发框架。

Spring Cloud Stream 在 Spring Integration 项目的基础上再进行了一些封装,提出一些新的概念,让开发者能够更简单地使用这套消息编程模型。如图所示,这是三者之间的关系:

如下图所示,这是 Spring Cloud Stream 的编程模型。通过 RabbitMQ Binder 构建 input Binding 用于读取 RabbitMQ 上的消息,将 payload 内容转成大写再通过 Kafka Binder 构建的 output Binding 写入到 Kafka 中。图上中间的 4 行非常简单的代码就可以完成从 RabbitMQ 读取消息再写入到 Kafka 的动作。

以下代码是使用 Spring Cloud Stream 以最简单的方式完成消息的发送和接收:

@SpringBootApplication@EnableBinding({Source.class, Sink.class})  // ①
public class SCSApplication {
    public static void main(String[] args) {
        new SpringApplicationBuilder().sources(SCSApplication.class)
            .web(WebApplicationType.NONE).run(args);
    }
    @Autowired
    Source source;  // ②
    @Bean
    public CommandLineRunner runner() {
        return (args) -> {
            source.output().send(MessageBuilder.withPayload("custom payload").setHeader("k1", "v1").build());  // ③
        };
    }
    @StreamListener(Sink.INPUT)  // ④
    @SendTo(Source.OUTPUT)  // ⑤
    public String receive(String msg) {
        return msg.toUpperCase();
    }
}
  1. 使用 @EnableBinding 注解,注解里面有两个参数 Source 和 Sink,它们都是接口。Source 接口内部有个 MessageChannel 类型返回值的 output 方法,被 @Output 注解修饰表示这是一个 Output Binding;Sink 接口内部有个 SubscribableChannel 类型返回值的 intput 方法,被 @Input 注解修饰表示这是一个 Input Binding。@EnableBinding 注解会针对这两个接口生成动态代理。
  2. 注入 @EnableBinding 注解对于 Source 接口生成的动态代理。
  3. 使用 @EnableBinding 注解对于 Source 接口生成的动态代理内部的 MessageChannel 发送一条消息。最终消息会被发送到消息中间件对应的 topic 里。
  4. @StreamListener 注解订阅 @EnableBinding 注解对于 Sink 接口生成的动态代理内部的 SubscribableChannel 中的消息,这里会订阅到消息中间件对应的 topic 和 group。
  5. 消息处理结果发送到 @EnableBinding 注解对于 Source 接口生成的动态代理内部的 MessageChannel。最终消息会被发送到消息中间件对应的topic 里。

上述代码需要配置信息:

spring.cloud.stream.bindings.input.destination=test-input
spring.cloud.stream.bindings.input.group=test-input-binder
spring.cloud.stream.bindings.input.binder=kafka

spring.cloud.stream.bindings.output.destination=test-output
spring.cloud.stream.bindings.output.binder=rocketmq

这里的 Input Binding 对应的 topic 是 test-input,group 是 test-input-binder,对应的 MQ 是 Kafka,Output Binding 对应的 topic 是 test-output,对应的 MQ 是 RocketMQ。

所以这段代码的意思是 以 test-input-binder 这个 group 去 Kafka 上读取 test-input 这个 topic 下的消息,把消息的内容转换成大写再发送给 RocketMQ 的 test-output topic 上。

当然,你也可以直接通过 沙箱环境 直接查看案例

阅读量:2050

点赞量:0

收藏量:0