88bifa必发唯一官网做事行列

我们要一直等前面的耗时任务完成了之后才能接着处理后面耗时的任务,Java版,又称任务队列(task queue)

88bifa必发唯一官网 17

 

内容来自:RabbitMQ Tutorials
Java版

前面介绍了队列接收和发送消息,这篇将学习如何创建一个工作队列来处理在多个消费者之间分配耗时的任务。工作队列(work
queue),又称任务队列(task queue)。

在上一篇博客《RabbitMQ入门:Hello RabbitMQ
代码实例》中,我们通过指定的队列发送和接收消息,代码还算是比较简单的。


工作队列的目的是为了避免立刻执行资源密集型任务、减少等待时间。将消息发送到队列,工作进程在后台从队列取出任务并处理。

假设有这一些比较耗时的任务,按照上一次的那种方式,我们要一直等前面的耗时任务完成了之后才能接着处理后面耗时的任务,那要等多久才能处理完?别担心,我们今天的主角–工作队列就可以解决该问题。我们将围绕下面这个索引展开:

Work Queues

在第一个教程中,我们实现了从一个指定的队列中发送和接收消息。在这一部分,我们将会创建一个工作队列:用来讲耗时的任务分发给多个工作者。

工作队列的主要思想是避免这样的情况:直接去做一件资源密集型的任务,并且还得等它完成。相反,我们将任务安排到之后再去做。我们将任务封装为一个消息,并发到队列中。一个工作进程将会在后台取出任务并最终完成工作。如果开启多个工作进程,任务将会在这多个工作进程间共享。

这个概念在web应用中是非常有用的,因为web应用不可能在一个HTTP请求中去处理一个复杂的任务。


准备

通过Thread.sleep()来模拟耗时的任务,通过在消息的末尾添加”.”来表示处理时间,例如,Hello...表示耗时3秒。
发送端
NewTask.java:

package com.xxyh.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class NewTask {
    private static final String WORK_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        boolean durable = true;
        channel.queueDeclare(WORK_NAME, durable, false, false, null);

        // 发送10条记录,每次在后面添加"."
        for (int i = 0; i < 10; i++) {
            String dot = "";
            for (int j = 0; j <= i; j++) {
                dot += ".";
            }
            String message = "work queue " + dot + dot.length();
            channel.basicPublish("", WORK_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("utf-8"));
            System.out.println(Thread.currentThread().getName() + "发送消息:" + message);
        }
        channel.close();
        connection.close();
    }
}

接收端
Work.java:

package com.xxyh.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Work {

    private static final String WORK_QUEUE = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        boolean durable = true;
        channel.queueDeclare(WORK_QUEUE, durable, false, false, null);

        // 设置同一个消费者在同一时间只能消费一条消息
        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(Thread.currentThread().getName() + "接收消息:" + message);
                try {
                    doWork(message);
                    System.out.println("消息接收完毕");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {

                    // 确认消息已经收到
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // 取消autoAck
        boolean autoAck = false;
        channel.basicConsume(WORK_QUEUE, autoAck, consumer);
    }

    private static void doWork(String task) throws InterruptedException {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                Thread.sleep(1000);
            }
        }
    }
}
  1. 什么是工作队列
  2. 代码准备
  3. 循环分发
  4. 消息确认
  5. 公平分发
  6. 消息持久化

准备

在上一个教程中,我们发送了“hello
world”的消息。现在,我们会发送一些代表复杂任务的字符串。我们没有真实的任务(比如调整图片大小、PDF文件加载等),所以我们使用Thread.sleep()方法来伪造耗时任务,假装我们很忙。我们用字符串中的点号.来表示任务的复杂性,一个点就表示需要耗时1秒,比如一个描述为hello...的假任务,它需要耗时3秒。

将上个教程中的Send.java中的代码稍作修改。因为这个程序会调度任务到工作队列,所以我们将它命名为NewTask.java

String message = "1.";

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

之前的Recv.java同样也要做些修改,它需要模拟消息中的点代表的耗时。因为它负责接收消息并处理任务,所以,将它命名为Worker.java

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
    }
  }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

我们的假任务的执行:

private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == '.') Thread.sleep(1000);
    }
}   

Round-robin分发

使用任务队列的好处就是容易处理并发工作。如果积累了大量的工作,只需要增加工作者就可以了。对于上面的示例代码,可以启动两个Work,然后启动NewTask,执行结果如下:
Work1:

pool-1-thread-4接收消息:work queue .1
消息接收完毕
pool-1-thread-5接收消息:work queue ...3
消息接收完毕
pool-1-thread-6接收消息:work queue .....5
消息接收完毕
pool-1-thread-7接收消息:work queue .......7
消息接收完毕
pool-1-thread-8接收消息:work queue .........9
消息接收完毕

Work2:

pool-1-thread-4接收消息:work queue ..2
消息接收完毕
pool-1-thread-5接收消息:work queue ....4
消息接收完毕
pool-1-thread-6接收消息:work queue ......6
消息接收完毕
pool-1-thread-7接收消息:work queue ........8
消息接收完毕
pool-1-thread-8接收消息:work queue ..........10
消息接收完毕

NewTask:

main发送消息:work queue .1
main发送消息:work queue ..2
main发送消息:work queue ...3
main发送消息:work queue ....4
main发送消息:work queue .....5
main发送消息:work queue ......6
main发送消息:work queue .......7
main发送消息:work queue ........8
main发送消息:work queue .........9
main发送消息:work queue ..........10

默认情况下,RabbitMQ会依次发送消息到下一个队列。一般情况下,每个消费者会收到数量大致相同的消息。这种分发消息的方法称为round-robin。

废话少说,直接展开。

循环分发

使用任务队列的一个优势在于容易并行处理。如果积压了大量的工作,我们只需要添加更多的工作者(上文中的Worker.java中的概念),这样很容易扩展。

首先,我们来尝试同时运行两个工作者实例(Worker.java)。它们都会从队列中获取消息,但具体是如何获取的呢?

启动NewTask,之后,可以依次将message修改为”2..”、”3…”、”4….”、”5…..”等,每修改一次就运行一次。
观察console中两个工作者的接收消息情况:

//其中之一的worker
 [x] Received '1.'
 [x] Done
 [x] Received '3...'
 [x] Done
 [x] Received '5....'
 [x] Done
 [x] Received '7....'
 [x] Done
//另一个worker
 [x] Received '2..'
 [x] Done
 [x] Received '4....'
 [x] Done
 [x] Received '6....'
 [x] Done
 [x] Received '8....'
 [x] Done

可以看出,默认情况下,RabbitMQ是轮流发送消息给下一个消费者,平均每个消费者接收到的消息数量是相等的。这种分发消息的方式叫做循环分发。你可以试一下开3个或更多工作者的情况。


消息确认(Message acknowledgment)

处理一个任务可能需要几秒钟的时间。如果一个消费者在执行过程中中断可能导致消息的丢失,显然这并不是我们希望的,RabbitMQ提供了消息确认机制,让消费者反馈已经收到消息的信息,然后RabbitMQ就可以自由产出这条消息了。

如果消费者中断了,没有发送确认消息,RabbitMQ会认为消息发送失败并将消息重新加入队列。如果同时有其他消费者在工作,它会马上重新发送该消息到另一个消费者。这种方式可以保证消息不丢失。

消息确认机制默认是开启的:

// 打开确认
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, consumer);

一、什么是工作队列

消息确认

完成一项任务可能会耗费几秒钟,你可能会问,假如其中一个消费者开始了一个非常耗时的任务,并在执行这个任务的时候崩溃了(也就是没有完成这个任务),将会发生什么事情。按照上面的代码,一旦RabbitMQ向消费者发出消息,消息就会立即从内存中移除。在这种情况下,如果你杀死一个工作者,我们将会失去它正在处理的消息,同时也会丢失所有发给这个工作者但这个工作者还未处理的消息。

但我们不想丢掉任务,如果一个工作者死掉,我们想将这个任务发给其他的工作者。

为了确保消息永远不会丢失,RabbitMQ支持消息确认。消费者将会发送一个确认信息来告诉RabbitMQ,我已经接收到了消息,并且处理完了,你可以随便删它了。

如果一个消费者在发送确认信息前死去(连接或通道关闭、TCP连接丢失等),RabbitMQ将会认为该消息没有被完全处理并会重新将消息加入队列。如果此时有其他的消费者,RabbitMQ很快就会重新发送该消息到其他的消费者。通过这种方式,你完全可以保证没有消息丢失,即使某个消费者意外死亡。

对RabbitMQ而言,没有消息超时这一说。如果消费者死去,RabbitMQ将会重新发送消息。即使处理一个消息需要耗时很久很久也没有关系。

消息确认机制是默认打开的。只是在前面的代码中,我们显示地关掉了:boolean autoAck=true。将代码做如下修改:

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
      channel.basicAck(envelope.getDeliveryTag(), false);
    }
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

注意到最上面的那句代码:

channel.basicQos(int prefetchCount);

其中的参数prefetchCount表示:maximum number of messages that the server will deliver

这样,就可以确保即使消费者挂了,消息也不会丢失。


消息持久化(Message durability)

上面的方式可以保证某个消费者中断时,消息不会丢失。然而,如果RabbitmqMQ中断或崩溃,它是不会记住队列和消息的。为了实现消息不丢失,需要将队列和消息持久化。首先将队列持久化:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

如果一个已经存在的队列没有被设置持久化(durable=false)的,是不能更改它的参数的。另外,必须在消息的发送端、可接收端同时修改队列的声明。

上面的设置保证了即使RabbitMQ重启,队列也不丢失。但是并不能保证消息一定能发送到接收端,还需要将消息持久化:

channel.basicPublish("", QUEUE_NAME,
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        message.getBytes("utf-8"));

工作队列–用来将耗时的任务分发给多个消费者(工作者),主要解决这样的问题:处理资源密集型任务,并且还要等他完成。有了工作队列,我们就可以将具体的工作放到后面去做,将工作封装为一个消息,发送到队列中,一个工作进程就可以取出消息并完成工作。如果启动了多个工作进程,那么工作就可以在多个进程间共享。

消息持久化

通过上面的教程,我们知道如何确保消费者挂掉也不会丢失消息。但是,加入RabbitMQ服务器挂掉了怎么办?

如果关闭RabbitMQ服务或者RabbitMQ服务崩溃了,RabbitMQ就会丢掉所有的队列和消息:除非你告诉它不要这样。要确保RabbitMQ服务关闭或崩溃后消息不会丢失,要做两件事情:持久化队列、持久化消息。

首先,我们要确保RabbitMQ永远不会丢失我们的队列。怎么做呢?在声明队列的时候,指定durable参数为true。

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

尽管上面的代码没有错,但是它不会按所想的那样将队列持久化:因为之前我们已经将hello这个队列设置了不持久化,RabbitMQ不允许重新定义已经存在的队列,否则就会报错。但是,我们有一个快速的解决办法:声明另外一个队列就行了,只要不叫hello,比如task_queue

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

现在,我们已经确保队列不会丢失了,那么如何将消息持久化呢:将MessageProperties的值设置为PERSISTENT_TEXT_PLAIN

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

将消息标记为持久化并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘中,但是在RabbitMQ接收到消息和保存消息之间会与一个很短的时间窗。同时,RabbitMQ不会为每个消息做fsync(2)处理,消息可能仅仅保存到缓存中而不会真正地写入到磁盘中。这种持久化保证尽管不够健壮,但已经远远足够我们的简单任务队列。如果你需要更强大的保证,可以使用[publisher confirms](https://www.rabbitmq.com/confirms.html)


公平分发

在某些情况下,分发的效果并不让人满意。例如,在两个消费者的情况下,奇数任务多而偶数任务少,可能一个消费队列一直处于繁忙状态,而另一个几乎处于闲置状态。RabbitMQ并不知晓这一情况,它只负责分发队列里的消息,不考虑消息未确认的情况。它只是盲目地将第n个消息发送给第n个消费者。

channel.basicQos(1);

这行代码的作用是告诉RabbitMQ,不要在同一时间给同一个消费者超过1条消息。

二、代码准备

公平分发

你可能已经发现,循环消息分发并不是我们想要的。比如,有两个工作者,当奇数消息(如上文中的”1…”、”3…”、”5…”、”7…”)很耗时而偶数消息(如上文中的”2.”、”4.”、”6.”、”8.”)很简单的时候,其中一个工作者就会一直很忙而另一个工作者就会闲。然而RabbitMQ对这些一概不知,它只是在轮流平均地发消息。

这种情况的发生是因为,RabbitMQ
只是当消息进入队列时就分发出去,而没有查看每个工作者未返回确认信息的数量。

为了改变这种情况,我们可以使用basicQos方法,并将参数prefetchCount设为1。这样做,工作者就会告诉RabbitMQ:不要同时发送多个消息给我,每次只发1个,当我处理完这个消息并给你确认信息后,你再发给我下一个消息。这时候,RabbitMQ就不会轮流平均发送消息了,而是寻找闲着的工作者。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

注意,如果所有的工作者都很忙,你的队列可能会装满,你必须留意这种情况:或者添加更多的工作者,或者采取其他策略。

完整代码:
NewTask.java

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv)
                      throws java.io.IOException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    String message = getMessage(argv);

    channel.basicPublish( "", TASK_QUEUE_NAME,
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }      
  //...
}

Worker.java

import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {
  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    final Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
          doWork(message);
        } finally {
          System.out.println(" [x] Done");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };
    boolean autoAck = false;
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
      if (ch == '.') {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException _ignored) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }
}

  1. 生产者类:NewTask.java

    public class NewTask {
        //队列名称
        public static final String QUEUE_NAME = "TASK_QUEUE";
        //队列是否需要持久化
        public static final boolean DURABLE = false;
    
        //需要发送的消息列表
        public static final String[] msgs = {"task 1", "task 2", "task 3", "task 4", "task 5", "task 6"};
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
    
            try {
                // 1.connection & channel
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.queue
                channel.queueDeclare(QUEUE_NAME, DURABLE, false, false, null);
    
                // 3.publish msg
                for (int i = 0; i < msgs.length; i++) {
                    channel.basicPublish("", QUEUE_NAME, null, msgs[i].getBytes());
                    System.out.println("** new task ****:" + msgs[i]);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
    
            }
    
        }
    }
    

     

  2. 消费者类:Work.java

    public class Work {
    
        public static void main(String[] args) {
            System.out.println("*** Work ***");
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
    
            try {
                //1.connection & channel
                final Channel channel = factory.newConnection().createChannel();
    
                //2.queue
                channel.queueDeclare(NewTask.QUEUE_NAME, NewTask.DURABLE, false, false, null);
    
                //3. consumer instance
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException {
                        String msg = new String(body, "UTF-8");
                        //deal task
                        doWork(msg);
    
                    }
                };
    
                //4.do consumer
                boolean autoAck = true;
                channel.basicConsume(NewTask.QUEUE_NAME, autoAck, consumer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    
        private static void doWork(String msg) {
            try {
                System.out.println("**** deal task begin :" + msg);
    
                //假装task比较耗时,通过sleep()来模拟需要消耗的时间
                if ("sleep".equals(msg)) {
                    Thread.sleep(1000 * 60);
                } else {
                    Thread.sleep(1000);
                }
    
                System.out.println("**** deal task finish :" + msg);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
    

     

  3. 再来一个消费者类:Work2.java,代码同Work.java一模一样。

说明

①与原文略有出入,如有疑问,请参考原文。
②原文是直接用javacp命令运行代码,用IDE更方便。

 

三、循环分发

我们先启动Work和Work2,然后启动NewTask,运行结果如下:

NewTask运行结果:

88bifa必发唯一官网 1

Work运行结果:

88bifa必发唯一官网 2

Work2运行结果:

 88bifa必发唯一官网 3

我们发现,消息生产者发送了6条消息,消费者work和work2分别分到了3个消息,而且是循环轮流分发到的,这种分发的方式就是循环分发。

四、消息确认

假如我们在发送的消息里面添加“sleep”

//需要发送的消息列表
    public static final String[] msgs = {"sleep", "task 1", "task 2", "task 3", "task 4", "task 5", "task 6"};

根据代码中的实现,这个sleep要耗时1分钟,万一在这1分钟之内,工作进程崩溃了或者被kill了,会发生什么情况呢?根据上面的代码:

//4.do consumer
            boolean autoAck = true;
            channel.basicConsume(NewTask.QUEUE_NAME, autoAck, consumer);

自动确认为true,每次RabbitMQ向消费者发送消息之后,会自动发确认消息(我工作你放心,不会有问题),这个时候消息会立即从内存中删除。如果工作者挂了,那将会丢失它正在处理和未处理的所有工作,而且这些工作还不能再交由其他工作者处理,这种丢失属于客户端丢失。

我们来验证下,和刚才的步骤一样执行程序:

1.NewTask的控制台打印结果:
** new task ****:sleep
** new task ****:task 1
** new task ****:task 2
** new task ****:task 3
** new task ****:task 4
** new task ****:task 5
** new task ****:task 6

2.Work的控制台打印结果:
**** deal task begin :sleep

3.Work2的控制台打印结果:
**** deal task begin :task 1
**** deal task finish :task 1
**** deal task begin :task 3
**** deal task finish :task 3
**** deal task begin :task 5
**** deal task finish :task 5

根据上面的内容,消息生产者发送了7条消息, work2消费了1、3、5
三条,那剩下的sleep、2、4、6
这四条消息肯定是work来处理,只是sleep耗时一分钟
,时间差后面的还没来得及处理,这个时候我们kill掉work,去看下RabbitMQ
管理页面,没有未处理的消息,消息随着work被kill也跟着丢失了。

88bifa必发唯一官网 4

是不是很可怕?

为了应对这种情况,RabbitMQ支持消息确认。消费者处理完消息之后,会发送一个确认消息告诉RabbitMQ,消息处理完了,你可以删掉它了。

代码修改(Work.java和Work2.java同步修改):1.将自动确认改为false,2.消息处理之后再通过channel.basicAck进行消息确认

88bifa必发唯一官网 5

 修改完后,执行程序:

1.NewTask的控制台打印结果:
** new task ****:sleep
** new task ****:task 1
** new task ****:task 2
** new task ****:task 3
** new task ****:task 4
** new task ****:task 5
** new task ****:task 6

2.Work的控制台打印结果:
**** deal task begin :sleep

3.Work2的控制台打印结果:
**** deal task begin :task 1
**** deal task finish :task 1
**** deal task begin :task 3
**** deal task finish :task 3
**** deal task begin :task 5
**** deal task finish :task 5

然后kill掉work,去看RabbitMQ管理页面,会发现有4条未确认:

88bifa必发唯一官网 6

再去看下work2的控制台,work2将work未处理完和未来得及处理的消息都给处理了:

88bifa必发唯一官网 7

等work2处理完后,你再去看RabbitMQ管理页面,会发现页面的消息数值也都变成0
了。

 

五、公平分发

按照上面那种循环分发的方式,每个消费者会分到相同数量的任务,这样会有一个问题:假如有一些task非常耗时,之前的任务还没有完成,后面又来了那么多任务,来不及处理,那咋办?
有的消费者忙的不可开交,有的消费者却很快处理完事情然后无所事事浪费资源,那咋整?答案就是:公平分发。
怎么实现呢?

 发生上述问题的原因就是RabbitMQ收到消息后就立即分发出去,而没有确认各个工作者未返回确认的消息数量。因此我们可以使用basicQos方法,并将参数prefetchCount设为1,告诉RabbitMQ
我每次值处理一条消息,你要等我处理完了再分给我下一个。这样RabbitMQ就不会轮流分发了,而是寻找空闲的工作者进行分发。

代码修改(work和Work2同步修改):

88bifa必发唯一官网 8

执行代码:

1.NewTask的控制台打印结果:
** new task ****:sleep
** new task ****:task 1
** new task ****:task 2
** new task ****:task 3
** new task ****:task 4
** new task ****:task 5
** new task ****:task 6

2.Work的控制台打印结果:
**** deal task begin :sleep
**** deal task finish :sleep

3.Work2的控制台打印结果:
**** deal task begin :task 1
**** deal task finish :task 1
**** deal task begin :task 2
**** deal task finish :task 2
**** deal task begin :task 3
**** deal task finish :task 3
**** deal task begin :task 4
**** deal task finish :task 4
**** deal task begin :task 5
**** deal task finish :task 5
**** deal task begin :task 6
**** deal task finish :task 6

Work只处理了sleep,Work2处理了1、2、3、4、5、6 这个六条消息。

六、消息持久化

上面说到消息确认的时候,提到了工作者被kill的情况。那如果RabbitMQ被stop掉了呢?我们来看下:

这次只启动Work和NewTask,不启动Work2,所有消息都交给Work来处理,控制台打印信息:

1.NewTask的控制台打印结果:
** new task ****:sleep
** new task ****:task 1
** new task ****:task 2
** new task ****:task 3
** new task ****:task 4
** new task ****:task 5
** new task ****:task 6

2.Work的控制台打印结果:
**** deal task begin :sleep

在work处理sleep的过程中,我们停掉RabbitMQ服务

88bifa必发唯一官网 9

然后重新start服务并执行rabbitmq-plugins enable
rabbitmq_management命令,然后查看管理页面:

88bifa必发唯一官网 10

你会发现,所有消息都将被清空了。这种丢失属于服务端丢失

因此需要将消息进行持久化来应对这种情况。

持久化需要做两件事情:

  1. 队列持久化,在声明队列的时候,将第二个参数设为true88bifa必发唯一官网 11

       
      88bifa必发唯一官网 12

     另外,由于RabbitMQ不允许重新定义已经存在的队列,否则就会报错(上一篇博客中已经提到过了),因此我们将这次的队列名改下:88bifa必发唯一官网 13

     

  2. 消息持久化,在发送消息的时候,将第三个参数设为288bifa必发唯一官网 14

然后运行代码,在work处理sleep的时候将服务停掉,并重新启动且执行rabbitmq-plugins
enable rabbitmq_management命令,然后查看管理页面:

 88bifa必发唯一官网 15

一共7条消息,未确认的1条(sleep)和ready的6条(1、2、3、4、5、6)。消息被保存了下来。

 重新启动Work,所有消息被消费:

88bifa必发唯一官网 16

88bifa必发唯一官网 17