baby sword‘s blog baby sword‘s blog
首页
  • java基础
  • java进阶
大数据
  • mysql

    • mysql索引
    • mysql日志
  • redis

    • 单机下的redis
    • 集群下的redis
  • Spring
  • springboot
  • RPC
  • netty
  • mybatis
  • maven
  • 消息队列
  • kafka
  • zookeeper
  • rocketmq
  • 七大设计原则
  • 创建型模式
  • 结构型模式
  • 行为型模式
  • SpringCloud

    • eureka
  • SpringCloud Alibaba

    • nacos
  • 计算机网络
  • 操作系统
  • 算法
  • 个人项目
  • 个人面试面经
  • 八股记忆
  • 工作积累
  • 逻辑题
  • 面试

    • 百度后端实习二面
GitHub (opens new window)

zhengjian

不敢承担失去的风险,是不可能抓住梦想的
首页
  • java基础
  • java进阶
大数据
  • mysql

    • mysql索引
    • mysql日志
  • redis

    • 单机下的redis
    • 集群下的redis
  • Spring
  • springboot
  • RPC
  • netty
  • mybatis
  • maven
  • 消息队列
  • kafka
  • zookeeper
  • rocketmq
  • 七大设计原则
  • 创建型模式
  • 结构型模式
  • 行为型模式
  • SpringCloud

    • eureka
  • SpringCloud Alibaba

    • nacos
  • 计算机网络
  • 操作系统
  • 算法
  • 个人项目
  • 个人面试面经
  • 八股记忆
  • 工作积累
  • 逻辑题
  • 面试

    • 百度后端实习二面
GitHub (opens new window)
  • 消息队列

  • kafka

  • zookeeper

  • rocketMq

    • rocketMq简介
    • 入门安装运行
    • rocketMQ-java快通跑通
      • 一. 如何查看rocketmq服务是否启动
      • 二. java maven依赖
      • 三.生产者与消费者
    • rocketMQ中的消息类型
  • 中间件
  • rocketMq
xugaoyi
2023-10-18
目录

rocketMQ-java快通跑通

# 一. 如何查看rocketmq服务是否启动

以下有几种方法:

  1. 检查RocketMQ的启动日志:RocketMQ在启动过程中会生成日志文件,通常位于RocketMQ安装目录下的logs文件夹中。打开启动日志文件,查看其中的信息,特别是最后几行,以确定是否成功启动。

  2. 检查RocketMQ的进程:在Linux或Unix系统中,可以使用以下命令检查RocketMQ的进程是否在运行:

ps -ef | grep NamesrvStartup  # 检查Namesrv进程
ps -ef | grep BrokerStartup   # 检查Broker进程
1
2

如果能够找到与NamesrvStartup或BrokerStartup相关的进程信息,则表示RocketMQ已成功启动。

  1. 检查RocketMQ的端口是否监听:RocketMQ的Namesrv和Broker分别使用不同的端口进行通信。你可以使用以下命令检查相关端口是否被监听:
netstat -tln | grep 9876  # 检查Namesrv的监听端口,默认为9876
netstat -tln | grep 10911  # 检查Broker的监听端口,默认为10911
1
2

如果命令输出显示相关端口处于LISTEN状态,说明RocketMQ已成功启动并正在监听相应的端口。

  1. 测试RocketMQ的功能:你可以使用RocketMQ提供的命令行工具或客户端程序测试其功能是否正常。例如,你可以使用RocketMQ提供的命令行工具mqadmin发送和接收消息,或者使用客户端程序连接到RocketMQ并进行消息的发送和接收操作。如果这些操作都能够正常执行,说明RocketMQ已成功启动。

# 二. java maven依赖

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>5.1.3</version>
</dependency>
1
2
3
4
5

# 三.生产者与消费者

生产者

package org.zj.producer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AsyncProducer {

    public static final int MESSAGE_COUNT = 1000;
    public static final String PRODUCER_GROUP = "group_test";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "Topic_ASYNC";
    public static final String TAG = "TagA";

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);

        final CountDownLatch countDownLatch = new CountDownLatch(MESSAGE_COUNT);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            try {
                final int index = i;
                Message msg = new Message(TOPIC, TAG, "order", "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {

                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, throwable);
                        throwable.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }
}


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

消费者

package org.zj.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

public class Consumer {
    public static final String CONSUMER_GROUP = "group_test";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC_1 = "Topic_ASYNC";
    public static final String TOPIC_2 = "Topic_SYNC";
    public static final String TOPIC_3 = "Topic_ONE_WAY";



    public static void main(String[] args) throws Exception{

        // 初始化一个确切的消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);

        // nameserver服务地址
        consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);

        /**
         * 消费最新消息
         * CONSUME_FROM_LAST_OFFSET,
         * 从头开始消费
         *CONSUME_FROM_FIRST_OFFSET,
         *CONSUME_FROM_TIMESTAMP;
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // 订阅主题
        consumer.subscribe(TOPIC_1, "*");
        consumer.subscribe(TOPIC_2, "*");
        consumer.subscribe(TOPIC_3, "*");

        // 注册回调函数
        consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), list);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        // 初始化消费者
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

参考: Client SDK (opens new window)

编辑 (opens new window)
上次更新: 2024/02/22, 14:03:19
入门安装运行
rocketMQ中的消息类型

← 入门安装运行 rocketMQ中的消息类型→

最近更新
01
spark基础
02-22
02
mysql读写分离和分库分表
02-22
03
数据库迁移
02-22
更多文章>
Theme by Vdoing | Copyright © 2019-2024 Evan Xu | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式