您好,欢迎来到爱玩科技网。
搜索
您的当前位置:首页RocketMQ—Producer(五)路由队列选择

RocketMQ—Producer(五)路由队列选择

来源:爱玩科技网

前言

路由队列选择的作用在于发送消息时可以指定发送到某个broker队列,或均衡发送到broker队列,其作用就是在于选择合适的队列进行消息发送。

目前客户端队列选择分为三种方式:

  • 第一种:可根据MessageQueueSelector的实现或自扩展实现选择队列;
  • 第二种:未开启Broker故障延迟机制(sendLatencyFaultEnable:false),会采用默认轮训机制(默认是此种实现方式);
  • 第三种:开启Broker故障延迟机制(sendLatencyFaultEnable:true),会根据brokerName的可用性选择队列发送。

接下来我们就以这三种方式展开讨论。

一、队列选择

MessageQueueSelector方式队列选择在了解MessageQueueSelector的方式进行队列选择时,我们先回顾下MQProducer接口:里面有多个方法签名带参数MessageQueueSelector,其实就是表明使用此种方式选择消息队列需要显示穿参数才能使用;用下面这个接口方法进行举例分析:

SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

接下来我们直接看内部实现源码如何实现的:

DefaultMQProducerImpl#sendSelectImpl

private SendResult sendSelectImpl(
  Message msg, MessageQueueSelector selector,
  Object arg, final CommunicationMode communicationMode,
  final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, 
  MQBrokerException, InterruptedException {
  long beginStartTime = System.currentTimeMillis();
  this.makeSureStateOK();     // 状态检测
  Validators.checkMessage(msg, this.defaultMQProducer); // 消息验证

  TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
  if (topicPublishInfo != null && topicPublishInfo.ok()) {
      MessageQueue mq = null;
      try {
          mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg); // 1-队列选择
      } catch (Throwable e) {
          throw new MQClientException("select message queue throwed exception.", e);
      }

      long costTime = System.currentTimeMillis() - beginStartTime;
      if (timeout < costTime) {
          throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
      }
      if (mq != null) {
          return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime); // 熟悉的配方,前面已经分析
      } else {
          throw new MQClientException("select message queue return null.", null); // 异常抛出
      }
  }
  throw new MQClientException("No route info for this topic, "

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- aiwanbo.com 版权所有 赣ICP备2024042808号-3

违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务