关闭
Hit
enter
to search or
ESC
to close
May I Suggest ?
#leanote #leanote blog #code #hello world
Okeeper's Blog
Home
Archives
Tags
DevOps
软件笔记
Spring
学习
JVM系列
关于我
Spring Cloud Stream 使用笔记
无
1398
0
0
zhangyue
#Spring Cloud Stream Spring 已经对JMS中间件做了很多封装,比如Spring对ampq队列协议的封装`spring-rabbit.jar``,以及企业集成的实现`Spring Integration`(解决服务间的异步通讯交互问题),而Spring Cloud Stream是对针对消息中间件的进一步封装,Stream已经支持Kafka/Rabbit MQ/Redis/Gemfire. Spring Cloud Stream官网模型图如下: **![](https://leanote.com/api/file/getImage?fileId=5a0d7305ab6441677d002467)** - Middleware:一些消息中间件,如Kafka/RabbitMQ - Binder:粘合剂,将Middleware和Stream应用粘合起来,不同Middleware对应不同的Binder。初始化一些中间件的连接信息 - Channel:通道,应用程序通过一个明确的Binder与外界(中间件)通信。消息发布使用output通道,消息监听使用input通道 - ApplicationCore:Stream自己实现的消息机制封装,包括分区、分组、发布订阅的语义,与具体中间件无关,这会让开发人员很容易地以相同的代码使用不同类型的中间件。 > 官方定义:Spring Cloud Stream是构建消息驱动的微服务应用程序的框架。Spring Cloud Stream基于Spring Boot建立独立的生产级Spring应用程序,并使用Spring Integration提供与消息代理的连接。它提供了来自几家供应商的中间件的意见配置,介绍了持久发布订阅语义,消费者组和分区的概念。 Spring Integration # 使用 ## 1. 配置 ``` #设置接受者input的队列名称 这里的test_channel 对应@Input或@Output的value spring.cloud.stream.bindings.test_channel.destination=sink-channel spring.cloud.stream.bindings.test_channel2.destination=sink-channel2 #rabbitMQ服务器地址 配置rabbit spring.cloud.stream.binders.rabbit.type=rabbit #environment 属性会在spring stream加载时加载环境变量属性中,会影视业到RabbitProperties spring.cloud.stream.binders.rabbit.environment.spring.rabbitmq.host=localhost spring.cloud.stream.binders.rabbit.environment.spring.rabbitmq.port=5672 spring.cloud.stream.binders.rabbit.environment.spring.rabbitmq.username=guest spring.cloud.stream.binders.rabbit.environment.spring.rabbitmq.password=guest spring.cloud.stream.binders.rabbit.environment.spring.rabbitmq.virtual-host=/ ``` ## 2.声明channel的inputs和outputs ``` public interface MyChannel { /** * 这里的 test_channel 对应配置文件中的 spring.cloud.stream.bindings.test_channel */ String TEST_CHANNEL = "test_channel"; @Input(TEST_CHANNEL) SubscribableChannel testChannelInput(); @Output(TEST_CHANNEL) MessageChannel testChannelOutput(); //----------------------------- String TEST_CHANNEL_TOW = "test_channel2"; @Input(TEST_CHANNEL_TOW) SubscribableChannel testChannelInputTow(); @Output(TEST_CHANNEL_TOW) MessageChannel testChannelOutputTow(); } ``` > 如果@Input未指定value时默认就是方法名,@Input和@Output的value对应配置中的spring.cloud.stream.bindings.*, 使用@Output注解是,改channel必须在配置文件中配置初始化,否则将报错` Dispatcher has no subscribers` ## 3. 开启`MyChannel`接口的实现,在启动类中加上`@EnableBinding({MyChannel.class})`,spring将会为其启动生成实现类 ``` @Slf4j @EnableBinding({MyChannel.class,Processor.class}) @SpringBootApplication public class BootApplication { public static void main(String[] args) { SpringApplication.run(BootApplication.class, args); } } ``` ## 4. 使用注解`@InboundChannelAdapter(MyChannel.TEST_CHANNEL)`绑定生产者 ``` @Slf4j @Service public class ProducerService { @InboundChannelAdapter(MyChannel.TEST_CHANNEL) public String timerMessageSource() { String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); log.info("publish message success:"+format); return format; } } ``` `@InboundChannelAdapter`默认在后台开启一个定时任务,每隔1s执行一次 # 5. 使用注解`@StreamListener`或者`@ServiceActivator`监听队列消息 ``` @Slf4j @Service public class ReceiverService { /** * @StreamListener 是Stream提供的注解,Spring Integration也有一个类似功能的注解@ServiceActivator,两者都有监听通道功能, * 区别是@StreamListener可以根据contentType去解析数据,比如一个json格式的数据,@StreamListener可以自动解析成Java bean * @param payload */ @ServiceActivator(inputChannel = MyChannel.TEST_CHANNEL, outputChannel = MyChannel.TEST_CHANNEL_TOW) //@StreamListener(MyChannel.TEST_CHANNEL)//读取input通道的数据 //@SendTo(MyChannel.TEST_CHANNEL)//经过方法处理后输出到output通道 public String receivedMsg(Object payload) { log.info("Received--" + payload); return "Received--" + payload; } @StreamListener(MyChannel.TEST_CHANNEL_TOW)//读取input通道的数据 public void receivedChannelTow(Object payload) { log.info("Received Tow: " + payload); } } ``` ## 6.执行结果 ``` 2017-11-16 18:55:16.423 INFO 42168 --- [ask-scheduler-5] com.okeeper.service.ProducerService : publish message success:2017-11-16 18:55:16 2017-11-16 18:55:16.423 INFO 42168 --- [ask-scheduler-5] com.okeeper.service.ReceiverService : Received--2017-11-16 18:55:16 2017-11-16 18:55:16.423 INFO 42168 --- [ask-scheduler-5] com.okeeper.service.ReceiverService : Received Tow: Received--2017-11-16 18:55:16 2017-11-16 18:55:17.424 INFO 42168 --- [ask-scheduler-5] com.okeeper.service.ProducerService : publish message success:2017-11-16 18:55:17 2017-11-16 18:55:17.424 INFO 42168 --- [ask-scheduler-5] com.okeeper.service.ReceiverService : Received--2017-11-16 18:55:17 2017-11-16 18:55:17.424 INFO 42168 --- [ask-scheduler-5] com.okeeper.service.ReceiverService : Received Tow: Received--2017-11-16 18:55:17 2017-11-16 18:55:18.426 INFO 42168 --- [ask-scheduler-5] com.okeeper.service.ProducerService : publish message success:2017-11-16 18:55:18 2017-11-16 18:55:18.426 INFO 42168 --- [ask-scheduler-5] com.okeeper.service.ReceiverService : Received--2017-11-16 18:55:18 2017-11-16 18:55:18.426 INFO 42168 --- [ask-scheduler-5] com.okeeper.service.ReceiverService : Received Tow: Received--2017-11-16 18:55:18 ``` 每隔一秒执行一次
觉得不错,点个赞?
Please enable JavaScript to view the
comments powered by Disqus.
comments powered by
Disqus
文章目录