博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【学习】025 RocketMQ
阅读量:5238 次
发布时间:2019-06-14

本文共 12334 字,大约阅读时间需要 41 分钟。

RocketMQ概述

RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点: 能够保证严格的消息顺序 提供丰富的消息拉取模式 高效的订阅者水平扩展能力 实时的消息订阅机制 亿级消息堆积能力

RocketMQ包含的组件

NameServer:单点,供Producer和Consumer获取Broker地址

Producer:产生并发送消息

Consumer:接受并消费消息

Broker:消息暂存,消息转发

Name Server

Name Server是RocketMQ的寻址服务。用于把Broker的路由信息做聚合。客户端依靠Name Server决定去获取对应topic的路由信息,从而决定对哪些Broker做连接。

Name Server是一个几乎无状态的结点,Name Server之间采取share-nothing的设计,互不通信。

对于一个Name Server集群列表,客户端连接Name Server的时候,只会选择随机连接一个结点,以做到负载均衡。

Name Server所有状态都从Broker上报而来,本身不存储任何状态,所有数据均在内存。

如果中途所有Name Server全都挂了,影响到路由信息的更新,不会影响和Broker的通信。

Broker

Broker是处理消息存储,转发等处理的服务器。

Broker以group分开,每个group只允许一个master,若干个slave。

只有master才能进行写入操作,slave不允许。

slave从master中同步数据。同步策略取决于master的配置,可以采用同步双写,异步复制两种。

客户端消费可以从master和slave消费。在默认情况下,消费者都从master消费,在master挂后,客户端由于从Name Server中感知到Broker挂机,就会从slave消费。

Broker向所有的NameServer结点建立长连接,注册Topic信息。

RocketMQ优点

1.强调集群无单点,可扩展

2.任意一点高可用,水平可扩展

3.海量消息堆积能力,消息堆积后,写入低延迟。

4.支持上万个队列

5.消息失败重试机制

6.消息可查询

7.开源社区活跃

8.成熟度(经过双十一考验)

RocketMQ环境安装

服务器配置

192.168.110.187 nameServer1,brokerServer1192.168.110.188 nameServer2,brokerServer2

添加Host文件

vi /etc/hosts192.168.110.187 rocketmq-nameserver1192.168.110.187 rocketmq-master1192.168.110.188 rocketmq-nameserver2192.168.110.188 rocketmq-master2service network restart

注意: Error:No suitable device found: no device found for connection "System eth0"

解决办法:

(1)ifconfig -a 查看物理 MAC HWADDR 的值

(2)vim 编辑文件 /etc/sysconfig/network-scripts/ifcfg-eth0中修改ifconfig中查出的MAC HWADDR值;

上传安装包

# 上传alibaba-rocketmq-3.2.6.tar.gz文件至/usr/localtar -zxvf alibaba-rocketmq-3.2.6.tar.gz -C /usr/localmv alibaba-rocketmq alibaba-rocketmq-3.2.6ln -s alibaba-rocketmq-3.2.6 rocketmq

创建存储路径【两台机器】

mkdir /usr/local/rocketmq/storemkdir /usr/local/rocketmq/store/commitlogmkdir /usr/local/rocketmq/store/consumequeuemkdir /usr/local/rocketmq/store/index

RocketMQ配置文件【两台机器】

vim /usr/local/rocketmq/conf/2m-noslave/broker-a.propertiesvim /usr/local/rocketmq/conf/2m-noslave/broker-b.properties

修改日志配置文件【两台机器】

mkdir -p /usr/local/rocketmq/logscd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml

修改启动NameServer【两台机器】

vim /usr/local/rocketmq/bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m -XX:MaxPermSize=320m"
vim /usr/local/rocketmq/bin/runserver.sh

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -

XX:PermSize=128m -XX:MaxPermSize=320m"

启动NameServer【两台机器】

cd /usr/local/rocketmq/binnohup sh mqnamesrv &

启动BrokerServer A

cd /usr/local/rocketmq/binnohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &netstat -ntlpjpstail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.logtail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log

启动BrokerServer B

cd /usr/local/rocketmq/binnohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-b.properties >/dev/null 2>&1 &netstat -ntlpjpstail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.logtail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log

RocketMQ Console

rocketmq-web-console 部署到webapps目录中。

/usr/local/apache-tomcat-7.0.65/webapps/rocketmq-web-console/WEB-INF/classes/

修改config.properties

rocketmq.namesrv.addr=192.168.110.195:9876;192.168.110.199:9876

运行效果

安装jdk环境

vi /etc/profile

export JAVA_HOME=/usr/local/jdk1.7.0_80

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export PATH=$JAVA_HOME/bin:$PATH

source /etc/profile

Java操作RocketMQ

pom文件依赖

com.alibaba.rocketmq
rocketmq-client
3.0.10
com.alibaba.rocketmq
rocketmq-all
3.0.10
pom
ch.qos.logback
logback-classic
1.1.1
ch.qos.logback
logback-core
1.1.1
junit
junit
4.10
test

生产者

package com.hongmoshui;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.client.producer.SendResult;import com.alibaba.rocketmq.common.message.Message;public class Producer{    public static void main(String[] args) throws MQClientException    {        DefaultMQProducer producer = new DefaultMQProducer("rmq-group");        producer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");        producer.setInstanceName("producer");        producer.start();        try        {            for (int i = 0; i < 10; i++)            {                // 每秒发送一次MQ                Thread.sleep(1000);                // topic:主题名称,tag:临时值,body:内容                Message msg = new Message("hongmoshui-topic", "TagA", ("hongmoshui-" + i).getBytes());                SendResult sendResult = producer.send(msg);                System.out.println(sendResult.toString());            }        }        catch (Exception e)        {            e.printStackTrace();        }        producer.shutdown();    }}

消费者

package com.hongmoshui;import java.util.List;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.message.MessageExt;public class Consumer{    public static void main(String[] args) throws MQClientException    {        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");        consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");        consumer.setInstanceName("consumer");        consumer.subscribe("hongmoshui-topic", "TagA");        consumer.registerMessageListener(new MessageListenerConcurrently()        {            public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(msg.getMsgId() + "---" + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); }}

RocketMQ重试机制

MQ 消费者的消费逻辑失败时,可以通过设置返回状态达到消息重试的结果。

MQ 消息重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

package com.hongmoshui.test2;import java.util.List;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.message.MessageExt;public class Consumer{    public static void main(String[] args) throws MQClientException    {        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");        consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");        consumer.setInstanceName("consumer");        consumer.subscribe("hongmoshui-topic", "TagA");        consumer.registerMessageListener(new MessageListenerConcurrently()        {            public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(msg.getMsgId() + "---" + new String(msg.getBody())); } try { int i = 1 / 0; } catch (Exception e) { e.printStackTrace(); // 需要重试 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } // 不需要重试 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); }}

注意:每次重试后,消息ID都不一致,所以不能使用消息ID判断幂等。

RocketMQ如何解决消息幂等

注意:每次重试后,消息ID都不一致,所以不能使用消息ID判断幂等。

解决办法:使用自定义全局ID判断幂等,例如流水ID、订单号

使用msg.setKeys 进行区分

生产者:

package com.hongmoshui.test3;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.client.producer.SendResult;import com.alibaba.rocketmq.common.message.Message;public class Producer{    public static void main(String[] args) throws MQClientException    {        DefaultMQProducer producer = new DefaultMQProducer("rmq-group");        producer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876");        producer.setInstanceName("producer");        producer.start();        try        {            for (int i = 0; i < 1; i++)            {                // 每秒发送一次MQ                Thread.sleep(1000);                // topic:主题名称,tag:临时值,body内容                Message msg = new Message("hongmoshui-topic", "TagA", ("hongmoshui-" + i).getBytes());                msg.setKeys(System.currentTimeMillis() + "");                SendResult sendResult = producer.send(msg);                System.out.println(sendResult.toString());            }        }        catch (Exception e)        {            e.printStackTrace();        }        producer.shutdown();    }}

消费者:

package com.hongmoshui.test3;import java.util.HashMap;import java.util.List;import java.util.Map;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.message.MessageExt;public class Consumer{    static private Map
logMap = new HashMap
(); public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group"); consumer.setNamesrvAddr("192.168.110.195:9876;192.168.110.199:9876"); consumer.setInstanceName("consumer"); consumer.subscribe("hongmoshui-topic", "TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context) { String key = null; String msgId = null; try { for (MessageExt msg : msgs) { key = msg.getKeys(); if (logMap.containsKey(key)) { // 无需继续重试。 System.out.println("key:" + key + ",无需重试..."); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } msgId = msg.getMsgId(); System.out.println("key:" + key + ",msgid:" + msgId + "---" + new String(msg.getBody())); int i = 1 / 0; } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } finally { logMap.put(key, msgId); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); }}

 

转载于:https://www.cnblogs.com/hongmoshui/p/10999072.html

你可能感兴趣的文章
KDESVN中commit时出现containing working copy admin area is missing错误提示
查看>>
利用AOP写2PC框架(二)
查看>>
【动态规划】skiing
查看>>
java定时器的使用(Timer)
查看>>
Android实现静默安装与卸载
查看>>
ef codefirst VS里修改数据表结构后更新到数据库
查看>>
boost 同步定时器
查看>>
[ROS] Chinese MOOC || Chapter-4.4 Action
查看>>
简单的数据库操作
查看>>
解决php -v查看到版本与phpinfo()版本不一致问题
查看>>
Java反射之修改常量值
查看>>
jmeter远程分布执行遇到的网卡坑(A Test is currently running,stop or ....)
查看>>
iOS-解决iOS8及以上设置applicationIconBadgeNumber报错的问题
查看>>
亡灵序曲-The Dawn
查看>>
MySQL中的隔离级别和悲观锁及乐观锁示例
查看>>
手机端h5 ajax 上传图片支持微信内置浏览器
查看>>
Redmine
查看>>
HtmlEditor常用模式
查看>>
Another app is currently holding the yum lock; waiting for it to exit.. yum被锁定无法使用
查看>>
帧的最小长度 CSMA/CD
查看>>