当前位置: 首页 > news >正文

学做网站的书籍企业门户网站的意义

学做网站的书籍,企业门户网站的意义,奉化网络推广,素材网有哪些文章目录 一、虚拟机二、关于消息的API发布消息直接交换机 DIRECT 转发规则扇出交换机 FANOUT 转发规则主题交换机 TOPIC 转发规则匹配规则Router类 订阅消息消费者队列如何给订阅的消费者发送消息自动发送消息至订阅者 应答消息 三、代码编写 一、虚拟机 接下来要创建虚拟机,… 文章目录 一、虚拟机二、关于消息的API发布消息直接交换机 DIRECT 转发规则扇出交换机 FANOUT 转发规则主题交换机 TOPIC 转发规则匹配规则Router类 订阅消息消费者队列如何给订阅的消费者发送消息自动发送消息至订阅者 应答消息 三、代码编写 一、虚拟机 接下来要创建虚拟机,每个虚拟主机下面都管理着自己的 交换机, 队列, 绑定, 消息 数据.同时提供 api 供上层调用. 在这里咱们实现的单虚拟机,并没有提供创建虚拟机,销毁虚拟机的功能,但是为了方便后续的扩展,咱们要约定好如何区分多个虚拟机之间的交换机,队列,绑定关系. 不同虚拟机当然可以拥有相同名字的交换机等. 比如 虚拟机A中 拥有 交换机C, 虚拟机B中 拥有 交换机C, 咱们视为以上情况是合法的. 咱们这里采取的方案是,在客户提供的交换机等的身份标识(交换机名字),前加上虚拟机的名字. 即 客户要在虚拟机 VirtualHostA 中创建交换机 exchangeC,咱们服务器存储的交换机名字是 VirtualHostAexchangeC. 当然也有其他方案,大家可以自由发挥. 二、关于消息的API 这是虚拟机要提供给上层的API 前6个API咱们已经写好了,只需要直接调用下层的API即可. 咱们现在来考虑后 7 - 9 这三个API的实现. 发布消息 发布消息API:其实就是生产者将消息发送给对应的交换机,交换机再根据不同的转发规则,转发给与之相绑定且符合规则的消息队列. 绑定关系 Binding 中有一个 bindingKey 属性 消息 Message 中 有一个 routingKey 属性 下面就来讲解一下三种交换机的转发规则已经这两个 Key 的不同含义. 直接交换机 DIRECT 转发规则 在直接交换机中, bindingKye是无意义的, routingKey是要转发到的队列的队列名. 直接交换机的转发规则, 是无视 bindingKey的,即 直接交换机是否与这个队列绑定都没有关系,而直接将消息转发到 routingKey指定的队列名的队列中. 扇出交换机 FANOUT 转发规则 在扇出交换机中, bindingKye是无意义的, routingKey是无意义的. 扇出交换机的转发规则,是将收到的消息转发到与之绑定的所有队列中.与bindingKye和routingKey是没有任何关系的. 主题交换机 TOPIC 转发规则 在主题交换机中, bindingKey是创建绑定时,给绑定指定的特殊字符串(相当于一把锁), routingKey是转发消息时,给消息指定的特殊字符串(相当于一把钥匙). 主题交换机的转发规则,是将收到的消息的routingKey与绑定的所有队列中的 bindingKey 进行匹配,当且仅当匹配成功时,才将消息转发给该队列. 匹配规则 routingKey规则 由数字,字母,下划线组成使用 . 将routingKey分成多个部分. bindingKey规则 由数字,字母,下划线组成使用 . 将routingKey分成多个部分.支持两种特殊的符号作为通配符 * 与 # (*和#必须是作为被 . 分割出来的单独部分如 aaa*.bb就是非法的) * 可以匹配任何一个独立的部分 # 可以匹配0个或多个的独立部分 匹配规则 Router类 在core包中创建 Router类,来完成对bindingKey与routingKey的校验与匹配. /*** 这个类用来检查 bindingKey与routingKey 是否合法* 以及 bindingKey与routingKey 的匹配功能,* 以及 根据不同交换机的转发规则判断 消息 Message 是否可以转发给对应的绑定队列*/ public class Router {// bindingKey 的构造规则:// 1. 数字, 字母, 下划线// 2. 使用 . 分割成若干部分// 3. 允许存在 * 和 # 作为通配符. 但是通配符只能作为独立的分段.// 检查 BindingKey 是否合法public boolean checkBindingKey(String bindingKey) {if (bindingKey.length() 0) {// 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.return true;}// 检查字符串中不能存在非法字符for (int i 0; i bindingKey.length(); i) {char ch bindingKey.charAt(i);if (ch A ch Z) {continue;}if (ch a ch z) {continue;}if (ch 0 ch 9) {continue;}if (ch _ || ch . || ch * || ch #) {continue;}return false;}// 检查 * 或者 # 是否是独立的部分.// aaa.*.bbb 合法情况; aaa.a*.bbb 非法情况.String[] words bindingKey.split(\\.);for (String word : words) {// 检查 word 长度 1 并且包含了 * 或者 # , 就是非法的格式了.if (word.length() 1 (word.contains(*) || word.contains(#))) {return false;}}// 约定一下, 通配符之间的相邻关系(人为约定的).// 这样约定是因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性 提升不大~~// 1. aaa.#.#.bbb 非法// 2. aaa.#.*.bbb 非法// 3. aaa.*.#.bbb 非法// 4. aaa.*.*.bbb 合法for (int i 0; i words.length - 1; i) {// 连续两个 ##if (words[i].equals(#) words[i 1].equals(#)) {return false;}// # 连着 *if (words[i].equals(#) words[i 1].equals(*)) {return false;}// * 连着 #if (words[i].equals(*) words[i 1].equals(#)) {return false;}}return true;}// 检查 RoutingKey 是否合法// routingKey 的构造规则:// 1. 数字, 字母, 下划线// 2. 使用 . 分割成若干部分public boolean checkRoutingKey(String routingKey) {if (routingKey.length() 0) {// 空字符串. 合法的情况. 比如在使用 fanout 交换机的时候, routingKey 用不上, 就可以设为 return true;}for (int i 0; i routingKey.length(); i) {char ch routingKey.charAt(i);// 判定该字符是否是大写字母if (ch A ch Z) {continue;}// 判定该字母是否是小写字母if (ch a ch z) {continue;}// 判定该字母是否是阿拉伯数字if (ch 0 ch 9) {continue;}// 判定是否是 _ 或者 .if (ch _ || ch .) {continue;}// 该字符, 不是上述任何一种合法情况, 就直接返回 falsereturn false;}// 把每个字符都检查过, 没有遇到非法情况. 此时直接返回 truereturn true;}// 用来判断该消息是否可以转发给这个绑定对应的队列public boolean route(ExchangeType exchangeType,Binding binding,Message message) throws MqException {// 根据不同的 exchangeType 使用不同的转发规则if (exchangeType ExchangeType.FANOUT) {// 如果是 FANOUT 类型则该交换机上绑定的所有队列都需要转发return true;} else if (exchangeType ExchangeType.TOPIC) {return routeTopic(binding,message);} else {throw new MqException([Router] 交换机类型非法! exchangeTypeexchangeType);}}// 用来匹配 bindingKey与routingKeyprivate boolean routeTopic(Binding binding, Message message) {// 先把这两个 key 进行切分String[] bindingTokens binding.getBindingKey().split(\\.);String[] routingTokens message.getRoutingKey().split(\\.);// 引入两个下标, 指向上述两个数组. 初始情况下都为 0int bindingIndex 0;int routingIndex 0;// 此处使用 while 更合适, 每次循环, 下标不一定就是 1, 不适合使用 forwhile (bindingIndex bindingTokens.length routingIndex routingTokens.length) {if (bindingTokens[bindingIndex].equals(*)) {// [情况二] 如果遇到 * , 直接进入下一轮. * 可以匹配到任意一个部分!!bindingIndex;routingIndex;continue;} else if (bindingTokens[bindingIndex].equals(#)) {// 如果遇到 #, 需要先看看有没有下一个位置.bindingIndex;if (bindingIndex bindingTokens.length) {// [情况三] 该 # 后面没东西了, 说明此时一定能匹配成功了!return true;}// [情况四] # 后面还有东西, 拿着这个内容, 去 routingKey 中往后找, 找到对应的位置.// findNextMatch 这个方法用来查找该部分在 routingKey 的位置. 返回该下标. 没找到, 就返回 -1routingIndex findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);if (routingIndex -1) {// 没找到匹配的结果. 匹配失败return false;}// 找到的匹配的情况, 继续往后匹配.bindingIndex;routingIndex;} else {// [情况一] 如果遇到普通字符串, 要求两边的内容是一样的.if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])) {return false;}bindingIndex;routingIndex;}}// [情况五] 判定是否是双方同时到达末尾// 比如 aaa.bbb.ccc 和 aaa.bbb 是要匹配失败的.if (bindingIndex bindingTokens.length routingIndex routingTokens.length) {return true;}return false;}private int findNextMatch(String[] routingTokens, int routingIndex, String bindingToken) {for (int i routingIndex; i routingTokens.length; i) {if (routingTokens[i].equals(bindingToken)) {return i;}}return -1;} }订阅消息 消费者 咱们要创建一个消费者类,其中有消费者的信息,以及该消费者订阅的队列的名字, 以及消息应答模式,以及回调函数. 回调函数是什么? 是让消费者自己设定一个函数,当有新的消息转发给该消费者后,就执行这个回调函数 /*** 消费者的回调函数*/ FunctionalInterface public interface Consumer {// Delivery 是 “投递” 的意思这个方法预期是在每次服务器收到消息之后来调用// 通过这个方法把消息推送给对应的消费者// 这里的方法名与参数列表都是参考 RabbitMQ 的void handleDelivery(String consumerTag, BasicProperties basicProperties,byte[] body) throws MqException, IOException; }/*** 表示一个消费者(完整的执行环境)*/ Data public class ConsumerEnv {// 消费者信息private String consumerTag;// 订阅队列的名字private String queueName;// true - 自动应答, false - 手动应答private boolean autoAck;// 回调函数private Consumer consumer;public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {this.consumerTag consumerTag;this.queueName queueName;this.autoAck autoAck;this.consumer consumer;} }队列如何给订阅的消费者发送消息 这里咱们要想清楚的是,一个队列可以有多个消费者, 新来的消息要转发给哪个消费者呢? 咱们在这里采取轮询策略,即让消费者排队,依次将消息发送给消费者,当消费者收到消息后,则移动到队伍的最后等待下个消息. 因此咱们要给核心类 Message类再增加几个属性和方法,来管理消费者, /*** 表示一个存储消息的队列* MSG 》Message* 消息队列的使用者是消费者*/ Data public class MSGQueue {// 表示队列的身份标识private String name;// 表示队列是否持久化private boolean durable false;// true - 这个队列只能被一个消费者使用false - 大家都能使用这个队列// 后续代码不实现相关功能private boolean exclusive false;// true - 没人使用后自动删除false - 没人使用不自动删除private boolean autoDelete false;// 表示扩展参数后续代码没有实现private MapString,Object arguments new HashMap();// 当前队列有哪些消费者订阅了private ListConsumerEnv consumerEnvList new ArrayList();// 记录当前取到了第几个消费者方便实现轮询策略private AtomicInteger consumerSeq new AtomicInteger(0);// 添加一个新的订阅者public void addConsumerEnv(ConsumerEnv consumerEnv) {consumerEnvList.add(consumerEnv);}// 删除订阅者暂不考虑// 挑选一个订阅者用来处理当前的消息(按照轮询的方式)public ConsumerEnv chooseConsumer() {// 无人订阅if (consumerEnvList.size() 0) {return null;}int index consumerSeq.get() % consumerEnvList.size();consumerSeq.getAndIncrement();return consumerEnvList.get(index);}public String getArguments() {ObjectMapper objectMapper new ObjectMapper();try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}return null;}public void setArguments(String arguments) {ObjectMapper objectMapper new ObjectMapper();try {this.arguments objectMapper.readValue(arguments, new TypeReferenceHashMapString,Object() {});} catch (JsonProcessingException e) {e.printStackTrace();}}public void setArguments(MapString,Object arguments) {this.arguments arguments;}public Object getArguments(String key) {return arguments.get(key);}public void setArguments(String key,Object value) {arguments.put(key, value);} }自动发送消息至订阅者 那么消费者要如何拿到消息呢?即如何将消息发送给消费者,咱们这里采取的是自动发送,即队列中来了新消息,就自动将新消息发送给订阅了这个队列的消费者. 咱们实现的方法是,使用一个阻塞队列,当生产者发布消息到交换机时,交换机转发消息到对应的队列后,就把队列名当作令牌添加到这个阻塞队列中,再配置一个扫描线程,去时刻扫描这个阻塞队列中是否有新的令牌了,有了新令牌,则根据令牌去对应的队列中,去把新消息安装轮询策略转发给消费者. 应答消息 应答消息共有两种模式. 自动应答:将消息发送给消费者就算应答了(不关心消费者收没收到,相当于没应答)手动应答:需要消费者手动调用应答方法(确保消费者收到消息了) 三、代码编写 /*** 通过这个类, 来表示 虚拟主机.* 每个虚拟主机下面都管理着自己的 交换机, 队列, 绑定, 消息 数据.* 同时提供 api 供上层调用.* 针对 VirtualHost 这个类, 作为业务逻辑的整合者, 就需要对于代码中抛出的异常进行处理了.*/Data public class VirtualHost {private String virtualHostName;private Router router new Router();private DiskDataCenter diskDataCenter new DiskDataCenter();private MemoryDataCenter memoryDataCenter new MemoryDataCenter();// 操作交换机的锁对象,防止多线程操作交换机时,出现线程安全问题,如 创建了两个拥有相同身份标识的交换机private final Object exchangeLocker new Object();// 操作队列的锁对象,防止多线程,操作队列时,出现线程安全问题,如 创建了两个拥有相同身份标识的队列private final Object queueLocker new Object();// 消费者管理中心ConsumerManager consumerManager new ConsumerManager(this);public VirtualHost(String virtualHostName) {this.virtualHostName virtualHostName;// MemoryDataCenter 并不需要初始化当 new MemoryDataCenter();时里面所需的数据结构也都已经创建好了// DiskDataCenter 就需要初始化操作去建库建表建文件和初始化数据的设定diskDataCenter.init();try {// 将硬盘中已有的数据恢复到内存中memoryDataCenter.recovery(diskDataCenter);} catch (IOException | MqException | ClassNotFoundException e) {e.printStackTrace();}}// 创建交换机// 如果交换机不存在, 就创建. 如果存在, 直接返回.// 返回值是 boolean. 创建成功, 返回 true. 失败返回 falsepublic boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,MapString,Object arguments) {exchangeName virtualHostName exchangeName;try {synchronized (exchangeLocker) {// 把交换机的名字, 加上虚拟主机作为前缀.// 1.判断交换机是否存在直接从内存查询if (memoryDataCenter.getExchange(exchangeName) ! null) {// 该交换机已经存在!System.out.println([VirtualHost] 交换机已经存在! exchangeName exchangeName);return true;}// 2.创建交换机Exchange exchange new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);// 3.把交换机对象写入硬盘if (durable) {diskDataCenter.insertExchange(exchange);}// 4.把交换机对象写入内存memoryDataCenter.insertExchange(exchange);System.out.println([VirtualHost] 交换机创建成功! exchangeName exchangeName);// 上述逻辑, 先写硬盘, 后写内存. 目的就是因为硬盘更容易写失败. 如果硬盘写失败了, 内存就不写了.// 要是先写内存, 内存写成功了, 硬盘写失败了, 还需要把内存的数据给再删掉. 就比较麻烦了.}return true;} catch (Exception e) {System.out.println([VirtualHost] 交换机创建失败! exchangeName exchangeName);e.printStackTrace();return false;}}// 删除交换机public boolean exchangeDelete(String exchangeName) {exchangeName virtualHostName exchangeName;try {synchronized (exchangeLocker) {// 1.判断交换机是否存在Exchange existsExchange memoryDataCenter.getExchange(exchangeName);if (existsExchange null) {throw new MqException([virtualHostName] 交换机不存在无法删除!);}if (existsExchange.isDurable()) {// 2.删除硬盘上的数据diskDataCenter.deleteExchange(exchangeName);}// 3.删除内存上的数据memoryDataCenter.deleteExchange(exchangeName);System.out.println([VirtualHost] 交换机删除成功! exchangeName exchangeName);}return true;}catch (Exception e) {System.out.println([VirtualHost] 交换机删除失败! exchangeName exchangeName);e.printStackTrace();return false;}}// 创建队列public boolean queueuDeclare(String queueName,boolean durable,boolean exclusive,boolean autoDelete,MapString,Object arguments) {// 给队列名字加前缀queueName virtualHostName queueName;try {synchronized (queueLocker) {// 1.判断队列是否存在if (memoryDataCenter.getQueue(queueName) ! null) {System.out.println([VirtualHost] 队列已经存在! queueName queueName);return true;}// 2.创建队列MSGQueue queue new MSGQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);// 3.向硬盘中写入数据if (durable) {diskDataCenter.insertQueue(queue);}// 4.向内存中写入数据memoryDataCenter.insertQueue(queue);System.out.println([VirtualHost] 队列创建成功! queueName queueName);}return true;} catch (Exception e) {System.out.println([VirtualHost] 队列创建失败! queueName queueName);e.printStackTrace();return false;}}// 删除队列public boolean queueDelete(String queueeName) {queueeName virtualHostName queueeName;try {synchronized (queueLocker) {MSGQueue existsQueue memoryDataCenter.getQueue(queueeName);// 1.判断交换机是否存在if (existsQueue null) {throw new MqException([virtualHostName] 队列不存在无法删除!);}// 2.删除硬盘上的数据if (existsQueue.isDurable()) {diskDataCenter.deleteQueue(queueeName);}// 3.删除内存上的数据memoryDataCenter.deleteQueue(queueeName);System.out.println([VirtualHost] 队列删除成功! queueeName queueeName);}return true;}catch (Exception e) {System.out.println([VirtualHost] 队列删除失败! queueeName queueeName);e.printStackTrace();return false;}}// 创建绑定public boolean bindingDeclare(String exchangeName,String queueName,String bindingKey) {exchangeName virtualHostName exchangeName;queueName virtualHostName queueName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {// 1.判断绑定是否已存在对应的交换机 队列是否存在,bindingKey是否合法if (memoryDataCenter.getBinding(exchangeName, queueName) ! null) {return true;}Exchange exchange memoryDataCenter.getExchange(exchangeName);if (exchange null) {throw new MqException([VirtualHost] 交换机不存在! exchangeName exchangeName);}MSGQueue queue memoryDataCenter.getQueue(queueName);if (queue null) {throw new MqException([VirtualHost] 队列不存在! queueName queueName);}if (!router.checkBindingKey(bindingKey)) {throw new MqException([VirtualHost] bindingKey 非法! bindingKey bindingKey);}// 2.创建绑定Binding binding new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);// 3.写入硬盘if (exchange.isDurable() queue.isDurable()) {diskDataCenter.insertBinding(binding);}// 4.写入内存memoryDataCenter.insertBinding(binding);System.out.println([VirtualHost] 绑定创建成功! exchangeName exchangeName , queueName queueName);}}return true;} catch (Exception e) {System.out.println([VirtualHost] 绑定创建失败! exchangeName exchangeName , queueName queueName);e.printStackTrace();return false;}}// 删除绑定public boolean bindingDelete(String exchangeName,String queueName) {exchangeName virtualHostName exchangeName;queueName virtualHostName queueName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {// 1.判断绑定是否已存在对应的交换机 队列是否存在,bindingKey是否合法Binding binding memoryDataCenter.getBinding(exchangeName, queueName);if (binding null) {throw new MqException([VirtualHost] 删除绑定失败! 绑定不存在! exchangeName exchangeName , queueName queueName);}// 2. 无论绑定是否持久化了, 都尝试从硬盘删一下. 就算不存在, 这个删除也无副作用.diskDataCenter.deleteBinding(binding);// 3. 删除内存的数据memoryDataCenter.deleteBinding(binding);System.out.println([VirtualHost] 删除绑定成功!);}}return true;} catch (Exception e) {System.out.println([VirtualHost] 删除绑定失败!);e.printStackTrace();return false;}}// 发送消息到指定的交换机/队列中public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {try {exchangeName virtualHostName exchangeName;// 1.检查 routingKey是否合法if (!router.checkRoutingKey(routingKey)) {throw new MqException([VirtualHost] routingKey 非法! routingKey routingKey);}// 2.查找交换机对象Exchange exchange memoryDataCenter.getExchange(exchangeName);if (exchange null) {throw new MqException([VirtualHost] 交换机不存在! exchangeName exchangeName);}// 3.判断交换机绑定类型if (exchange.getType() ExchangeType.DIRECT) {// 按照直接交换机的方式来转发消息// 以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.// 此时, 可以无视绑定关系.// 4.查找对应的队列String queueName virtualHostName routingKey;MSGQueue queue memoryDataCenter.getQueue(queueName);if (queue null) {throw new MqException([VirtualHost] 队列不存在! queueName queueName);}// 5.构造 messageMessage message Message.createMessageWithId(routingKey, basicProperties, body);// 6.发送消息sendMessage(queue,message);return true;} else {// 按照 fanout 和 topic 的方式来转发.// 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象ConcurrentHashMapString, Binding bindingsMap memoryDataCenter.getBinding(exchangeName);for (Map.EntryString,Binding entry : bindingsMap.entrySet()) {// 6.获取绑定对象判断对应的队列是否存在Binding binding entry.getValue();MSGQueue queue memoryDataCenter.getQueue(binding.getQueueName());if (queue null){// 此处咱们就不抛出异常了. 可能此处有多个这样的队列.// 希望不要因为一个队列的失败, 影响到其他队列的消息的传输.System.out.println([VirtualHost] basicPublish 发送消息时, 发现队列不存在! queueName binding.getQueueName());continue;}// 7.构造消息Message message Message.createMessageWithId(routingKey, basicProperties, body);// 8.判断该消息是否可以发送给该队列if (router.route(exchange.getType(),binding,message)) {sendMessage(queue,message);}}}return true;} catch (Exception e) {System.out.println([VirtualHost] 消息发送失败!);e.printStackTrace();return false;}}// 发送消息到硬盘与内存private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {// 此处发送消息即 把消息写入到 硬盘 和 内存 中int deliverMode message.getDeliverMode();// deliverMode 为 1 不持久化为 2 持久化if (deliverMode 2) {diskDataCenter.sendMessage(queue,message);}// 写入内存memoryDataCenter.sendMessage(queue,message);// 通知消费者可以消费消息了consumerManager.notifyConsume(queue.getName());}// 订阅消息// 添加一个队列的订阅者当队列收到消息之后就要把消息推送给对应的订阅者// consumerTag: 消费者的身份标识// autoAck: 消息被消费完成后应答的方式 为 true 自动应答为 false 手动应答// consumer: 是一个回调函数,此处类型设定成函数式接口这样后续调用 basicConsume 并且传实参的时候就可以写作 lambda样子public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {queueName virtualHostName queueName;try {consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);System.out.println([VirtualHost] basicConsume成功 queueName queueName);return true;} catch (Exception e) {System.out.println([VirtualHost] basicConsume失败 queueName queueName);e.printStackTrace();return false;}}// 消息的手动应答public boolean basicAck(String queueName,String messageId) {queueName virtualHostName queueName;try {// 1.获取消息和队列Message message memoryDataCenter.getMessage(messageId);if (message null) {throw new MqException([VirtualHost] 要确认的消息不存在! messageId messageId);}MSGQueue queue memoryDataCenter.getQueue(queueName);if (queue null) {throw new MqException([VirtualHost] 要确认的队列不存在! queueName queueName);}// 2.删除硬盘上的消息if (message.getDeliverMode() 2) {diskDataCenter.deleteMessage(queue, message);}// 3.删除消息中心的消息memoryDataCenter.removeMessage(messageId);// 4.删除待确认消息中的消息memoryDataCenter.removeMessageWaitAck(queueName, messageId);System.out.println([VirtualHost] basicAck 成功! 消息被成功确认! queueName queueName , messageId messageId);return true;} catch (Exception e) {System.out.println([VirtualHost] basicAck 失败! 消息被成功确认! queueName queueName , messageId messageId);e.printStackTrace();return false;}}}
http://www.yingshimen.cn/news/31568/

相关文章:

  • 商城英文googleseo排名
  • 金华做网站最专业的公司互联网产品品牌推广
  • 个人网站命名的要求企业网站底部
  • 网站建设维护费合同网站建设如果没有源代码
  • 建设银行对账单查询网站求大神帮忙做网站
  • 网站备案接入商上海ktv目前营业情况
  • 广州市白云区建设局 网站常州建设银行新北分行网站
  • 保险网站西安手机网站建设
  • 信宜市建设局网站济南软件外包
  • 怎么做垂直门户网站怎样免费申请永久域名
  • 贵阳网站建设公司网站建设及服务合同书
  • 网站定制开发需要什么资质学编程需要具备什么条件
  • 网站定位广告响应式网站是指自适应吗
  • 网站建设后台管理流程网站服务器哪家好些
  • 哪些网站做免费送东西的广告6wordpress 发布 工具
  • 仿网站建设教程视频哪些网站可以做网站
  • wap网站模板下载龙岗网站建设哪家技术好
  • 云南旅行社网站建设一手房发帖网站怎样做
  • 家具网站开发设计论文网站建设耂首先金手指
  • 网站建设说明书怎么写手机自助建站永久免费
  • 做网站推广需要做什么石家庄城乡建设厅网站
  • h5网站用什么软件做东营市建设监理协会网站
  • 青浦徐泾网站建设抖音小程序推广怎么挂才有收益
  • 河北seo网站设计做直播网站视频教程
  • phpcms网站后台cms建站详细教程
  • 高米店网站建设公司站群源码
  • 网站建设 开票企业电话
  • 可以做视频推广的网站包装设计模板网站
  • 河南住房和城乡建设局网站php做商品网站
  • 电脑网站 源码临漳网站建站