RabbitMQ基本使用四(发布/订阅队列)

Hello, 大家好,我是一名在互联网捡破烂的程序员

在上一期呢,我们讲到了工作队列的使用,还没有打怪升级的小伙伴先去修炼哦

RabbitMQ工作队列

今天呢,我们要继续打怪升级哦。

今天我们来讲一讲比较高级的消息方式,嗯,我想一下,是什么呢?

哦,我想起来了,今天我们来讲一讲发布/订阅模式,那么开始我们的表演。。。

一、开篇前提

在上一篇中我们讲到了 工作队列 ,生产者生产一堆消息,发布到MQ中,创建的多个消费者自动消费消息。

缺点是什么呢?在此过程中,我们的消费者都是做的同一个工作,如果工作有很多不同的种类,那我们该咋办呢?如果我们想多个消费者消费同一个消息呢?

哈哈哈,没有什么可以难到我们的,我们要有坚持的精神,不管在什么时候,我们都要有求知欲的信念。我们一定要质问到底。

好了,这里我们就扯远了,我们回归正题

那么我们怎么解决?

这就要使用到我们的另外一种模式:发布/订阅模式

什么是发布/订阅模式呢?

  • 概念:

    概念其实很简单:我们需要做不同的事情—我们将向多个消费者发送消息,可以发布同一条消息也可以发布不同的消息。这种模式就叫做 发布/订阅模式

  • 场景假设:

    我们在注册过程中,一般注册成功后,会发送消息通知用户注册成功(失败)。如果在一个系统中,用户注册有邮箱和手机号码,如果手机号码用来接收验证码,邮箱用来通知用户注册的结果。利用MQ实现业务异步处理,如果使用工作队列的话,就会注册信息队列。注册完成之后生产者会向队列提交一条注册数据,消费者取出数据同时向邮箱和手机号码发送信息。但是实际上邮箱和手机号信息发送是两个不同的业务逻辑,不应该放在一起处理。

  • 解决方案

    这时候我们就需要利用发布/订阅模式将信息发送到转换机(EXCHANGE),声明两个不同的队列(邮箱,手机),并绑定在交换机上。这样生产者只需要向交换机(EXCHANGE)发送消息,两个队列都会接收到消息发送给消费者。

二、图例展示

上面的图是来自官方的发布/订阅模式图例

  • P:消息的生产者
  • X:交换机
  • 红色:队列
  • C1,C2:消息消费者

三、生产者

package com.codeworld.rabbitmqdemo.ps;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 生产者--发布/订阅模式
* @author dreamcode
*/
public class Send {


/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "ps_exchange";

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

// 声明交换机(分发:发布/订阅模式)
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

// 循环发送消息
for (int i = 0; i < 20; i++) {

String message = "send: " + i;

System.out.println(message);

// 发送消息
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));

Thread.sleep(1000);
}

System.out.println("send End");

channel.close();

connection.close();
}

}

代码解释:

1.在发布/订阅模式中,我们又引入了一个新的概念

  • EXCHANGE(交换机)

    其实,交换机理解起来很简单,它的一端是接收生产者发送过来的消息,一端将消息发送给队列。所以,当交换机接收到一个消息时,它必须做出相应的处理,是发送给一个特定的队列?还是发给多个队列?还是丢弃?

    在前两期中,我们都是往队列中发送消息和获取消息。

    回忆一下我们前两张学习的内容

    • 一个生产者用来发送消息
    • 一个队列缓存消息
    • 一个或多个消费者消费队列中的消息

    然而,RabbitMQ的消息模式核心思想并不是那样的。。。

    核心思想是:生产者从不将任何消息直接发送到队列上。实际上,生产者根本不知道它发送的消息将被转发到那些队列

    实际上,生产者只能把消息发送给交换机(exchange),exchange只做一件简单的事情:一方面接收生产者发送过来的消息,另一方面将接收的消息发送给队列。一个exchange必须清楚的知道如何处理一条消息

  • 类型

    • direct
    • topic
    • headers
    • fanout

我们使用是最后一种,fanout广播模式

// 声明交换机(分发:发布/订阅模式)
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

广播模式很简单。正如你可能从名称中猜到的那样,它只是将它接收的所有消息广播到它知道的队列。这正需要我们的模式

2.发布/订阅模式有几点和工作队列不一致

  • 在生产者中我们没有声明队列名称,而是声明了一个交换机名称

    在工作队列的生产者中我们是这样做的

/**
* 队列名称
*/
private static final String QUEUE_NAME = "work_queue";

// 并且只需要声明队列就可以
// 声明队列
channel.queueDeclare(QUEUE_NAME, DURABLE, false, false, null);

在发布/订阅模式中我们只定义了一个交换机名称

/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "ps_exchange";

// 只需要声明交换机
// 声明交换机(分发:发布/订阅模式)
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

这样的好处是我们只需要将我们的消息发送到交换机中,交换机再将消息发送到绑定的队列上

// 发送消息
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));

这一步就很好的证明了。。。

四、消费者

先直接上代码

  • 消费者1
package com.codeworld.rabbitmqdemo.ps;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 消费者1---发布/订阅模式
* @author dreamcode
*/
public class Rec1 {

/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "ps_exchange";

/**
* 队列名称
*/
private static final String QUEUE_NAME = "ps_email";


public static void main(String[] args) throws IOException, TimeoutException {


ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

// 声明交换机(发布/订阅模式)
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

// 声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

// 将队列绑定到交换机上
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

// 保证一次只发一个
int prefectCount = 1;

channel.basicQos(prefectCount);


Consumer consumer = new DefaultConsumer(channel){

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

doWork(message);

}
};

boolean autoAck = true;

channel.basicConsume(QUEUE_NAME,autoAck, consumer);
}

private static void doWork(String message){

try {

System.out.println("*** deal task begin:" + message);

Thread.sleep(1000);


}catch (Exception e){

e.printStackTrace();

}
}
}
  • 消费者2
package com.codeworld.rabbitmqdemo.ps;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 消费者2---发布/订阅模式
* @author dreamcode
*/
public class Rec2 {

/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "ps_exchange";

/**
* 队列名称
*/
private static final String QUEUE_NAME = "ps_phone";

public static void main(String[] args) throws IOException, TimeoutException {


ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

// 声明交换机(发布/订阅模式)
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

// 声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

// 将队列绑定到交换机上
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

// 保证一次只发一个
int prefectCount = 1;

channel.basicQos(prefectCount);


Consumer consumer = new DefaultConsumer(channel){

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

doWork(message);

}
};

boolean autoAck = true;

channel.basicConsume(QUEUE_NAME,autoAck, consumer);
}

private static void doWork(String message){

try {

System.out.println("*** deal task begin:" + message);

Thread.sleep(1000);


}catch (Exception e){

e.printStackTrace();

}
}
}

代码解释:

1.在消费者中我们只需要定义交换机名称和对应的队列名称

消费者1:

/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "ps_exchange";

/**
* 队列名称
*/
private static final String QUEUE_NAME = "ps_email";

我们这里定义的交换机和生产者中的交换机名称EXCHANGE_NAME = "ps_exchange"保持一致,也就是说我们要把队列QUEUE_NAME = "ps_email"绑定到交换机上面

消费者2:和消费者1一样,只是队列名称不一样,这样我们就可以监听不同的队列

/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "ps_exchange";

/**
* 队列名称
*/
private static final String QUEUE_NAME = "ps_phone";

2. 声明交换机类型,这里我们就只列出一种消费者

// 声明交换机(发布/订阅模式)
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

3. 声明队列

// 声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

// 在这里我们没有使用队列的持久化

4. 将队列绑定到交换机上

// 将队列绑定到交换机上
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

五、补充(参照官网— 发布/订阅模式)

在我们的消费者中,如果我们不想手动命名我们的队列,我们可以使用临时队列

你是否还记得我们在 RabbitMQ基本使用三(工作队列) 使用具体的特定队列名称,创建队列名称对我们来说是至关重要的-我们需要将消费者指向同一个队列。如果要在生产者和使用者之间共享队列,则为队列指定名称非常重要。

但是,我们希望听到所有的消息,而不是仅仅是它们的部分信息。我们也只当前流动的消息感兴趣,而不是旧消息中。要解决这个问题,我们需要两件事

  • 首先,每当我们连接到 RabbitMQ,我们需要一个新的、空的队列。为此,我们可以创建一个随机名称的队列,或者—让服务器为我们选择一个随机队列名称
  • 一旦我们断开使用者,队列应该自动删除

那么我们应该怎么使用方法呢?

在Java中,当我们不向queueDeclare()提供任何参数时,我们会创建一个非持久、独占的自动删除队列,该队列的名称生成方式:

String queueName = channel.queueDeclare().getQueue();

此时queueName包含一个随机队列名称,例如:它可能看起来会像amq.gen-JzTY20BRgKO-HjmUJj0wLg

将生成的队列名称绑定到交换机上

六、截图

  • 交换机

  • 队列

  • 交换机绑定的队列

  • 运行结果截图

    生产者

    消费者1

    消费者2

七、结束

OK,在这里我们的发布/订阅模式就到这里了,不知道你有没有帮助到你呢?

若没有看懂请前往官网 RabbitMQ

看到这里的人都是技术人才,你的浏览就是我前进的动力

欢迎加入QQ群: