消息队列之rocketMq详细安装教程与使用详解

rocketMq下载安装

选择版本进行下载:http://rocketmq.apache.org/release_notes/

我选择4.9版本的进行下载

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

有点让人疑惑的是这个地址在谷歌浏览器里打开下载不了,需要在ie浏览器下载。

下载完成后的目录结构如下:

在这里插入图片描述

添加环境变量 ROCKETMQ_HOME

在这里插入图片描述

这里我是手动把文件名字改了,不让那么长。这个自己随意改就好。

rocketMq启动

在bin目录打开cmd命令窗口

启动NAMESERVER: start mqnamesrv.cmd

启动BROKER: start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

注意:如果启动时提示请设置ROCKETMQ_HOME,电脑重启一下就好了

在这里插入图片描述

插件部署

下载地址:点此下载

下载完成解压后,打开此文件

在这里插入图片描述
在这里插入图片描述

返回根目录

在这里插入图片描述

进入rocketMq-console目录,打开cmd,

在此处执行:mvn clean package -Dmaven.test.skip=true,进入编译阶段

在这里插入图片描述

显示success,打包成功。

执行命令:

java -jar rocketmq-console-ng-2.0.0.jar

启动完成,访问localhost:8888

在这里插入图片描述

至此,mq环境搭建完成,可以通过控制层查看topic信息等。

快速使用

这里创建两个工程,一个生产者,负责生产消息;一个消费者,负责消费。

在这里插入图片描述

添加依赖

完整依赖如下,可按自己需求进行调整。我这里使用的是log4j2日志框架,springboot 2.1.7版本。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.7.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.ssw</groupId>
    <artifactId>rocketmq-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rocketmq-producer</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--日志相关log4j2-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <!--去除logback依赖包,去掉springboot默认配置-->
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- 引入log4j2依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
生产者配置
application配置
server.port=5001
rocketMq.address=127.0.0.1:9876
rocketMq.group=mq_producer
生产者配置类
@Component
public class MessageProducer {
    @Value("${rocketMq.address}")
    private String rocketMqAddress;
    @Value("${rocketMq.group}")
    private String producerGroup;
    private DefaultMQProducer producer;
    //对象在用之前必须要调用一次,只能初始化一次
    public void start() {
        try {
            producer = new DefaultMQProducer(producerGroup);
            producer.setNamesrvAddr(rocketMqAddress);
            this.producer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 异步发送mq
     * @param topic
     * @param tags
     * @param body
     */
    @Async
    public void sendMessage(String topic, String tags, String body) {
        try {
            Message message = new Message(topic, tags, body.getBytes("UTF-8"));
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("消息生产成功, sendResult:{}" + sendResult);
                }
                @Override
                public void onException(Throwable throwable) {
                    Integer retryTimes = producer.getRetryTimesWhenSendAsyncFailed();
                    System.out.println("消息生产失败, retryTimes:{}" + retryTimes);
                }
            });
        } catch (Exception e) {
            System.out.println("消息生产失败,topic:{}, tags:{}, body:{}" + topic + "-----" + tags + "-----" + body);
        }
    }
}
自启动配置
@Component
public class Start implements CommandLineRunner {
    @Autowired
    private MessageProducer messageProducer;
    @Override
    public void run(String... args) throws Exception {
        messageProducer.start();
    }
}
创建控制层
@RestController
@RequestMapping("production")
public class ProductionController {
    @Autowired
    private MessageProducer messageProducer;
    /**
     * 生产数据
     * @param topic
     * @param tags
     * @param text
     * @return
     */
    @RequestMapping("push")
    public boolean callback(String topic, String tags, String text) {
        try {
            messageProducer.sendMessage(topic, tags, text);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
}
测试生产

访问测试生产消息的地址:http://localhost:5001/production/push?topic=test_topic

在这里插入图片描述

消费者配置
application配置
server.port=5002
rocketMq.address=127.0.0.1:9876
rocketMq.group=mq_consumer
#监听topic
rocketMq.Listener.topic=test_topic
消费者配置类
@Log4j2
@Component
public class ListenerService {
    @Value("${rocketMq.group}")
    private String consumerGroup;
    @Value("${rocketMq.address}")
    private String address;
    @Value("${rocketMq.Listener.topic}")
    private String listenerTopic;

    private DefaultMQPushConsumer consumer;

    public void start() throws MQClientException {
        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(address);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.subscribe(listenerTopic, "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, content) -> {
            Message msg = msgs.get(0);
            String topic = msg.getTopic();
            try {
                String body = new String(msg.getBody(), "utf-8");
                String tags = msg.getTags();
                String keys = msg.getKeys();
                log.error(topic + "---消费成功---" + topic + ",tags=" + tags + ",keys=" + keys + ",msg=" + body);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (UnsupportedEncodingException e) {
                log.error(topic + "---消费失败");
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        consumer.start();
    }
}
自启动配置
@Component
public class Start implements CommandLineRunner {
    @Autowired
    private ListenerService listenerService;

    @Override
    public void run(String... args) throws Exception {
        listenerService.start();
    }
}
启动

当生产者生产topic为test_topic,消费者就可以接收到。

在这里插入图片描述

上述所有示例代码,已上传gitee,可自行下载。

点此查看