【88bifa必发唯一官网】罗克etMQ运行进度之Consumer,源代码解读

内部直接调用DefaultMQPullConsumerImpl的start()方法,Consumer消费拉取的消息的方式有两种,消费消息可以分成pull和push方式

88bifa必发唯一官网 13

Pull消费者客户端(主动拉取音讯的消费者)即组织了DefaultMQPullConsumer对象,DefaultMQPullConsumer承袭了ClientConfig类。大家先看其构造方法

consumer 1.启动

1. 简介

消费音信能够分成pull和push格局,push新闻使用相比轻松,因为罗克etMQ已经援助大家封装了绝大大多流水生产线,大家若是重写回调函数就能够。

下边大家就以push消费方式为例,分析下那1部分源代码流程。

[java] view plain copy
public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) {  
    this.consumerGroup = consumerGroup;  
    defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);  
}  

分别其余新闻中间件由broker做负载均衡并主动向consumer投递音信,罗克etMq是遵照拉情势拉取音讯,consumer做负载均衡并经过长轮询向broker拉音讯。

贰. 消费者运转流程图

88bifa必发唯一官网 1

 

Consumer消费拉取的新闻的方法有三种

三.顾客类图

88bifa必发唯一官网 2

此间只是简短设置了consumerGroup消费者组名,表示顾客属于哪个组。构造了DefaultMQPullConsumerImpl的实例,DefaultMQPullConsumerImpl的构造方法异常的粗略,只是绑定了DefaultMQPullConsumer、配置了流传的rpcHook。

一.     
Push方式:rocketmq已经提供了很完美的贯彻,consumer通过长轮询拉取音讯后回调MessageListener接口完毕形成消费,应用体系壹旦MessageListener完结工作逻辑就能够

四. 买主源代码流程

DefaultMQPullConsumer内部卷入了DefaultMQPullConsumerImpl,个中还维护这有的安顿消息。这里维护着顾客订阅的topic集结。

二.     
Pull方式:完全由业务种类去调整,定期拉取新闻,钦点队列消费等等,当然这里需求专门的学问连串去依据本身的作业供给去贯彻

四.一 消费客户端运维

依附官方(https://github.com/apache/rocketmq)提供的例证,Consumer.java里面使用DefaultMQPushConsumer运维音讯消费者,如下%E6%8F%90%E4%BE%9B%E7%9A%84%E4%BE%8B%E5%AD%90%EF%BC%8CConsumer.java%E9%87%8C%E9%9D%A2%E4%BD%BF%E7%94%A8DefaultMQPushConsumer%E5%90%AF%E5%8A%A8%E6%B6%88%E6%81%AF%E6%B6%88%E8%B4%B9%E8%80%85%EF%BC%8C%E5%A6%82%E4%B8%8B):

//初始化DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
//设置命名服务,参考namesrv的启动
consumer.setNamesrvAddr("localhost:9876");
//设置消费起始位置
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅消费的主题和过滤符
consumer.subscribe("TopicTest", "*");
//设置消息回调函数
consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
      ConsumeConcurrentlyContext context) {
      System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
});
//启动消费者
consumer.start();
[java] view plain copy
private Set<String> registerTopics = new HashSet<String>();  

上面介绍暗中认可以push方式为主,因为诸多是由push消费形式来利用rocketmq的。 

四.2 新闻者运维

大家随后看consumer.start()方法

@Override
public void start() throws MQClientException {
     this.defaultMQPushConsumerImpl.start();
}

DefaultMQPushConsumerImpl.java

public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                ...

                this.checkConfig();//检查参数

                ...

                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

                ...

                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

                ...
                this.offsetStore.load();

                ...

                this.consumeMessageService.start();

                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                ...

                mQClientFactory.start();

                this.serviceState = ServiceState.RUNNING;
            ...
        }

        ...
    }

在开端化一群参数之后,然后调用mQClientFactory.start();

private MQClientInstance mQClientFactory;

实在那么些命名有一点点意想不到啊(Ali程序员手抖了?),为何MQClientInstance类型的变量名称为mQClientFactory

这继续看MQClientInstance的start

一体消费者客户端的起步,调用了DefaultMQPullConsumer的start()方法,内部直接调用DefaultMQPullConsumerImpl的start()方法,这一个start方法加了synchronized修饰。

consumer运行流程

4.3 MQClientInstance

public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    ...
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
               ...
            }
        }
    }

各行代码的效率就如源代码里面包车型地铁注释一样,器重看下pullMessageService.start和rebalance瑟维斯.start
pullMessageService.start成效是绵绵从1个围堵队列之中获得pullRequest请求,然后去罗克etMQ
broker里面获取消息。
设若未有pullRequest的话,那么它将卡住。
那么,pullRequest请求是怎么放进去的啊?这些将在看rebalanceService了。

[java] view plain copy
    public synchronized void start() throws MQClientException {  
        switch (this.serviceState) {  
            case CREATE_JUST:  
                this.serviceState = ServiceState.START_FAILED;  

                this.checkConfig();  

                this.copySubscription();  

                if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {  
                    this.defaultMQPullConsumer.changeInstanceNameToPID();  
                }  

                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer  
                                                                                                                , this.rpcHook);  

                this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());  
                this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());  
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());  
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);  

                this.pullAPIWrapper = new PullAPIWrapper(  
                    mQClientFactory,  
                    this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());  
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);  

                if (this.defaultMQPullConsumer.getOffsetStore() != null) {  
                    this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();  
                } else {  
                    switch (this.defaultMQPullConsumer.getMessageModel()) {  
                        case BROADCASTING:  
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer  
                                                                                                        .getConsumerGroup());  
                            break;  
                        case CLUSTERING:  
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer  
                                                                                                        .getConsumerGroup());  
                            break;  
                        default:  
                            break;  
                    }  
                    this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);  
                }  

                this.offsetStore.load();  

                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);  
                if (!registerOK) {  
                    this.serviceState = ServiceState.CREATE_JUST;  

                    throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()  
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl  
                                                                                            .GROUP_NAME_DUPLICATE_URL), null);  
                }  

                mQClientFactory.start();  
                log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());  
                this.serviceState = ServiceState.RUNNING;  
                break;  
            case RUNNING:  
            case START_FAILED:  
            case SHUTDOWN_ALREADY:  
                throw new MQClientException("The PullConsumer service state not OK, maybe started once, "  
                    + this.serviceState  
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),  
                    null);  
            default:  
                break;  
        }  

    }  

指定group

4.4 pullMessageService.start

private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();

@Override
public void run() {
        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                if (pullRequest != null) {
                    this.pullMessage(pullRequest);
                }
            } catch (InterruptedException e) {
            } catch (Exception e) {
                ..
            }
     }
}

顺手说一句,pullMessage瑟维斯和rebalanceService都以接二连三自ServiceThread

public class PullMessageService extends ServiceThread {}

ServiceThread轻巧封装了线程的起步,调用start方法,就能够调用它的run方法。

public ServiceThread() {
        this.thread = new Thread(this, this.getServiceName()); //把当前对象作为runnable传入线程构造函数
    }

    public void start() {
        this.thread.start();
    }

如此起步线程将在方便一点,看起来舒服一点。

哦,继续剖析此前的深入分析。

从pullMessage瑟维斯的run方法能够观望它是从阻塞队列pullRequestQueue里面获取pullRequest,假诺未有那么将封堵。(如若不清楚java阻塞的运用,清百度)

实行完一遍pullReqeust之后,再持续下贰回获得阻塞队列,因为它是个while循环。

就此,我们须求剖析下pullRequest放进队列的流程,也正是rebalanceService.

一早先的serverState的状态自然为CREAT_JUST,调用checkConfig(),个中第1对ConsumerGroup进行表达,非空,合法(符合正则规则,且长度不抢先配置最大值),且不为私下认可值(幸免消费者集群名抵触),然后对顾客新闻格局、音讯队列分配算法举办非空、合法校验。

订阅topic

4.5 rebalanceService

public class RebalanceService extends ServiceThread {
    @Override
    public void run() {
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }
    }
}       

MQClientInstance.java

public void doRebalance() {
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                try {
                    impl.doRebalance();
                } catch (Throwable e) {
                    log.error("doRebalance exception", e);
                }
            }
        }
    }

DefaultMQPushConsumerImpl.java

@Override
    public void doRebalance() {
        if (!this.pause) {
            this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
        }
    }

RebalanceImpl.java

public void doRebalance(final boolean isOrder) {
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        this.truncateMessageQueueNotMyTopic();
    }

private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                ....
            case CLUSTERING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }
                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {

                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

同步跟下来,来到了RebalanceImpl.java的rebalanceByTopic方法,那一个艺术里面有四个case(Broadcasting和Clustering)也正是消息消费的八个方式,广播和集群音讯。
播音的话,全体的监听者都会摄取音信,集群的话,唯有2个顾客能够接到,我们以集群信息为例。
先大概解释下在rebalanceByTopic里面要做如何。

  1. 从namesrv获取broker里面这几个topic的买主多少
  2. 从namesrv获取broker那些topic的新闻队列数量
  3. 据说前两部获得的数额进行负荷均衡计算,总括出当下消费者客户端分配到的音信队列。
  4. 遵照分配到的信息队列,去broker请求这些音信队列之中的音信。

上边代码毫米mqset就是这一个topic的消费队列,一般是四个,然而那一个值是能够修改的,存储的岗位在~/store/config/topics.json里面,比如:

"TopicTest":{
        "order":false,
        "perm":6,
        "readQueueNums":4,
        "topicFilterType":"SINGLE_TAG",
        "topicName":"TopicTest",
        "topicSysFlag":0,
        "writeQueueNums":4
}    

能够修改readQueueNums和writeQueueNums为别的值

try {
     allocateResult = strategy.allocate(
                 this.consumerGroup,
         this.mQClientFactory.getClientId(),
         mqAll,
         cidAll);
  } catch (Throwable e) {
         return;
  }

这段代码便是客户端依照取获得的那几个topic消费者多少和音信队列数量,使用负载均衡计谋总结出方今客户端能够选取的音信队列。
负载均衡计策代码在那一个岗位。

88bifa必发唯一官网 3

那大家延续4.4pullMessageService.start深入分析,因为rebalanceService已经把pullRequest放到了绿灯队列。

关于消费者新闻格局有布罗兹Casting(广播)跟Clustering(集群)两种、暗中同意是Clustering(集群)配置在DefaultMQPullConsumer中。关于消费者的新闻分配算法,在DefaultMQPullConsumer中实现存暗许的新闻分配算法,allocateMessageQueueStrategy
= new
AllocateMessageQueueAveragely();(平均分配算法)。其促成了AllocateMessageQueueStrategy接口,注重看其促成的allocate()方法。

登记消息监听管理器,当消息赶到时费用音讯

4.6 PullMessageService.run

@Override
    public void run() {
        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                if (pullRequest != null) {
                    this.pullMessage(pullRequest);
                }
            } catch (InterruptedException e) {
            } catch (Exception e) {

            }
        }
    }

private void pullMessage(final PullRequest pullRequest) {
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {

        }
    }

调用到DefaultMQPushConsumerImpl.pullMessage(pullRequest)那一个点子里面。

[java] view plain copy
@Override  
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,  
    List<String> cidAll) {  
    if (currentCID == null || currentCID.length() < 1) {  
        throw new IllegalArgumentException("currentCID is empty");  
    }  
    if (mqAll == null || mqAll.isEmpty()) {  
        throw new IllegalArgumentException("mqAll is null or mqAll empty");  
    }  
    if (cidAll == null || cidAll.isEmpty()) {  
        throw new IllegalArgumentException("cidAll is null or cidAll empty");  
    }  

    List<MessageQueue> result = new ArrayList<MessageQueue>();  
    if (!cidAll.contains(currentCID)) {  
        log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",  
            consumerGroup,  
            currentCID,  
            cidAll);  
        return result;  
    }  

    int index = cidAll.indexOf(currentCID);  
    int mod = mqAll.size() % cidAll.size();  
    int averageSize =  
        mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()  
            + 1 : mqAll.size() / cidAll.size());  
    int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;  
    int range = Math.min(averageSize, mqAll.size() - startIndex);  
    for (int i = 0; i < range; i++) {  
        result.add(mqAll.get((startIndex + i) % mqAll.size()));  
    }  
    return result;  
}  

消费端Start

4.6.1

public void pullMessage(final PullRequest pullRequest) {
        ...

        final long beginTimestamp = System.currentTimeMillis();

        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                System.out.printf("pullcallback onsuccess: " + pullResult + " %n");
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);

                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {

                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispathToConsume);
                            }
                            break;
                    }
                }
            }

            @Override
            public void onException(Throwable e) {
                                        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            }
        };

        try {
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                CommunicationMode.ASYNC,
                pullCallback
            );
        } catch (Exception e) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        }
    }

地方这段代码主要正是安装音讯获得后的回调函数PullCallback
pullCallback,然后调用pullAPIWrapper.pullKernelImpl去Broker里面获取音信。

获得成功后,就能回调pullCallback的onSuccess方法的FOUND case分支。

在pullCallback的onSucess方法的FOUND
case分支,会依据回调是联合依旧异步,分为二种处境,如下:

88bifa必发唯一官网 4

联手新闻和异步音信差距的源代码达成现在再讲。

 

        复制订阅关系

流传的参数有日前消费者id,全部信息队列数组,以及当前具备顾客数组。先轻易表达非空,再经过消费者数组大小跟新闻队列大小依据平均算法算出脚下消费者该分红哪些消息队列会集。逻辑简单。罗克etMQ还提供了循环平均、一致性哈希、配置分配等算法,这里私下认可使用平均分配。

        初始化rebalance变量

大家再重临DefaultMQPullConsumerImpl的start()方法,checkConfig后,调用copySubscription()方法,将配备在DefaultMQPullConsumer中的topic音讯构产生并构形成subscriptionData数据结构,以topic为key以subscriptionData为value以键值对情势存到rebalanceImpl的subscriptionInner中。

        营造offsetStore消费进程存款和储蓄对象

[java] view plain copy
private void copySubscription() throws MQClientException {  
    try {  
        Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics();  
        if (registerTopics != null) {  
            for (final String topic : registerTopics) {  
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),  
                    topic, SubscriptionData.SUB_ALL);  
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);  
            }  
        }  
    } catch (Exception e) {  
        throw new MQClientException("subscription exception", e);  
    }  
}  

        运行消费音信服务

  

        向mqClientFactory注册本消费者

接下去从MQCLientManager中得到MQClient的实例,这些手续跟生产者客户端一样。

        运维client端远程通讯

再将来是对rebalanceImpl的计划,我们根本看下rebalanceImpl,它是在DefaultMQPullConsumerImpl成员中平昔组织private
RebalanceImpl rebalanceImpl = new
RebalancePullImpl(this);即在DefaultMQPullConsumerImpl开端化的时候协会。接下来对其消费者组名、音讯情势(私下认可集群)、队列分配算法(暗中同意平均分配)、消费者客户端实例进行安插,配置新闻都是从DefaultMQPullConsumer中获得。

        运维按期职责

[java] view plain copy
public abstract class RebalanceImpl {  
    protected static final Logger log = ClientLogger.getLog();  
    protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);  
    protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =  
        new ConcurrentHashMap<String, Set<MessageQueue>>();  
    protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =  
        new ConcurrentHashMap<String, SubscriptionData>();  
    protected String consumerGroup;  
    protected MessageModel messageModel;  
    protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;  
    protected MQClientInstance mQClientFactory;  

                  按时获得nameserver地址

接下去构造了PullAPIWrapper,仅仅调用其构造方法,轻易的配备下

                  定期从nameserver获取topic路由新闻

[java] view plain copy
public PullAPIWrapper(MQClientInstance mQClientFactory, String consumerGroup, boolean unitMode) {  
    this.mQClientFactory = mQClientFactory;  
    this.consumerGroup = consumerGroup;  
    this.unitMode = unitMode;  
}  

                  定期清理下线的borker

 

                  按时向具备broker发送心跳消息,(包蕴订阅关系)

接下来起始化消费者的offsetStore,offset即偏移量,能够理解为消费进程,这里依据分歧的新闻情势来选用分歧的国策。假使是广播情势,那么富有消费者都应有吸收接纳订阅的新闻,那么每一种顾客只应该团结开销的开支队列的快慢,那么必要把消费进程即offsetStore存于地方使用LocalFileOffsetStroe,相反的例如是集群情势,那么集群中的消费者来平均开支新闻队列,那么应该把消费进程存于远程应用RemoteBrokerOffsetStore。然后调用相应的load方法加载。

                 
定期长久化Consumer消费进程(广播存款和储蓄到本地,集群存款和储蓄到Broker)

随后将如今消费者登记在MQ客户端实例上未来,调用MQClientInstance的start()方法,运行消费者客户端。

                  总结音讯关照

[java] view plain copy
    public void start() throws MQClientException {  

        synchronized (this) {  
            switch (this.serviceState) {  
                case CREATE_JUST:  
                    this.serviceState = ServiceState.START_FAILED;  
                    // If not specified,looking address from name server  
                    if (null == this.clientConfig.getNamesrvAddr()) {  
                        this.mQClientAPIImpl.fetchNameServerAddr();  
                    }  
                    // Start request-response channel  
                    this.mQClientAPIImpl.start();  
                    // Start various schedule tasks  
                    this.startScheduledTask();  
                    // Start pull service  
                    this.pullMessageService.start();  
                    // Start rebalance service  
                    this.rebalanceService.start();  
                    // Start push service  
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);  
                    log.info("the client factory [{}] start OK", this.clientId);  
                    this.serviceState = ServiceState.RUNNING;  
                    break;  
                case RUNNING:  
                    break;  
                case SHUTDOWN_ALREADY:  
                    break;  
                case START_FAILED:  
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed."  
                                                                                                                                , null);  
                default:  
                    break;  
            }  
        }  
    }  

                  动态调解消费线程池

 

        运维拉音讯服务PullMessageService

看样子此间应该很熟悉,跟生产者客户端这里是同等段代码,无非深入分析路由音信并做到路由新闻的布局,运维netty客户端,运维定时义务(定期更新从名称服务器获取路由音讯更新本地路由消息,心跳,调解线程数量),前边运行pull
server、rebalance service、push
service末了把serviceState状态设为Running表示客户端运营。

        运维消费端负载均衡服务Rebalance瑟维斯

我们在这里关键看下RebalanceService的运转。上边贴出的是RebalanceService的run()方法。

        从namesrv更新topic路由新闻

[java] view plain copy
@Override  
public void run() {  
    log.info(this.getServiceName() + " service started");  

    while (!this.isStopped()) {  
        this.waitForRunning(waitInterval);  
        this.mqClientFactory.doRebalance();  
    }  

    log.info(this.getServiceName() + " service end");  
}  

        向全体broker发送心跳音讯,(包蕴订阅关系)

能够看来,只要那个线程未有被截至(客户端没休息),会一贯循环调用客户端的doRebalance()方法。

        唤醒Rebalance服务线程

[java] view plain copy
public void doRebalance() {  
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {  
        MQConsumerInner impl = entry.getValue();  
        if (impl != null) {  
            try {  
                impl.doRebalance();  
            } catch (Throwable e) {  
                log.error("doRebalance exception", e);  
            }  
        }  
    }  
}  

88bifa必发唯一官网 5

MQClientInstance遍历consumerTable(在此以前注册的时候以consumerGroup为key,以顾客客户端DefaultMQPullConsumerImpl为value存入consumerTable中)中的每种成分,循环调用其成分的doRebalance()方法。那我们看DefaultMQPullConsumerImpl的doRebalance方法。

consumer 二.消费端负载均衡

1 [java] view plain copy
2 @Override  
3 public void doRebalance() {  
4     if (this.rebalanceImpl != null) {  
5         this.rebalanceImpl.doRebalance(false);  
6     }  
7 }  

消费端负载均衡

直白调用了rebalanceImpl的doRebalance方法

88bifa必发唯一官网 6

[java] view plain copy
public void doRebalance(final boolean isOrder) {  
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();  
    if (subTable != null) {  
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {  
            final String topic = entry.getKey();  
            try {  
                this.rebalanceByTopic(topic, isOrder);  
            } catch (Throwable e) {  
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {  
                    log.warn("rebalanceByTopic Exception", e);  
                }  
            }  
        }  
    }  

    this.truncateMessageQueueNotMyTopic();  
}  

88bifa必发唯一官网 7

能够看来先取得subTable即subscriptionInner,在此之前基于配置的各种topic生成的SubscriptionData数据结构的map。先遍历该map,获得每一个topic,针对种种topic调用rebalanceByTopic()

费用端会通过RebalanceService线程,10分钟做一回基于topic下的持有队列负载

 1 [java] view plain copy
 2     private void rebalanceByTopic(final String topic, final boolean isOrder) {  
 3         switch (messageModel) {  
 4             case BROADCASTING: {  
 5                 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);  
 6                 if (mqSet != null) {  
 7                     boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);  
 8                     if (changed) {  
 9                         this.messageQueueChanged(topic, mqSet, mqSet);  
10                         log.info("messageQueueChanged {} {} {} {}",  
11                             consumerGroup,  
12                             topic,  
13                             mqSet,  
14                             mqSet);  
15                     }  
16                 } else {  
17                     log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);  
18                 }  
19                 break;  
20             }  
21             case CLUSTERING: {  
22                 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);  
23                 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);  
24                 if (null == mqSet) {  
25                     if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {  
26                         log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);  
27                     }  
28                 }  
29   
30                 if (null == cidAll) {  
31                     log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);  
32                 }  
33   
34                 if (mqSet != null && cidAll != null) {  
35                     List<MessageQueue> mqAll = new ArrayList<MessageQueue>();  
36                     mqAll.addAll(mqSet);  
37   
38                     Collections.sort(mqAll);  
39                     Collections.sort(cidAll);  
40   
41                     AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;  
42   
43                     List<MessageQueue> allocateResult = null;  
44                     try {  
45                         allocateResult = strategy.allocate(  
46                             this.consumerGroup,  
47                             this.mQClientFactory.getClientId(),  
48                             mqAll,  
49                             cidAll);  
50                     } catch (Throwable e) {  
51                         log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}",  
52                              strategy.getName(), e);  
53                         return;  
54                     }  
55   
56                     Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();  
57                     if (allocateResult != null) {  
58                         allocateResultSet.addAll(allocateResult);  
59                     }  
60   
61                     boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);  
62                     if (changed) {  
63                         log.info(  
64                             "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}  
65                         , cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",  
66                             strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),  
67                             allocateResultSet.size(), allocateResultSet);  
68                         this.messageQueueChanged(topic, mqSet, allocateResultSet);  
69                     }  
70                 }  
71                 break;  
72             }  
73             default:  
74                 break;  
75         }  
76     }  

消费端遍历自个儿的有着topic,依次调rebalanceByTopic

咱俩先珍视关心集群方式下,先猎取topic的地面路由新闻,再通过topic跟那些消费者的组名,调用netty客户端的协同网络访问topic钦赐的broker,从broker端获得与其总是的且是内定消费组名下订阅内定topic的顾客id的群集。然后采用暗许的分红算法的allocate()实行队列给顾客平均分配。然后调用updateProcessQueueTableInRebalance()方法判定是或不是再一次队列分配。

基于topic获取此topic下的具有queue

 1 [java] view plain copy
 2 private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,  
 3     final boolean isOrder) {  
 4     boolean changed = false;  
 5   
 6     Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();  
 7     while (it.hasNext()) {  
 8         Entry<MessageQueue, ProcessQueue> next = it.next();  
 9         MessageQueue mq = next.getKey();  
10         ProcessQueue pq = next.getValue();  
11   
12         if (mq.getTopic().equals(topic)) {  
13             if (!mqSet.contains(mq)) {  
14                 pq.setDropped(true);  
15                 if (this.removeUnnecessaryMessageQueue(mq, pq)) {  
16                     it.remove();  
17                     changed = true;  
18                     log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);  
19                 }  
20             } else if (pq.isPullExpired()) {  
21                 switch (this.consumeType()) {  
22                     case CONSUME_ACTIVELY:  
23                         break;  
24                     case CONSUME_PASSIVELY:  
25                         pq.setDropped(true);  
26                         if (this.removeUnnecessaryMessageQueue(mq, pq)) {  
27                             it.remove();  
28                             changed = true;  
29                             log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",  
30                                 consumerGroup, mq);  
31                         }  
32                         break;  
33                     default:  
34                         break;  
35                 }  
36             }  
37         }  
38     }  
39   
40     List<PullRequest> pullRequestList = new ArrayList<PullRequest>();  
41     for (MessageQueue mq : mqSet) {  
42         if (!this.processQueueTable.containsKey(mq)) {  
43             if (isOrder && !this.lock(mq)) {  
44                 log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);  
45                 continue;  
46             }  
47   
48             this.removeDirtyOffset(mq);  
49             ProcessQueue pq = new ProcessQueue();  
50             long nextOffset = this.computePullFromWhere(mq);  
51             if (nextOffset >= 0) {  
52                 ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);  
53                 if (pre != null) {  
54                     log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);  
55                 } else {  
56                     log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);  
57                     PullRequest pullRequest = new PullRequest();  
58                     pullRequest.setConsumerGroup(consumerGroup);  
59                     pullRequest.setNextOffset(nextOffset);  
60                     pullRequest.setMessageQueue(mq);  
61                     pullRequest.setProcessQueue(pq);  
62                     pullRequestList.add(pullRequest);  
63                     changed = true;  
64                 }  
65             } else {  
66                 log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);  
67             }  
68         }  
69     }  
70   
71     this.dispatchPullRequest(pullRequestList);  
72   
73     return changed;  
74 }  

选料1台broker获取基于group的具有消费端(有心跳向全数broker注册客户端音信)

先遍历processQueueTable,看其topic下的该管理音信队列是或不是还是应该管理,由于新分配之后,音信队列只怕会转移,所以原该管理的音信队列恐怕没供给管理,由此没供给管理的音讯队列移除。当然也可能有不小恐怕多出必要管理的音信队列,于是供给树立其与processQueue的应和关系,先调用computerPullFromWhere得到该条新闻后一次拉取数据的职责,在RebalancePullImpl中落到实处了该格局直接再次回到0,把该管理的mq封装成pq后,更新到processQueueTable中。若有立异,无论是扩充依旧删除,则changed都设为true。(那个地点讲的有个别模糊,他是客户端pull与push差异的首要,实际上push可是是在pull之上封装了下操作,后边大家会另行回到深入分析。)

慎选队列分配政策实例AllocateMessageQueueStrategy实行分配算法,获取队列群集SetmqSet

措施再次回到后,倘若changed为true,会调用messageQueueChanged方法来公告铺排在DefaultMQPullConsumer中的相关messageQueueListener,大家得以见见RebalancePullImpl中的实现。

一)  平均分配算法,其实是相近于分页的算法

 1 [java] view plain copy
 2 @Override  
 3 public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {  
 4     MessageQueueListener messageQueueListener = this.defaultMQPullConsumerImpl.getDefaultMQPullConsumer().getMessageQueueListener();  
 5     if (messageQueueListener != null) {  
 6         try {  
 7             messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);  
 8         } catch (Throwable e) {  
 9             log.error("messageQueueChanged exception", e);  
10         }  
11     }  
12 }  

将享有queue排好序类似于记录

播音形式则相比较简单,由于具有顾客都要拍卖,少了队列分配那么些手续。

将兼具消费端consumer排好序,相当于页数

正文转发自:

然后拿走当前consumer所在页面应该分配到的queue

 

二)  根据安排来分配队列, 也正是说在consumer运维的时候钦定了queue

三)  依照机房来配置队列

Consumer运行的时候会钦赐在哪些机房的新闻

获取钦定机房的queue

下一场在实行如一)平均算法

遵照分配队列的结果更新ProccessQueueTable一)      比对mqSet
将盈余的类别删除,当broker当机或然加上,会促成分配到mqSet变化,

a)       
将不在被本consumer消费的messagequeue的ProcessQueue删除,其实是安装ProcessQueue的droped属性为true

b)        将越过两份中从未拉取动作ProcessQueue删除

//TODO 为何要刨除掉,两分钟后来了音信怎么做?

//

2)      增加新扩张队列,比对mqSet,给新扩充的messagequeue

构建长轮询对象PullRequest对象,会从broker获撤消费的进程

营造这些队列的ProcessQueue

将PullRequest对象派发到长轮询拉音讯服务(单线程异步拉取)

注:ProcessQueue正在被消费的队列,

(壹)   
长轮询拉取到音信都会先存款和储蓄到ProcessQueue的TreeMap集合中,消费调后会去除掉,用来调整consumer消息堆叠,

TreeMap key是消息在此ConsumeQueue队列中索引

(二)    对于顺序消息消费管理

locked属性:当consumer端向broker申请锁队列成功后安装true,唯有被锁定的processqueue本领被推行消费

rollback: 将消费在msgTreeMapTemp中的新闻,放回msgTreeMap重新消费

commit: 将不时表msgTreeMapTemp数据清空,代表消费产生,放回最大偏移值

(三)   
这里是个TreeMap,对key即新闻的offset进行排序,这么些样可以使得音讯举办各种消费

consumer 3.长轮询

罗克etmq的消息是由consumer端主动到broker拉取的,
consumer向broker发送拉新闻请求,
PullMessageService服务通过1个线程将封堵队列LinkedBlockingQueue中的PullRequest到broker拉打消息 
   

 
DefaultMQPushConsumerImpl的pullMessage(pullRequest)方法施行向broker拉音信动作

1.      获取ProcessQueue判读是不是drop的, drop为true再次来到

贰.      给ProcessQueue设置拉音信时间戳

3.     
流量控制,正在消费队列中国国投息(未被消费的)超过阀值,稍后在推行拉音信

四.     
流量调节,正在消费队列中国国投息的跨度超越阀值(暗许3000),稍后在消费

伍.      遵照topic获取订阅关系

陆.      创设拉音信回调对象PullBack,
从broker拉取音讯(异步拉取)重回结果是回调

柒.      从内部存款和储蓄器中获取commitOffsetValue  //TODO
这几个值跟pullRequest.getNextOffset不一样

8.      构建sysFlag  pull接口用到的flag

九.      调底层通讯层向broker发送拉音讯请求

倘使master压力过大,会提议去slave拉取消息

万1是到broker拉取信息清楚实时提交标志位,因为slave不允许实时提交消费进程,能够定时提交

//TODO 关于master拉音信实时提交指的是何等?

10.  拉到新闻后回调PullCallback

拍卖broker重回结果pullResult

          更新从哪些broker(master 依旧slave)拉取新闻

          反连串化新闻

          音讯过滤

          消息中放入队列最大最小offset,方便使用来感知音讯聚成堆度

将消息插手正在管理队列ProcessQueue

将消息提交到消费消息服务ConsumeMessage瑟维斯

流控管理, 借使pullInterval参数大于0
(拉信息间隔,假如为了降低拉取速度,能够设置大于0的值),延迟再实践拉新闻, 
即便pullInterval为0马上在施行拉音讯动作

序列图

一.      向broker发送长轮询请求

88bifa必发唯一官网 8

  1.   Broker接收长轮询请求

88bifa必发唯一官网 9

  1.      Consumer接收broker响应

88bifa必发唯一官网 10

长轮询活动图:

一张图画不下,再来一张

88bifa必发唯一官网 11

consumer
4.长轮询push消息-并发音讯

由此长轮询拉取到消息后会提交到新闻服务ConsumeMessageConcurrentlyService,

ConsumeMessageConcurrentlyServic的submitConsumeRequest方法营造ConsumeRequest职责交给到线程池。

长轮询向broker拉取新闻是批量拉取的, 暗许设置批量的值为pullBatchSize=
3二,可布置

消费端consumer创设3个费用消息职责ConsumeRequest消费一群消息的个数是可布置的consumeMessageBatch马克斯Size
= 1, 暗中认可批量个数为3个

         ConsumeRequest 职分run方法试行   

推断proccessQueue是还是不是被droped的, 舍弃直接回到,不在消费音信

                  营造并行消费上下文

                 
给新闻设置消费败北时候的retrytopic,当新闻发送失败的时候发送到topic为%RET奥迪Q三Y%groupname的体系中

                 
调MessageListenerConcurrently监听器的consumeMessage方法消费消息,重返消费结果

                 
假设ProcessQueue的droped为true,不管理结果,不更新offset,
但其实这里消费端是开支了新闻的,这种状态感觉有被再次消费的高风险

                  管理消费结果

消费成功,
对于批次消费新闻,重返消费成功并不意味着享有新闻都开销成功,可是消费新闻的时候若是遇上海消防费消息战败间接放回,依据ackIndex来标识成功消费到哪个地方了

                            消费战败, ackIndex设置为-一

播音形式发送战败的消息屏弃,
广播方式对于倒闭重试代价过高,对全部集群质量会有非常大影响,退步重试作用交由使用管理

集群格局,
将花费失利的音讯一条条的发送到broker的重试队列中去,假设那时候还会有发送到重试队列发送失利的音讯,这就在cosumer的本土线程按期五分钟以往重试重新消费新闻,在走二次地点的费用流程。

                 
删除正在消费的行列processQueue中此番消费的音讯,放回消费进程

                 
更新消费进程,这里只是三个内部存款和储蓄器offsettable的翻新,后边有定期义务立异到broker上去

88bifa必发唯一官网 12

consumer
5.长轮询push音信-顺序消费音讯

梯次消费服务ConsumeMessageConcurrentlyService营造的时候

                  营造3个线程池来收纳消费请求ConsumeRequest

                 
营造1个单线程的本地线程,用来稍后定时重新消费ConsumeRequest,
用来推行定期周期性(1秒)钟锁队列职责

        周期性锁队列lockMQPeriodically

                 
获取正在消费队列列表ProcessQueueTable全部MesssageQueue,
创设基于broker归类成MessageQueue群集Map>

                  遍历Map>的brokername,
获取broker的master机器地址,将brokerName的Set发送到broker请求锁定那个队列。

在broker端锁定队列,其实就是在broker的queue中标志一下消费端,表示那么些queue被有个别client锁定。
Broker会再次回到成功锁定队列的聚焦,

依附成功锁定的MessageQueue,设置相应的正在处理队列ProccessQueue的locked属性为true没有锁定设置为false 
               
通过长轮询拉取到音信后会提交到音信服务ConsumeMessageOrderlyService,ConsumeMessageOrderlyService的submitConsumeRequest方法营造ConsumeRequest职责交给到线程池。ConsumeRequest是由ProcessQueue和Messagequeue组成。
ConsumeRequest职责的run方法        推断proccessQueue是不是被droped的,
扬弃直接回到,不在消费音信       
每一种messagequeue都会转换三个行列锁来保障在日前consumer内,同四个队列串行消费, 
     
判定processQueue的lock属性是不是为true,lock属性是或不是过期,如若为false或许逾期,放到本地线程稍后锁定在费用。
假如lock为true且未有过期,开头消费音讯       
总括职分执行的时刻即使超过一分钟且线程数小于队列数情状下,将processqueue,
messagequeue重新创设ConsumeRequest加到线程池10ms后在花费,那样防守个别队列被饿死 
      获取客户端的消费批次个数,暗中认可一堆次为一条       
从proccessqueue获取批次新闻, processqueue.takeMessags(batchSize),
从msgTreeMap中移除音讯放到一时map中msgTreeMapTemp,这些不常map用来回滚新闻和commit音讯来兑现事物消费 
      调回调接口消费音信,重临状态对象ConsumeOrderlyStatus       
依据消费情状,处理结果壹) 
非事物情势,自动提交音信音讯状态为success:调用processQueue.commit方法 
                获取msgTreeMapTemp的末梢二个key,表示提交的 offset     
            清空msgTreeMapTemp的音讯,已经打响消费2) 
事物提交,由用户来调控提交回滚(精卫专项使用)    更新消费进程,
这里的翻新只是多个内部存储器offsetTable的翻新,后边有定时义务按时更新到broker上去

88bifa必发唯一官网 13

consumer 六.新闻消费

消费者主动拉取信息消费,客户端通过类DefaultMQPullConsumer

        客户端能够内定特定MessageQueue

       
也足以经过DefaultMQPullConsumer.fetchMessageQueuesInBalance(topic)
获撤消费的类别

        业务和谐赢得消费队列,自个儿到broker拉取新闻,以及本身更新消费进程

因为个中贯彻跟push情势类似就不在啰嗦,用法也呼吁看示例代码去

consumer 7.shutdown

DefaultMQPushConsumerImpl  关闭消费端

        关闭消费线程

        将分配到的Set的费用进程保存到broker

利用DefaultMQPushConsumerImpl获取ProcessQueueTable的keyset的messagequeue去获取

RemoteBrokerOffsetStore.offsetTableMap中的消费进度,

offsetTable中的messagequeue的值,在update的时候尽管未有对号入座的Messagequeue会营造,
可是也会rebalance的时候将未有分配到的messagequeue删除

rebalance会将offsettable中从不分配到messagequeue删除,
但是在从offsettable删除以前会将offset保存到broker

        Unregiser客户端

        pullMessageService关闭

        scheduledExecutorService关闭,关闭部分客户端的起的定期职分

        mqClientApi关闭

        rebalanceService关闭