RabbitMQ基本使用三(工作队列)

Hello, 大家好,我是一名在互联网捡破烂的程序员

在上一期呢,我们讲到了简单队列的使用,还没有打怪升级的小伙伴先去修炼哦

RabbitMQ简单队列

今天呢,我们接着讲工作队列,工作队列呢,其实和简单队列差不多,相当于一对多之间的关系。

有一个生产者,多个消费者

一、开篇前提

在第一个教程中 RabbitMQ简单队列 我们编写了从命令队列发送和接收消息的程序。在此中,我们将创建一个工作队列,用于在多个工作人员之间分配耗时的任务

工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,并等待它完成。相反,我们计划稍后完成的任务。我们将任务封装为信息,并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行任务。运行许多工作时,任务将在它们之间共享

二、简单使用

  • 生产者

生产者产生一条消息,发送到RabbitMQ中,然后直接退出

直接上代码

package com.codeworld.rabbitmqdemo.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 生产者
* @author dreamcode
*/
public class Send {


/**
* 队列名称
*/
private static final String QUEUE_NAME = "work_queue";

private static final String[] msgs = {"task1", "task2", "task3", "task4"};

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 发送消息
for (int i = 0; i < msgs.length; i++) {

channel.basicPublish("", QUEUE_NAME, null, msgs[i].getBytes());

System.out.println("* New Task:" + msgs[i]);
}
// 关闭通道
channel.close();

connection.close();
}
}

代码说明:

定义队列名称:private static final String QUEUE_NAME = "work_queue"

在消费者中一样的要定义同样的队列名称,这样我们才是监听的同一个队列

队列是否持久化:private static final boolean DURABLE = true;

如果我们想要队列一直保存,那么我们可以设置这个为true

也就是这里面的 channel.queueDeclare(QUEUE_NAME, DURABLE, false, false, null);

那么问题来了?

当我们在并发量大的时候,如果只有一个消费者,哇,那不是要累死了吗

完全是招架不住的,这还会拖慢我们的速度,给用户有不好的体验。。。

因此我们需要创建多个消费者来帮助我们消费我们的消息,这样系统的速度会提高很多。。

那么下面开始实现我们的消费者

  • 消费者

这里我们只创建了两个消费者

消费者1

package com.codeworld.rabbitmqdemo.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 消费者1
* @author dreamcode
*/
public class Rec1 {


/**
* 队列名称
*/
private static final String QUEUE_NAME = "work_queue";


public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false,false,false,null);

System.out.println("Wait for message. To exit press CTRL+C");


Consumer consumer = new DefaultConsumer(channel){

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

doWork(message);
}
};
channel.basicConsume(QUEUE_NAME,false, consumer);

}

// 每隔一秒打印一条消息
private static void doWork(String message){

try {

System.out.println("*** deal task begin:" + message);

Thread.sleep(1000);


}catch (Exception e){

e.printStackTrace();

}
}
}

消费者2

package com.codeworld.rabbitmqdemo.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 消费者2
* @author dreamcode
*/
public class Rec2 {


/**
* 队列名称
*/
private static final String QUEUE_NAME = "work_queue";


public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false,false,false,null);

System.out.println("Wait for message. To exit press CTRL+C");


Consumer consumer = new DefaultConsumer(channel){

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

doWork(message);
}
};

channel.basicConsume(QUEUE_NAME,false, consumer);

}

// 每隔一秒打印消息
private static void doWork(String message){

try {

System.out.println("*** deal task begin:" + message);

Thread.sleep(1000);

}catch (Exception e){

e.printStackTrace();

}
}
}

我们来看看结果

生产者发送消息到MQ中

消费者消费消息

  • 消费者1

  • 消费者2

默认情况下,RabbitMQ会按顺序将每条消息发送到下一个使用者。平均而言,每个使用者将获得相同数量的信息。这种分发消息的方式称为循环。与三个或三个以上的使用者一起使用

三、消息确认

完成任务可能需要几秒钟。你可能知道,如果其中一个消费者开始执行一项长期的任务,并且只有部分完成,会发生什么情况。使用我们当前的代码,一旦 RabbitMQ向使用者传递消息,它会立即将其标记为删除。在这种情况下,如果你杀死了一个工作任务,我们将丢失它刚刚处理的消息。我们还将丢失发送给此特定工作人员但尚未处理的所有消息。

那这不就玩完了吗,这就要炸了呀

但是我们不想丢失任何任务。如果一个工人死了,我们希望将任务交付给另外一个工作人员。

因此,为了确保消息永不丢失,RabbitMQ支持消息确认。消费者会发回一个确认(现在)消息,告诉 RabbitMQ已收到,处理特定消息,并且 RabbitMQ可以自由删除它。

如果使用者死亡(其通道关闭,连接关闭,或TCP连接丢失),而不发送 ack, RabbitMQ将了解信息未完成处理,并将重新排队。如果同时有其他在线消费者,它将迅速将其重新交付给其他消费者。这样,即使工人偶尔会死亡,你也能够保证不会丢失任何消息。

没有任何消息超时;当消费者死亡时,RabbitMQ将重新传递消息。即使处理消息需要更长的时间,也没有关系。

默认情况下,手动消息确认处于打开状态。在前面的实例中,我们通过autoAck标志显式关闭了它们。这时候将此标志设置为false,并在完成任务后向工作人员发送适当的确认。


Consumer consumer = new DefaultConsumer(channel){

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

doWork(message);

}
};

boolean autoAck = false;

channel.basicConsume(QUEUE_NAME,autoAck, consumer);

使用此代码,我们可以确定,即使你在处理消息时使用CTRL+C杀死了工作人员,也不会丢失任何内容。工作人员死亡后不久,所有未确认的消息都将重新传递。

四、消息持久性

我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是,如果 RabbitMQ服务器停止,我们的任务仍将丢失。

遇到情况,不要慌,一步一步稳扎稳打,向请进,冲啊。。。

RabbitMQ退出或崩溃时,它会忘记队列和消息,除非你告诉它不要这样做。

需要两件事情来确保消息不会丢失:我们需要将队列和消息标记为持久。

  • 首先,我们需要确保队列在兔子MQ节点重新启动后存活下来。为此,我们需要声明为持久
/**
* 队列是否需要持久化
*/
private static final boolean DURABLE = true;

// 声明队列
channel.queueDeclare(QUEUE_NAME, DURABLE, false, false, null);

注意:

尽管这样设置是正确的的,但是在目前设置中不会起作用。这是因为我们已经定义了这个队列,但是这个队列不持久。RabbitMQ不允许重新定义具有不同参数的现有队列。并将错误返回给尝试执行此操作的任何程序。但有一个快速的解决方法,让我们重新声明一个名称不同的队列,例如:task_queue

/**
* 队列是否需要持久化
*/
private static final boolean DURABLE = true;

// 声明队列
channel.queueDeclare("task_queue", DURABLE, false, false, null);

设置了生产者队列为持久化,同时需要将消费者也设置为队列持久化,不然会报错。。。

channel.queueDeclare(QUEUE_NAME, true,false,false,null);

这样我们的队列持久化就设置完毕了,这样我们就可以保证消息不会丢失,就算 RabbitMQ服务停止了,那么当我们重启 RabbitMQ就会保证我们队列还存在,这样我们就可以继续消费我们的信息。

关于消息持久性的说明

将邮件标记为持久消息并不完全保证不会丢失消息。尽管他告诉 RabbitMQ将消息保存到磁盘,但当 RabbitMQ已接收的消息但尚未保存消息时,仍有很短的时间窗口,此外, RabbitMQ不会针对每条消息执行fsync(2),它可能只是保存在缓存中,而不是真正的写入磁盘。持久性保证并不强,但对于我们的简单任务队列来说,这已经足够了。

五、公平派单

问题来了。。

你可能会注意到,调度仍然不能完全按照我们想要的工作方式工作。例如,在有两个工作人员的情况下,当所有奇怪的消息都是重复的,甚至是轻的,一个工作人员会持续忙碌,另外一个几乎不会做任何工作。

RabbitMQ对此义一无所知,仍然会均匀的发送消息。

这是因为 RabbitMQ在消息进入队列时只调度消息。它不查看使用者的未确认数。它只盲目的向第n级消费者发送每一个消息。

那应该怎么办呢?

不要慌,要做到心不慌,手不抖,来跟着我们的感觉走。。。

为了解决,我们可是使用basicQos方法与prefetchCount=1来设置。这就是要告诉 RabbitMQ一次不要向工作人员传递多条消息。或者,换句话说,在工作人员处理并确认前一条消息之前,不要向工作人员发送新消息。相反,它将其调度给下一个仍然不忙碌的工作人员。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

六、整理完结

  • 生产者
package com.codeworld.rabbitmqdemo.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 生产者
* @author dreamcode
*/
public class Send {


/**
* 队列名称
*/
private static final String QUEUE_NAME = "work_queue";

/**
* 队列是否需要持久化
*/
private static final boolean DURABLE = true;

private static final String[] msgs = {"task1", "task2", "task3"};

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QUEUE_NAME, DURABLE, false, false, null);

// 发送消息
for (int i = 0; i < 20; i++) {

String message = "New Task" + i;

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

System.out.println("* New Task:" + i);
}

// 关闭通道
channel.close();

connection.close();
}
}
  • 消费者

    消费者1

package com.codeworld.rabbitmqdemo.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 消费者1
* @author dreamcode
*/
public class Rec1 {


/**
* 队列名称
*/
private static final String QUEUE_NAME = "work_queue";

/**
* 队列是否需要持久化
*/
private static final boolean DURABLE = true;

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, DURABLE,false,false,null);

System.out.println("Wait for message. To exit press CTRL+C");


Consumer consumer = new DefaultConsumer(channel){

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

doWork(message);

}
};

channel.basicQos(1);

boolean autoAck = true;

channel.basicConsume(QUEUE_NAME,autoAck, consumer);

}


private static void doWork(String message){

try {

System.out.println("*** deal task begin:" + message);

Thread.sleep(1000);


}catch (Exception e){

e.printStackTrace();

}
}
}

消费者2

package com.codeworld.rabbitmqdemo.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 消费者2
* @author dreamcode
*/
public class Rec2 {


/**
* 队列名称
*/
private static final String QUEUE_NAME = "work_queue";

/**
* 队列是否需要持久化
*/
private static final boolean DURABLE = true;

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, DURABLE,false,false,null);

System.out.println("Wait for message. To exit press CTRL+C");


Consumer consumer = new DefaultConsumer(channel){

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

doWork(message);
}
};

channel.basicQos(1);

boolean autoAck = true;

channel.basicConsume(QUEUE_NAME,autoAck, consumer);

}


private static void doWork(String message){

try {

System.out.println("*** deal task begin:" + message);

Thread.sleep(1000);


}catch (Exception e){

e.printStackTrace();

}
}
}

OK,在这里我们的工作队列就到这里了,不知道你有没有帮助到你呢?

若没有看懂请前往官网 RabbitMQ

看到这里的人都是技术人才,你的浏览就是我前进的动力

欢迎加入QQ群: