Spring Cloud Stream 在 Spring Integration 项目的基础上再进行了一些封装,提出一些新的概念,让开发者能够更简单地使用这套消息编程模型。如图所示,这是三者之间的关系:
如下图所示,这是 Spring Cloud Stream 的编程模型。通过 RabbitMQ Binder 构建 input Binding 用于读取 RabbitMQ 上的消息,将 payload 内容转成大写再通过 Kafka Binder 构建的 output Binding 写入到 Kafka 中。图上中间的 4 行非常简单的代码就可以完成从 RabbitMQ 读取消息再写入到 Kafka 的动作。
以下代码是使用 Spring Cloud Stream 以最简单的方式完成消息的发送和接收:
@SpringBootApplication@EnableBinding({Source.class, Sink.class}) // ①
public class SCSApplication {
public static void main(String[] args) {
new SpringApplicationBuilder().sources(SCSApplication.class)
.web(WebApplicationType.NONE).run(args);
}
@Autowired
Source source; // ②
@Bean
public CommandLineRunner runner() {
return (args) -> {
source.output().send(MessageBuilder.withPayload("custom payload").setHeader("k1", "v1").build()); // ③
};
}
@StreamListener(Sink.INPUT) // ④
@SendTo(Source.OUTPUT) // ⑤
public String receive(String msg) {
return msg.toUpperCase();
}
}
上述代码需要配置信息:
spring.cloud.stream.bindings.input.destination=test-input
spring.cloud.stream.bindings.input.group=test-input-binder
spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.output.destination=test-output
spring.cloud.stream.bindings.output.binder=rocketmq
这里的 Input Binding 对应的 topic 是 test-input,group 是 test-input-binder,对应的 MQ 是 Kafka,Output Binding 对应的 topic 是 test-output,对应的 MQ 是 RocketMQ。
所以这段代码的意思是 以 test-input-binder 这个 group 去 Kafka 上读取 test-input 这个 topic 下的消息,把消息的内容转换成大写再发送给 RocketMQ 的 test-output topic 上。
当然,你也可以直接通过 沙箱环境 直接查看案例
阅读量:2050
点赞量:0
收藏量:0