当前位置: 首页 > news >正文

网站文章分类莱芜 网站

网站文章分类,莱芜 网站,成都房地产网站建设,深圳企业官网网站建设哪家好#x1f389;#x1f389;欢迎来到我的CSDN主页#xff01;#x1f389;#x1f389; #x1f3c5;我是Java方文山#xff0c;一个在CSDN分享笔记的博主。#x1f4da;#x1f4da; #x1f31f;推荐给大家我的专栏《RabbitMQ实战》。#x1f3af;#x1f3af; 欢迎来到我的CSDN主页 我是Java方文山一个在CSDN分享笔记的博主。 推荐给大家我的专栏《RabbitMQ实战》。 点击这里就可以查看我的主页啦 Java方文山的个人主页 如果感觉还不错的话请给我点赞吧 期待你的加入一起学习一起进步 ✨前言 了解延迟队列之前我们先了解两个概念TTL和 DXL两个概念 TTL概念 TTL 顾名思义指的是消息的存活时间RabbitMQ可以通过x-message-tt参数来设置指定Queue队列和 Message消息上消息的存活时间它的值是一个非负整数单位为微秒。 RabbitMQ 可以从两种维度设置消息过期时间分别是队列和消息本身 设置队列过期时间那么队列中所有消息都具有相同的过期时间。 设置消息过期时间对队列中的某一条消息设置过期时间每条消息TTL都可以不同。 如果同时设置队列和队列中消息的TTL则TTL值以两者中较小的值为准。而队列中的消息存在队列中的时间一旦超过TTL过期时间则成为Dead Letter死信。 DLX概念 DLX全称为 Dead-Letter-Exchange可以称之为死信交换器也有人称之为死信邮箱。当 消息在一个队列中变成死信dead message之后它能被重新被发送到另一个交换器中这个 交换器就是 DLX绑定 DLX 的队列就称之为死信队列或者可以称之为延迟队列。 消息变成死信一般是由于以下几种情况 消息被拒绝 Basic.Reject/Basic.Nack 并且设置 requeue 参数为 false 消息过期TTL过期 队列达到最大长度。 延迟队列存储的对象是对应的延迟消息所谓“延迟消息”是指当消息被发送以后并不想让消费者立刻拿到消息而是等待特定时间后消费者才能拿到这个消息进行消费。 延迟队列的使用场景有很多比如 在订单系统中一个用户下单之后通常有 30 分钟的时间进行支付如果 30 分钟之内没有支付成功那么这个订单将进行异常处理这时就可以使用延迟队列来处理这些订单了。 用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列当指令设定的时间到了再将指令推送到智能设备。 在 AMQP 协议中或者 RabbitMQ 本身没有直接支持延迟队列的功能但是可以通过前面 所介绍的 DLX 和 TTL 模拟出延迟队列的功能。 死信交换机的使用 1.案例分析 下面我会讲解两种使用死信的方式 第一种是我们不为正常的交换机设置消费者为该队列中的消息设置TTL如果消息过期了就会变为死信就会被发送到死信交换机中处理对应的事务 假设我们有一个订单系统订单有一个待支付状态如果在30分钟内未支付则自动变为已取消状态。我们可以通过 RabbitMQ 的 TTL 机制和死信队列来实现这个功能。 具体步骤如下 创建一个普通的交换机例如 orderExchange和一个普通的队列例如 orderQueue。将队列绑定到交换机上。 创建一个死信交换机例如 deadLetterExchange和一个死信队列例如 deadLetterQueue。将死信队列绑定到死信交换机上。 将普通队列设置为有 TTL 的队列并指定 TTL 的时间为30分钟。这样如果消息在队列中存活时间超过30分钟就会变为死信。 设置普通队列的死信交换机和死信路由键。当消息变为死信时会被发送到死信交换机并根据死信路由键路由到对应的死信队列。 创建一个死信消费者监听死信队列中的消息。当收到订单消息时判断订单是否已经支付如果未支付则将其修改为已取消状态。 这种方式可以灵活地处理订单超时自动取消的需求并且不需要为每个订单单独创建消费者降低了系统的复杂性。同时通过使用 RabbitMQ 的 TTL 机制和死信队列还可以实现其他类似的延迟任务处理场景。 第二种则是为正常队列创建一个消费者但是开启手动确认什么意思呢我们的RabbitMQ中的消费者都是自动消费的所以我们可以设置为手动确认消费我接收到你这个消息了但我还未处理而是由消费者主动发送确认信号ACK给 RabbitMQ告知消息已经成功处理这条消息才算是被消费了。 以下情况会使用这种方式 并发处理当多个消费者同时消费同一个队列中的消息时为了保证消息不被重复消费或丢失可以使用手动签收。消费者在处理完消息后手动发送 ACK 确认消息处理完成这样 RabbitMQ 就知道该消息已经被正确处理可以将其标记为已消费。 消息处理失败如果消费者在处理消息时发生了异常或错误可以选择不发送 ACK即不确认消息的消费完成。这样 RabbitMQ 就会重新将该消息发送给其他消费者进行处理确保消息不会丢失。 消息处理耗时较长某些消息的处理可能需要较长的时间为了防止 RabbitMQ 认为该消息处理超时而重新发送给其他消费者可以使用手动签收。消费者在开始处理消息时发送 ACK 告知 RabbitMQ 接收到消息并开始处理然后在处理完成后再发送 ACK 确认消息处理完成。 2.案例1实践 创建交换机、队列以及它们的绑定关系 package org.example.produce.config;import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import java.util.HashMap; import java.util.Map;Configuration SuppressWarnings(all) public class RabbitConfig {/*** 定义正常队列* return*/Beanpublic Queue QueueA(){MapString, Object config new HashMap();config.put(x-message-ttl, 10000);//message在该队列queue的存活时间最大为10秒config.put(x-dead-letter-exchange, deadExchange); //x-dead-letter-exchange参数是设置该队列的死信交换器DLXconfig.put(x-dead-letter-routing-key, bb);//x-dead-letter-routing-key参数是给这个DLX指定路由键return new Queue(QueueA, true, true, false, config);}/*** 定义死信队列* return*/Beanpublic Queue QueueB(){return new Queue(QueueB);}/*** 自定义直连交换机* return*/Beanpublic DirectExchange directExchangeA(){return new DirectExchange(direct-exchangeA,true,false);}/*** 自定义死信交换机* return*/Beanpublic DirectExchange directExchangeB(){return new DirectExchange(direct-exchangeB,true,false);}/*** 将正常队列与直连交换机进行绑定并设置路由键与死信交换机以及队列* return*/Beanpublic Binding bindingExchangeA(){return BindingBuilder.bind(QueueA()).to(directExchangeA()).with(aa);}/*** 将死信队列与死信交换机进行绑定并设置路由键* return*/Beanpublic Binding bindingExchangeB(){return BindingBuilder.bind(QueueB()).to(directExchangeB()).with(bb);}} x-message-ttl 参数是设置消息在队列中的存活时间单位为毫秒。在这个例子中设置了 10000 毫秒即 10 秒钟。如果一个消息在该队列中未被消费者接收并确认处理那么该消息就会被自动移除出队列避免消息的过期占用队列资源。 x-dead-letter-exchange 参数是设置该队列的死信交换器DLX表示当一个消息变成死信时会被发送到指定的 DLX 中。在这个例子中设置的死信交换器为 direct-exchangeB即将死信消息发送到名为 direct-exchangeB 的交换器。 x-dead-letter-routing-key 参数是给该 DLX 指定一个路由键。当消息变成死信时会根据该路由键将它发送到绑定该路由键的队列中。在这个例子中设置的路由键为 bb即将死信消息发送到名为 QueueB 的队列中。 创建消息的生产者 package org.example.produce.controller;import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import java.util.HashMap; import java.util.Map;RestController public class Sender {Autowiredprivate AmqpTemplate rabbitTemplate;RequestMapping(/send)public String send() {MapString,Object datanew HashMap();data.put(msg,订单ID:121452623345);rabbitTemplate.convertAndSend(direct-exchangeA,aa, data);return ;} } 创建死信队列的消费者  package org.example.produce.controller;import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.util.Map;Component RabbitListener(queues {QueueB}) public class BReceiver {RabbitHandlerpublic void handler(MapString,Object json){System.out.println(json);} } 效果展示  访问一下http://localhost:8081/send 访问一下http://118.178.124.148:15672 可以看到已经有我们的队列了现在我们开启消费者服务查看一下 也是可以拿到原先队列中的消息的说明我们的死信交换机和死信队列生效了 RabbitMQ死信队列优化 如果我们想要第一条消息在6s后变成了死信消息然后被消费者消费掉第二条消息在60s之后变成了死信消息然后被消费掉这样岂不是每增加一个新的时间需求就要新增一个队列这里只有6s和60s两个时间选项如果需要一个小时后处理那么就需要增加TTL为一个小时的队列如果是预定会议室然后提前通知这样的场景岂不是要增加无数个队列才能满足需求 其实我们可以增加一个延时队列用于接收设置为任意延时时长的消息增加一个相应的死信队列和routingkey 创建交换机、队列以及它们的绑定关系 package org.example.produce.config;import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import java.util.HashMap; import java.util.Map;Configuration SuppressWarnings(all) public class RabbitConfig {/*** 定义正常队列* return*/Beanpublic Queue QueueA(){MapString, Object config new HashMap();config.put(x-message-ttl, 10000);//message在该队列queue的存活时间最大为10秒config.put(x-dead-letter-exchange, direct-exchangeB); //x-dead-letter-exchange参数是设置该队列的死信交换器DLXconfig.put(x-dead-letter-routing-key, bb);//x-dead-letter-routing-key参数是给这个DLX指定路由键return new Queue(QueueA, true, true, false, config);}/*** 定义死信队列* return*/Beanpublic Queue QueueB(){return new Queue(QueueB);}// 声明延时队列C 不设置TTLBeanpublic Queue QueueC(){MapString, Object config new HashMap();// x-dead-letter-exchange 这里声明当前队列绑定的正常交换机config.put(x-dead-letter-exchange,direct-exchangeA);// x-dead-letter-routing-key 这里声明当前队列的死信路由keyconfig.put(x-dead-letter-routing-key, aa);return new Queue(QueueC, true, true, false, config);}/*** 自定义直连交换机* return*/Beanpublic DirectExchange directExchangeA(){return new DirectExchange(direct-exchangeA,true,false);}/*** 自定义死信交换机* return*/Beanpublic DirectExchange directExchangeB(){return new DirectExchange(direct-exchangeB,true,false);}/*** 自定义延迟交换机* return*/Beanpublic DirectExchange directExchangeC(){return new DirectExchange(direct-exchangeC,true,false);}/*** 将正常队列与直连交换机进行绑定并设置路由键与死信交换机以及队列* return*/Beanpublic Binding bindingExchangeA(){return BindingBuilder.bind(QueueA()).to(directExchangeA()).with(aa);}/*** 将死信队列与死信交换机进行绑定并设置路由键* return*/Beanpublic Binding bindingExchangeB(){return BindingBuilder.bind(QueueB()).to(directExchangeB()).with(bb);}/*** 将正常队列与直连交换机进行绑定并设置路由键与死信交换机以及队列* return*/Beanpublic Binding bindingExchangeC(){return BindingBuilder.bind(QueueC()).to(directExchangeC()).with(cc);}/} 创建消息的生产者 package org.example.produce.controller;import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import java.util.Date; import java.util.HashMap; import java.util.Map;RestController Slf4j public class Sender {Autowiredprivate AmqpTemplate rabbitTemplate;RequestMapping(send02)public void sendMsg( Integer delay) {MapString,Object datanew HashMap();data.put(msg,延迟队列);rabbitTemplate.convertAndSend(direct-exchangeC, cc,data , message - {// 设置延迟毫秒值message.getMessageProperties().setExpiration(String.valueOf(delay * 1000));return message;});} } 3.案例2实践 消息通过 ACK 确认是否被正确接收每个 Message 都要被确认acknowledged可以手动去 ACK 或自动 ACK 自动确认会在消息发送给消费者后立即确认但存在丢失消息的可能如果消费端消费逻辑抛出异常也就是消费端没有处理成功这条消息那么就相当于丢失了消息 如果消息已经被处理但后续代码抛出异常使用 Spring 进行管理的话消费端业务逻辑会进行回滚这也同样造成了实际意义的消息丢失 如果手动确认则当消费者调用 ack、nack、reject 几种方法进行确认手动确认可以在业务失败后进行一些操作如果消息未被 ACK 则会发送到下一个消费者 如果某个服务忘记 ACK 了则 RabbitMQ 不会再发送数据给它因为 RabbitMQ 认为该服务的处理能力有限 ACK 机制还可以起到限流作用比如在接收到某条消息时休眠几秒钟 消息确认模式有 AcknowledgeMode.NONE自动确认 AcknowledgeMode.AUTO根据情况确认 AcknowledgeMode.MANUAL手动确认 配置yml文件关闭自动确认 server:port: 9999 spring:application:name: consumerabbitmq:host: localhostusername: weiweipassword: 123456port: 5672virtual-host: my_vhostlistener:simple:acknowledge-mode: manual 为QueueA创建一个消费者并且手动确认 在刚刚上一个案例中我们不是没有为正常队列创建消费者吗现在我们创建一个 package org.example.produce.controller;import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component;import java.io.IOException; import java.util.Map;Component RabbitListener(queues {QueueA}) public class AReceiver {RabbitHandlerpublic void handler(MapString,Object json, Channel channel, Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {System.out.println(QA接到消息json); // 打印接收到的消息channel.basicAck(tag,true); // 确认消息已被消费} } 需要注意的 basicAck 方法需要传递两个参数 deliveryTag唯一标识 ID当一个消费者向 RabbitMQ 注册后会建立起一个 Channel RabbitMQ 会用 basic.deliver 方法向消费者推送消息这个方法携带了一个 delivery tag 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID是一个单调递增的正整数delivery tag 的范围仅限于 Channel multiple为了减少网络流量手动确认可以被批处理当该参数为 true 时则可以一次性确认 delivery_tag 小于等于传入值的所有消息 现在我们看一下我们的消息是否会是怎么样的 2024-01-25 19:55:27.744 INFO 13668 --- [nio-8081-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet dispatcherServlet 2024-01-25 19:55:27.744 INFO 13668 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet dispatcherServlet 2024-01-25 19:55:27.745 INFO 13668 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms QA接到消息{msg订单ID:121452623345} QA接到消息{msg订单ID:121452623345} 直接被QA接收消费那么如果我拒绝呢 消费者拒绝消息 package org.example.produce.controller;import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component;import java.util.Map;Component RabbitListener(queues {QueueA}) public class AReceiver {RabbitHandlerpublic void handler(MapString,Object json, Channel channel, Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {System.out.println(拒绝json);channel.basicReject(tag,false); // 拒绝消息Thread.sleep(1000);} } basicReject(tag,false)是用于拒绝消息并可选择是否将消息重新放回队列的方法。 具体来说basicReject() 方法用于告知 RabbitMQ 服务器拒绝处理特定的消息并可以选择将消息重新放回队列中等待重新投递。这样可以用于处理无法处理的消息或者避免消息丢失。 参数 tag 表示消息的标签delivery tag它是一个唯一的标识符用于表示消息在通道中的位置。每个消息都会被分配一个单独的标签。通过将正确的标签传递给 basicReject() 方法可以告诉 RabbitMQ 服务器要拒绝哪个具体的消息。第二个参数 false 表示不将消息重新放回队列即将消息丢弃。如果将其设置为 true则表示将消息重新放回队列中等待重新投递。   被拒绝就会变成死信消息转到我们的死信交换机然后发送给死信队列 但是我们的死信也没有进行消费 只是消息保存在了队列中那是因为我们开启了全局的手动消息确认也就是上面所编写的配置我们只需要像刚刚那样手动确认即可   package org.example.produce.controller;import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component;import java.util.Map;Component RabbitListener(queues {QueueB}) public class BReceiver {RabbitHandlerpublic void handler(MapString,Object json, Channel channel, Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {System.out.println(QB接到消息json); // 打印接收到的消息channel.basicAck(tag,true); // 确认消息已被消费} } 可以看到在消息被拒后消息就会跑到死信队列中做处理 2024-01-25 20:00:53.759 INFO 13444 --- [ restartedMain] org.example.produce.ProduceApplication : Started ProduceApplication in 3.916 seconds (JVM running for 4.403) 2024-01-25 20:01:16.094 INFO 13444 --- [nio-8081-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet dispatcherServlet 2024-01-25 20:01:16.095 INFO 13444 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet dispatcherServlet 2024-01-25 20:01:16.095 INFO 13444 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 0 ms 拒绝{msg订单ID:121452623345} QB接到消息{msg订单ID:121452623345}​ 到这里我的分享就结束了欢迎到评论区探讨交流 如果觉得有用的话还请点个赞吧
http://www.yingshimen.cn/news/44355/

相关文章:

  • 百度站长提交网站地图国外房产中介网站
  • 慈溪高端网站设计网上做名片的网站
  • 网站首页轮播图怎么做的湖南做网站 找磐石网络一流
  • 永康哪有做网站的公司专做老酒的网站
  • 黄山工程建设信息网站php网站验证码
  • 建设银行网站用户登录阳江房产网58同城
  • 哈尔滨网站建设有限公司网页无法打开
  • 在线制作网站宣传视频贵安建设厅网站
  • 网站项目计划书谈一谈对网站开发的理解
  • 网站美化公司互联网招商项目
  • 建设部门户网站官方制作网站
  • 网站开发方案书简易做海报网站
  • 揭阳新站seo方案access 数据库做网站
  • 收费做网站百度指数可以用来干什么
  • 做网站 超速云企业网站建设设计公司
  • 农村电商怎么赚钱百度seo优化培训
  • 荥阳网站推广分类信息网站平台有哪些
  • 旅游公司网站 优帮云商业网站成功的原因
  • 网站开发建设价格普洱北京网站建设
  • 寻找郑州网站优化公司长沙专业网站设计公司
  • 关于网站建设的工作总结建设应用型网站的意义
  • 加强检察院门户网站建设猴哥影院在线电影观看
  • 深圳微商城网站设计价格品牌策划ppt
  • iis做网站黑彩网站建设运营
  • 东营 网站建设公司佛山网站建设 骏域
  • 肥城网站设计公司360站长平台链接提交
  • 用dedecms 做门户网站优惠券个人网站怎么做
  • 分享类网站源码资源企业网站排名优化价格
  • 建筑招工网站企业展厅设计制作
  • 事业单位做网站需要前置审批吗小说类网站功能建设