RabbitMQ基本使用五(路由模式)

Hello, 大家好,我是一名在互联网捡破烂的程序员

在上一期呢,我们讲到了工作队列的使用,还没有打怪升级的小伙伴先去修炼哦

RabbitMQ基本使用(发布/订阅模式)

今天呢,我们要继续打怪升级哦。

今天我们来讲一讲比较高级的消息方式,嗯,我想一下,是什么呢?

不会又要翻车吧。。。

哦,我想起了起来呀,请开始我们的表演

一、开篇前提

今天呢?我们又来讲一讲新的模式——路由模式

何为路由模式呢?

我们先来回顾一下知识点吧。

在上一期中我们讲到了发布/订阅模式,我们生产者将消息发布到交换机(Exchange)中,然后交换机(Exchange)将消息发送到绑定在这个交换机上的全部队列。

那么问题来了?只要我们的队列绑定在这个交换机上,我们的队列都会收到交换机发送过来的消息。

每一个队列都会去消费同样的信息,那不是没事找事,脱了裤子放屁—多此一举吗

那么问题来了,如果在一个日志系统中,我只想要接收错误的日志消息,其他的我一个也不接受,哼,我就是这样的拽。。

那么我们就会使用到我们的路由模式呐

1. 何为路由模式(direct)

路由模式是在使用交换机的同时,生产者生产消息发送到交换机中,交换机根据指定的路由发送数据,消费者绑定路由接收消息。与发布/订阅不同的时,发布/订阅模式只要绑定了交换机的队列都会收到生产者向交换机推送过来的数据。而路由模式下加了一个路由设置,生产者向交换机发送数据时,会声明发送给交换机的哪个路由,并且只有当消费者的队列绑定了交换机并且声明路由,才会收到数据。

P:消息生产者

X:交换机

红色:消息队列

C1,C2:消息消费者

error,info,warning:路由

2. 场景假设

我们还是来假设我们的注册功能,我们现在有两种注册方式,第一种使用邮箱注册,第二种使用手机号码注册,当我们使用邮箱注册时,我们只希望通过邮箱发送验证码,而使用手机号码注册时,我们只希望通过手机号码发送验证码。那么我们使用发布/订阅模式就不能满足我们的需求,使用发布/订阅模式时,我们的邮箱队列和手机号码队列都会去消费我们发出的消息。

那该咋办呢?呜呜呜呜

不要怕,用我们毛主席的一句话来打倒它,敌越强,我越强

直接干掉它,我们是打不死的小强,,,哇哈哈哈哈哈

那么我们使用什么秘密武器来消灭它呢,好吧,就是我们今天的主角路由模式

二、生产者

我们就废话不多说了,直接上代码

package com.codeworld.rabbitmqdemo.route;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 路由模式--生产者
* @author dreamcode
*/
public class Send {

/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "route_exchange";

/**
* 邮箱路由的名称
*/
private static final String ROUTE_NAME_EMAIL = "route_email";

/**
* 手机号码路由的名称
*/
private static final String ROUTE_NAME_PHONE = "route_phone";


public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

// 声明交换机类型
channel.exchangeDeclare(EXCHANGE_NAME,"direct");

// 邮箱信息
String emailMessage = "register email";

// 发送邮箱消息
channel.basicPublish(EXCHANGE_NAME, ROUTE_NAME_EMAIL, null, emailMessage.getBytes("UTF-8"));

System.out.println("send:" + emailMessage);

// 手机号码信息
String phoneMessage = "register phone";

// 发送邮箱消息
channel.basicPublish(EXCHANGE_NAME, ROUTE_NAME_PHONE, null, phoneMessage.getBytes("UTF-8"));

System.out.println("send:" + phoneMessage);

channel.close();

connection.close();
}
}

妈呀,什么鬼东西,上来就是一堆代码。。。

还是开始我们的代码解释吧

首先先说一下和发布/订阅模式的区别吧

在代码中我们可以看到有一点不同,路由模式中同样声明了交换机,我们可以清楚的看到还定义了什么路由

what?

1. 定义基本量

/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "route_exchange";

/**
* 邮箱路由的名称
*/
private static final String ROUTE_NAME_EMAIL = "route_email";

/**
* 手机号码路由的名称
*/
private static final String ROUTE_NAME_PHONE = "route_phone";

是的,你没看错,我们定义了两个路由的名称,为什么要定义路由呢?

这就要根据我们的路由模式来解释了。

2. 连接并开启通道

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

3. 声明交换机类型

// 声明交换机类型
channel.exchangeDeclare(EXCHANGE_NAME,"direct");

这里需要注意的是,我们这里使用的类型是direct

而我们发布/订阅模式设置的是fanout

// 声明交换机类型
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

为什么呢?我也不清楚,别个就是这样定义的。。。

哈哈哈哈

4. 发送消息

  • 发送邮件消息
// 邮箱信息
String emailMessage = "register email";

// 发送邮箱消息
channel.basicPublish(EXCHANGE_NAME, ROUTE_NAME_EMAIL, null,emailMessage.getBytes("UTF-8"));

这里就是我们的重点来了,同样路由模式也是将消息发送到交换机(Exchange)中,然后也是由交换机(Exchange)发送出去

我们还定义了一个路由,这里定义的路由ROUTE_NAME_EMAIL我们的交换机将消息发送到这个路由中,我们消费者如果你需要这个消息,就定义同样的路由,那么你就可以消费这条消息了。

  • 发送手机信息
// 手机号码信息
String phoneMessage = "register phone";

// 发送邮箱消息
channel.basicPublish(EXCHANGE_NAME, ROUTE_NAME_PHONE, null, phoneMessage.getBytes("UTF-8"));

这里就不用我们多逼逼了,和发送邮件消息一模一样的。

生产者就只有这一点了,这里就只是我们多加了一个路由,没有什么特别的

三、消费者

我们也是什么都不多说,直接上代码就是了

  • 消费者1
package com.codeworld.rabbitmqdemo.route;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 消费者1--- 路由模式
* @author dreamcode
*/
public class Rec1 {

/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "route_exchange";

/**
* 邮箱路由的名称
*/
private static final String ROUTE_NAME_EMAIL = "route_email";

/**
* 队列名称
*/
private static final String ROUTE_NAME_QUEUE = "route_queue_email";


public static void main(String[] args) throws IOException, TimeoutException {


ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"direct");

// 声明队列
channel.queueDeclare(ROUTE_NAME_QUEUE,false,false,false,null);

// 将队列绑定到交换机(指定路由为route_email)
channel.queueBind(ROUTE_NAME_QUEUE,EXCHANGE_NAME,ROUTE_NAME_EMAIL);

// 保证一次只分发一次
channel.basicQos(1);

Consumer consumer = new DefaultConsumer(channel){

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

System.out.println("rec:"+ message);

}
};

boolean autoAck = true;

channel.basicConsume(ROUTE_NAME_QUEUE,autoAck, consumer);
}
}

1. 定义基本量

/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "route_exchange";

/**
* 邮箱路由的名称
*/
private static final String ROUTE_NAME_EMAIL = "route_email";

/**
* 队列名称
*/
private static final String ROUTE_NAME_QUEUE = "route_queue_email";

这里我们定义的基本量有

EXCHANGE_NAME:交换机名称 —–》这个就不过多的解释,但是要保证和你的生产者的交换机一致,因为我们要从交换机重接收发送出来的消息

ROUTE_NAME_EMAIL:路由名称 —–》这里我们定义的是邮件路由,意思是我们只接收邮件路由发送出来的消息

ROUTE_NAME_QUEUE:队列名称 —–》这个就不用多说了,不管你使用哪一种模式,队列是少不了的,我们的消息最后还是走的队列里

2. 连接并开启通道

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

3. 声明交换机类型

// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"direct");

4. 声明队列

// 声明队列
channel.queueDeclare(ROUTE_NAME_QUEUE,false,false,false,null);

只要我们学习了前面的都知道,从发布/订阅模式开始我们的队列都是在消费者中定义,在简单模式和工作模式都是在生产者和消费者中都定义了队列名称。

5. 绑定队列并制定路由

// 将队列绑定到交换机(指定路由为route_email)
channel.queueBind(ROUTE_NAME_QUEUE,EXCHANGE_NAME,ROUTE_NAME_EMAIL);

这里就是我们的核心,决定了我们将获取那条路由中的消息

  • 消费者2
package com.codeworld.rabbitmqdemo.route;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 消费者2--- 路由模式
* @author dreamcode
*/
public class Rec2 {

/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "route_exchange";

/**
* 邮箱路由的名称
*/
private static final String ROUTE_NAME_PHONE = "route_phone";

/**
* 队列名称
*/
private static final String ROUTE_NAME_QUEUE = "route_queue_phone";


public static void main(String[] args) throws IOException, TimeoutException {


ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"direct");

// 声明队列
channel.queueDeclare(ROUTE_NAME_QUEUE,false,false,false,null);

// 将队列绑定到交换机(指定路由为route_email)
channel.queueBind(ROUTE_NAME_QUEUE,EXCHANGE_NAME,ROUTE_NAME_PHONE);

// 保证一次只分发一次
channel.basicQos(1);

Consumer consumer = new DefaultConsumer(channel){

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

System.out.println("rec:"+ message);

}
};

boolean autoAck = true;

channel.basicConsume(ROUTE_NAME_QUEUE,autoAck, consumer);
}
}

消费者2和消费者1一模一样,这里就不用过得解释了。

四、截图

  • 交换机

  • 队列

  • 交换机绑定的队列

  • 运行截图

    生产者

​ 消费者1(email路由)

​ 消费者2(phone路由)

五、结束

OK,在这里我们的路由模式就到这里了,不知道你有没有帮助到你呢?

若没有看懂请前往官网 RabbitMQ

看到这里的人都是技术人才,你的浏览就是我前进的动力

欢迎加入QQ群: