Okeeper's Blog
Installing and Deploying RocketMQ 3.5.8 on CentOS 6.5
zhangyue
Official QuickStart documentation: https://rocketmq.incubator.apache.org/docs/quick-start/

# 1. Download RocketMQ
```
# Baidu Pan download link
https://pan.baidu.com/s/1qYno5Ne
```

# 2. Extract
```
unzip alibaba-rocketmq3.5.8.zip
mv alibaba-rocketmq /usr/local/
cd /usr/local/alibaba-rocketmq
```

# 3. Start
## 1. Start the name server
```
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
```

## 2. Start the broker
```
> nohup sh bin/mqbroker -n localhost:9876 &
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success...
```
> Use the `-c` option to specify a configuration file, for example `nohup sh bin/mqbroker -n localhost:9876 -c conf/2m-noslave/broker-a.properties &`

## 3. Stop RocketMQ
```
# Run from the bin/ directory
# Stop the broker
./mqshutdown broker
# Stop the name server
./mqshutdown namesrv
```

# 4. RocketMQ commands
For a fuller command reference, see http://blog.csdn.net/xianymo/article/details/42737433

## Delete a topic
```
cd bin/
# List topics
./mqadmin topicList -n 192.168.212.17:9876
# Delete a topic
./mqadmin deleteTopic -c DefaultCluster -n 192.168.212.17:9876 -t %RETRY%CONSUMER_REQUEST_MESSAGE_0002
# Show cluster information
./mqadmin clusterList -n 192.168.212.17:9876
```

## View the message backlog on every topic for a consumer group
```
./mqadmin consumerProgress -n 192.168.23.159:9876 -g consumerGroupName
```

## View detailed status for a topic
```
./mqadmin topicStatus -n 192.168.212.17:9876 -t ORDER_REQUEST_TOPIC_0001
```

# 5. Client test programs
## Producer
```java
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) {
        // "Producer" is the producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("Producer");
        producer.setNamesrvAddr("192.168.212.17:9876");
        // producer.setVipChannelEnabled(false);
        try {
            producer.start();

            // Message(topic, tags, keys, body)
            Message msg = new Message("PushTopic", "push", "1", "Just for test.".getBytes());
            SendResult result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());

            msg = new Message("PushTopic", "push", "2", "Just for test.".getBytes());
            result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());

            msg = new Message("PullTopic", "pull", "1", "Just for test.".getBytes());
            result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }
}
```

## Consumer
```java
package com.alibaba.rocketmq.example.quickstart;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // "TestConsumer1" is the consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumer1");
        consumer.setNamesrvAddr("192.168.212.17:9876");
        // Start from the earliest available offset the first time this group runs.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Subscribe to the topic the Producer above publishes to; "*" matches every tag.
        consumer.subscribe("PushTopic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}
```
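
## Checking connectivity before running the clients

If the producer or consumer hangs or fails to connect, a quick first check is whether the name server port (9876) and the broker port (10911, as shown in the broker log above) are reachable from the client machine. The following is a minimal sketch using only the JDK. The class name `PortCheck` and the helper `isReachable` are hypothetical names introduced here for illustration, and the default host is the example address used in this post, so substitute your own.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Example address from this post (an assumption); pass your own host as the first argument.
        String host = args.length > 0 ? args[0] : "192.168.212.17";
        System.out.println("namesrv 9876 reachable: " + isReachable(host, 9876, 2000));
        System.out.println("broker 10911 reachable: " + isReachable(host, 10911, 2000));
    }
}
```

A `false` result only says that the TCP connection could not be opened. It does not identify the cause, which might be a process that is not running, a firewall rule, or a wrong address.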