网站开发如何验证,企业网站建设方案平台,教育机构退费法律规定,大一html5网页设计代码【MacOS】RocketMQ 搭建Java客户端 文章目录 【MacOS】RocketMQ 搭建Java客户端一、引入RocketMQ客户端依赖1.maven工程#xff0c;在你的pom.xml中添加RocketMQ客户端依赖#xff1a;2.gradle工程添加库 二、创建生产者和消费者1.创建一个生产者消费者1.创建一个PullConsume…【MacOS】RocketMQ 搭建Java客户端 文章目录 【MacOS】RocketMQ 搭建Java客户端一、引入RocketMQ客户端依赖1.maven工程在你的pom.xml中添加RocketMQ客户端依赖2.gradle工程添加库 二、创建生产者和消费者1.创建一个生产者消费者1.创建一个PullConsumer2.创建一个PushConsumer 三、遇到的问题1.连接失败的问题2.主题没有找到 一、引入RocketMQ客户端依赖
1.maven工程在你的pom.xml中添加RocketMQ客户端依赖
dependencies!-- 添加RocketMQ客户端依赖 --dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion版本跟你下载的rocketmq版本一样/version/dependency
/dependencies2.gradle工程添加库
compile org.apache.rocketmq:rocketmq-client:你的版本号注意 客户端和服务端版本要一致否则会发射管一些奇怪的问题要到控制台创建Topic队列名称
二、创建生产者和消费者
1.创建一个生产者
mport com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;/*** author pengxiaoping* date 2024年10月18日 11:27*/
public class Producer {public static void main(String[] args) {Producer.producer();}public static void producer() {//创建DefaultMQProducer消息生产者对象DefaultMQProducer producer new DefaultMQProducer(TestProducerGroup);//设置NameServer 多个节点间用分号分割producer.setNamesrvAddr(localhost:9876);try {//与NameServer建立长连接producer.start();//发送十条数据for (int i 1; i 10; i) {//1S中发送一次Thread.sleep(1000);JSONObject json new JSONObject();json.put(orderId,i1);json.put(desc,这是第i1个订单);//数据正文String data json.toJSONString();/*创建消息Message消息三个参数topic 代表消息主题自定义自定义TopicOrder代表订单主题代表订单主题tags 代表标志用于消费者接收数据时进行数据筛选。PAY_TAG代表支付相关信息body 代表消息内容*/Message message new Message(TopicOrder, PAY_TAG, data.getBytes());//发送消息获取发送结果SendResult result producer.send(message);//将发送结果对象打印在控制台System.out.println(消息已发送MsgId: result.getMsgId() 发送状态: result.getSendStatus());}}catch (Exception e){e.printStackTrace();}finally {try {producer.shutdown();} catch (Exception e) {}}}}消费者
对于Consumer来说他有两种基础的工作方式pull和push。 区别pushbroker端来了消息以后主动将消息从broker端向consumer端推送。 pull对于consumer来说主动往broker发一个请求然后broker在通过请求响应给consumer一批消息。一般用push模式。
1.创建一个PullConsumer
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class PullConsumer {public static volatile boolean running true;public static void consumer() {//创建pull消费者对象DefaultLitePullConsumer litePullConsumer new DefaultLitePullConsumer(TestPullConsumerGroup);//设置NameServer节点litePullConsumer.setNamesrvAddr(localhost:9876);try {//订阅主题与Push相同litePullConsumer.subscribe(TopicOrder, *);//每次拉取数据条目数litePullConsumer.setPullBatchSize(10);//启动消费者litePullConsumer.start();while (running) {ListMessageExt messageExts litePullConsumer.poll();//批量数据处理for (MessageExt msg : messageExts) {System.out.println(消费者获取数据: msg.getMsgId() newString(msg.getBody()));}}}catch (Exception e){e.printStackTrace();}finally {litePullConsumer.shutdown();}}public static void main(String[] args) {PullConsumer.consumer();}
}2.创建一个PushConsumer
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class PushConsumer {public static void consumer() {//创建消费者对象DefaultMQPushConsumer consumer new DefaultMQPushConsumer(TestConsumerGroup);try {//设置NameServer节点consumer.setNamesrvAddr(localhost:9876);/*订阅主题consumer.subscribe包含两个参数topic: 说明消费者从Broker订阅哪一个主题这一项要与Provider保持一致。subExpression: 子表达式用于筛选tags。同一个主题下可以包含很多不同的tagssubExpression用于筛选符合条件的tags进行接收。例如设置为*则代表接收所有tags数据。例如设置为PAY_TAG则Broker中只有tagsPAY_TAG的消息会被接收而其他的就会被排除在外。*/consumer.subscribe(TopicOrder, *);//创建监听当有新的消息监听程序会及时捕捉并加以处理。consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) {//批量数据处理for (MessageExt msg : msgs) {System.out.println(消费者获取数据: msg.getMsgId() newString(msg.getBody()));}//返回数据已接收标识return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者与Broker建立长连接开始监听。consumer.start();} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) {PushConsumer.consumer();}
}三、遇到的问题
1.连接失败的问题
Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failedat org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:572)at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:2050)at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:2041)at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:782)... 6 more检查代码中设置的NameServer地址是否正确跟配置文件中的NAME_ADDR地址一致。确保没有拼写错误、IP 地址或域名准确以及端口号正确。检查 RocketMQ 的 NameServer 是否已经启动并且正在运行。查看 NameServer 的日志文件确认没有错误或异常情况。如果 NameServer 没有启动需要启动它。确保 NameServer 的配置正确并且没有与其他服务冲突。
2.主题没有找到
org.apache.rocketmq.client.exception.MQClientException: No route info of this topic: TopicTest
See https://rocketmq.apache.org/docs/bestPractice/06FAQ for further details.at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:879)at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1564)at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:475)at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:78)
org.apache.rocketmq.client.exception.MQClientException: No route info of this topic: TopicTest
See https://rocketmq.apache.org/docs/bestPractice/06FAQ for further details.at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:879)at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1564)at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:475)at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:78)主题不存在或者未被创建 #在控制台创建主题 -n namesrv 的地址。-t 主题名。-c 指定所在集群
sh bin/mqadmin updateTopic -n localhost:9876 -t TopicOrder -c DefaultCluster#出现这个创建成功
create topic to 172.16.224.140:10911 success.
TopicConfig [topicNameTopicOrder, readQueueNums8, writeQueueNums8, permRW-, topicFilterTypeSINGLE_TAG, topicSysFlag0, orderfalse, attributes{}]