RabbitMQ基本使用六(主题模式)

Hello, 大家好,我是一名在互联网捡破烂的程序员

在上一期呢,我们讲到了工作队列的使用,还没有打怪升级的小伙伴先去修炼哦

RabbitMQ基本使用(路由模式)

今天呢,我们要继续打怪升级哦。

今天我们来讲一讲比较高级的消息方式,嗯,我想一下,是什么呢?

这下不会再翻车的????

哼哼,我已经提前看过了

那就开始我们的表演了

一、开篇前提

那我们就开始吧,今天就讲一讲主题模式

何为主题模式呢?

在此之前我们也要回顾上一期的内容

在上一期我们讲到了路由模式。路由模式的概念呢,就是我们想要接收哪一个路由发送过来的消息,我们在消费者中就定义相同路由名称就OK了。

那么缺点是什么呢?路由模式下,我们定义的路由键是固定的。如果定义了很多路由,那么会定义很多个路由键,这样就不好维护了,这不就又炸了吗?

心不慌,手不抖,我们跟着感觉走

1. 何为主题模式(topics)

那么主题模式呢?其实和路由模式类似,路由模式指定的路由建是固定的,而主题模式是可以模糊匹配路由键,就类似于SQL语句中的 =like的关系

P:消息生产者

X:交换机

Q1,Q2:队列

C1,C2:消息消费者

topics模式与routing模式比较接近,topics模式不能具有任意的routingKey,必须由一个英文句点号"."分割的字符串(我们将被句点号"."分割开的每一段独立的字符串称为一个单词),比如:fc.orange.fox,主题模式下的路由键routingKey 可以存放在两种特殊字符"*"和"#",用于做模糊匹配,其中"*"匹配任何一个单词,"#"匹配0个或者多个单词

我们就以上图为例子

如果一个路由键(routingKey)设置为xxx.orange.rabbit,那么该消息会同时路由到Q1,Q2

如果一个路由建(routingKey)设置为lazy.fc,那么该消息会路由到Q2

如果一个路由建(routingKey)设置为fc.lazy.fox,那么该消息会被丢弃,因为它没有匹配到任何路由键

2. 场景假设

我们在日志系统中,会有不同的消息,例如:infoerrorwarning等日志消息,那么在我们的路由模式中,我们就会定义三个路由键infoerrorwarning,这样我们就可以接收到消息啦。

但是我们现在的需求变了,我想要既接收info的消息,还要接收warning的消息,那又要咋办呢?

就算这样我们也不要怕,遇到恐惧就是面对恐惧,奥利给

既然要解决,那就要使用到我们今天的主角啦。。。

主题模式

二、生产者

那我们还是废话不多说,直接上代码

package com.codeworld.rabbitmqdemo.topics;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 生产者---主题模式
* @author dreamcode
*/
public class Send {

/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "topics_exchange";

/**
* info路由的名称
*/
private static final String TOPICS_NAME_INFO = "log.topics.info";

/**
* warning路由的名称
*/
private static final String TOPICS_NAME_WARNING = "log.topics.warning";


/**
* error路由的名称
*/
private static final String TOPICS_NAME_ERROR = "log.topics.error";



public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

// 声明交换机类型
channel.exchangeDeclare(EXCHANGE_NAME,"topic");

// 发送info消息
String infoMessage = "topics info";

channel.basicPublish(EXCHANGE_NAME,TOPICS_NAME_INFO, null,infoMessage.getBytes("UTF-8"));

System.out.println("send:" + infoMessage);

// 发送warning消息
String warningMessage = "topics warning";

channel.basicPublish(EXCHANGE_NAME,TOPICS_NAME_WARNING, null,warningMessage.getBytes("UTF-8"));

System.out.println("send:" + warningMessage);

// 发送error消息
String errorMessage = "topics error";

channel.basicPublish(EXCHANGE_NAME,TOPICS_NAME_ERROR, null,errorMessage.getBytes("UTF-8"));

System.out.println("send:" + errorMessage);

channel.close();

connection.close();
}
}

其实代码和路由模式基本上一样,这里就不用过多的解释吧~~

好吧,为了凑字数,我还是解释一下吧

1.定义基本量

/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "topics_exchange";

/**
* info路由的名称
*/
private static final String TOPICS_NAME_INFO = "log.topics.info";

/**
* warning路由的名称
*/
private static final String TOPICS_NAME_WARNING = "log.topics.warning";

/**
* error路由的名称
*/
private static final String TOPICS_NAME_ERROR = "log.topics.error";

这里就不用解释吧。。。

2. 连接并开启通道

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

2. 声明交换机类型

// 声明交换机类型
channel.exchangeDeclare(EXCHANGE_NAME,"topic");

这里需要注意啦

主题模式—》topic

路由模式—》direct

发布/订阅模式—》fanout

特别注意一下,单词一定不要写错哦

4. 发送消息

这里我们就把全部粘贴出来

// 发送info消息
String infoMessage = "topics info";

channel.basicPublish(EXCHANGE_NAME,TOPICS_NAME_INFO, null,infoMessage.getBytes("UTF-8"));

System.out.println("send:" + infoMessage);

// 发送warning消息
String warningMessage = "topics warning";

channel.basicPublish(EXCHANGE_NAME,TOPICS_NAME_WARNING, null,warningMessage.getBytes("UTF-8"));

System.out.println("send:" + warningMessage);

// 发送error消息
String errorMessage = "topics error";

channel.basicPublish(EXCHANGE_NAME,TOPICS_NAME_ERROR, null,errorMessage.getBytes("UTF-8"));

System.out.println("send:" + errorMessage);

其实和路由模式没什么两样的

三、消费者

  • 消费者1
package com.codeworld.rabbitmqdemo.topics;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 消费者1---主题模式
* @author dreamcode
* 接收所有消息
*/
public class Rec1 {


/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "topics_exchange";

/**
* 路由的名称
* 接收所有的消息
*/
private static final String TOPICS_NAME = "log.topics.*";

/**
* 队列名称
*/
private static final String TOPICS_NAME_QUEUE = "topics_queue_all";


public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");

// 声明队列
channel.queueDeclare(TOPICS_NAME_QUEUE,false,false,false,null);

// 绑定队列
channel.queueBind(TOPICS_NAME_QUEUE,EXCHANGE_NAME,TOPICS_NAME);

// 保证一次只分发一次
channel.basicQos(1);

Consumer consumer = new DefaultConsumer(channel){

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

System.out.println("rec:"+ message);

}
};

boolean autoAck = true;

channel.basicConsume(TOPICS_NAME_QUEUE,autoAck, consumer);
}
}

1. 定义基本量

/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "topics_exchange";

/**
* 路由的名称
* 接收所有的消息
*/
private static final String TOPICS_NAME = "log.topics.*";

/**
* 队列名称
*/
private static final String TOPICS_NAME_QUEUE = "topics_queue_all";

注意交换机(Exchange)是否和生产者定义的交换机(Exchange)一致

这里我们的消费者1呢,是接收发送过来的全部消息,什么infowarningerror我统统都要了,

哇哦,老大就是不一样哈,管它什么消息,我全部都要,你吃的下吗?

这里我们就比较注重我们定义的路由键啦

private static final String TOPICS_NAME = "log.topics.*";

“*” :代表匹配一个单词

也就是说只要是log.topics开头的我都要

2. 连接并开启通道

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

3. 声明交换机类型

// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");

4. 声明队列

// 声明队列
channel.queueDeclare(TOPICS_NAME_QUEUE,false,false,false,null);

5. 绑定队列并指定路由

// 绑定队列
channel.queueBind(TOPICS_NAME_QUEUE,EXCHANGE_NAME,TOPICS_NAME);
  • 消费者2
package com.codeworld.rabbitmqdemo.topics;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 消费者1---主题模式
* @author dreamcode
* 接收info
*/
public class Rec2 {


/**
* 交换机名称
*/
private static final String EXCHANGE_NAME = "topics_exchange";

/**
* 路由的名称
* 接收所有的消息
*/
private static final String TOPICS_NAME = "log.topics.info";

/**
* 队列名称
*/
private static final String TOPICS_NAME_QUEUE = "topics_queue_info";


public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");

// 声明队列
channel.queueDeclare(TOPICS_NAME_QUEUE,false,false,false,null);

// 绑定队列
channel.queueBind(TOPICS_NAME_QUEUE,EXCHANGE_NAME,TOPICS_NAME);

// 保证一次只分发一次
channel.basicQos(1);

Consumer consumer = new DefaultConsumer(channel){

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

System.out.println("rec:"+ message);
}
};

boolean autoAck = true;

channel.basicConsume(TOPICS_NAME_QUEUE,autoAck, consumer);
}
}

这里呢就好像是路由模式时,我们只接收info传过来的消息

四、截图

  • 交换机

  • 队列

  • 交换机绑定的队列

  • 运行截图

生产者

消费者1(接收所有消息)

消费者2(接收info消息)

五、结束

OK,在这里我们的主题模式就到这里了,不知道你有没有帮助到你呢?

若没有看懂请前往官网 RabbitMQ

看到这里的人都是技术人才,你的浏览就是我前进的动力

欢迎加入QQ群: