RabbitMQ实践二(消峰限流补充)

Hello,我是一名在互联网捡破烂的程序员,最近破烂还挺好捡的,每天都是东逛逛西逛逛,收了很多的破烂呢。。。

收废铁了,十块一斤。快拿来卖哦,什么烂电冰箱,烂电视机,不管什么破烂我都要。。。

每天骑着我的烂三轮车,每天都是活的苟且偷生的,我好可怜。。。

呜呜呜呜

不管有钱木钱,都进来看一看瞧一瞧哦。。

好了~

今天我们来接着讲,如果你是直接来阅读的这一期的话,那你要去看上一期的内容哦,这样我们才可以衔接起来的。

RabbitMQ实践二(消峰限流)

限流操作

我们就直接从Controller层开始讲解了哈

1. 修改我们的OrderController

OrderController

package com.example.rabbitmq.controller;

import com.example.rabbitmq.common.ApiResponse;
import com.example.rabbitmq.entity.TestOrder;
import com.example.rabbitmq.service.OrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;


/**
* 订单Controller
*/
@RestController
@RequestMapping("order")
public class OrderController {


@Autowired(required = false)
private OrderService orderService;

private static Integer count = 0;

private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);

/**
* 使用RabbitMQ限流创建订单
* @return
*/
@PostMapping("create/{goodsId}")
public ApiResponse create(@PathVariable("goodsId") Long goodsId){

ApiResponse apiResponse = this.orderService.create(goodsId);

LOGGER.info("流量请求:" + count++);

return apiResponse;
}

/**
* 无RabbitMQ创建订单
* @return
*/
@PostMapping("/save/{goodsId}")
public ApiResponse save(@PathVariable("goodsId") Long goodsId){

ApiResponse apiResponse = this.orderService.save(goodsId);

LOGGER.info("流量请求:" + count++);

return apiResponse;
}
}

其中我们只添加了一个使用限流操作的接口,和普通的接口一样一样的

2. 修改我们的订单Service

OrderService

package com.example.rabbitmq.service;

import com.example.rabbitmq.common.ApiResponse;
import com.example.rabbitmq.entity.TestOrder;

import java.util.List;

public interface OrderService {

/**
* 使用RabbitMQ限流创建订单
* @return
*/
ApiResponse create(Long goodsId);

/**
* 无RabbitMQ消峰限流
* @return
*/
ApiResponse save(Long goodsId);

/**
* 创建订单
* @param testOrder
*/
void createOrder(TestOrder testOrder);
}

其中的create方法主要是将我们的请求全部接到消息队列中

真正创建订单的方法是createOrder

3. 修改我们的订单ServiceImpl

OrderServiceImpl

package com.example.rabbitmq.service.impl;

import com.example.rabbitmq.common.ApiResponse;
import com.example.rabbitmq.entity.TestGoods;
import com.example.rabbitmq.entity.TestOrder;
import com.example.rabbitmq.mapper.GoodsMapper;
import com.example.rabbitmq.mapper.OrderMapper;
import com.example.rabbitmq.service.OrderService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.UUID;

/**
* OrderService
*/
@Service
public class OrderServiceImpl implements OrderService {

@Autowired(required = false)
private AmqpTemplate amqpTemplate;

@Autowired(required = false)
private OrderMapper orderMapper;

@Autowired(required = false)
private GoodsMapper goodsMapper;

/**
* 使用RabbitMQ限流创建订单
*
* @return
*/
@Override
public ApiResponse create(Long goodsId) {

try {

// 判断参数
if (goodsId == null){

return new ApiResponse().code(444).msg("参数错误");
}

// 发送消息
this.amqpTemplate.convertAndSend("order.create",goodsId);

return new ApiResponse().code(200).msg("下单中,请稍后");

}catch (Exception e){

return new ApiResponse().code(500).msg("服务器错误");
}

}


/**
* 使用RabbitMQ限流创建订单
*
* @param testOrder
* @return
*/
@Override
public void createOrder(TestOrder testOrder) {

this.orderMapper.create(testOrder);
}

/**
* 无RabbitMQ消峰限流
*
* @return
*/
@Override
public ApiResponse save(Long goodsId) {

if (goodsId == null){

return new ApiResponse().code(400).msg("参数错误");
}

// 根据商品Id查询商品
TestGoods testGoods = this.goodsMapper.selectStockById(goodsId);

// 商品不存在或者商品库存为0
if (testGoods == null || testGoods.getGoodsStock() <= 0){

return new ApiResponse().code(400).msg("商品不存在或者库存为0");
}

// 直接添加
// 创建订单
TestOrder testOrder = new TestOrder();

// 设置参数
testOrder.setOrderUserEmail("111@qq.com");

testOrder.setOrderUserName("FC");

testOrder.setOrderDate(new Date());

this.orderMapper.save(testOrder);

// 更新库存
this.goodsMapper.updateGoodsStock(goodsId);

return new ApiResponse().code(200).msg("订单创建成功");
}
}

在这里我们可以清楚的看到了,我们是怎么将请求全部接入到我们的消息队列中的

我们这样做的思想就是,我们需要有一个中间商来帮助我们接收消息,那么这个中间商要比我们的持久层要厉害些,可以接收很多的请求,我们再慢慢的消费这些消息

4. 消费者

OrderListener

package com.example.rabbitmq.listener;

import com.example.rabbitmq.entity.TestGoods;
import com.example.rabbitmq.entity.TestOrder;
import com.example.rabbitmq.service.GoodsService;
import com.example.rabbitmq.service.OrderService;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;
import java.util.List;

/**
* 订单请求消息生产者
*/
@Component
public class OrderListener {

private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);

@Autowired(required = false)
private OrderService orderService;

@Autowired
private AmqpTemplate amqpTemplate;

@Autowired(required = false)
private GoodsService goodsService;

/**
* 创建订单消息监听
* @param goodsId
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "ORDER.QUEUE", durable = "true"),

arguments = {@Argument(name = "x-max-length", value = "10"),
@Argument(name = "dead-letter-exchange",value = "reject-publish")
},

exchange = @Exchange(value = "ORDER.EXCHANGE", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),

key = {"order.create"}
))
public void create(Long goodsId, Channel channel, Message message) throws IOException {

try {

if (goodsId == null) {

return;
}

// 先根据商品Id查询商品库存
TestGoods goods = this.goodsService.selectGoodsById(goodsId);

if (goods == null || goods.getGoodsStock() <= 0){

return;

}

// 创建订单
TestOrder testOrder = new TestOrder();

// 设置参数
testOrder.setOrderUserName("FC");

testOrder.setOrderUserEmail("111@qq.com");

testOrder.setOrderDate(new Date());

// 执行添加
this.orderService.createOrder(testOrder);

// 更新库存
this.goodsService.updateGoodsStock(goodsId);

LOGGER.info("消费成功");

} catch (Exception e) {

LOGGER.error("消费失败");

}finally {

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
}

这个就不用我多说了吧。这里面的细枝末节,需要自行查阅。

也可以去参考 RabbitMQ实践应用一

这样就可以简单的实现我们的消峰限流啦。。。。

我们开始我们的测试吧

我们首先还是用postman来测试我们的接口是否可用

我们可以在控制台看见

这样我们就简单的消费成功了,库存和订单也都更新和添加成功啦

当然这样我们也是不可能的,那我们还是要去用压力测试来试试能不能顶住呢

还是用我们的jmeter来做压力测试,只需要修改接口就行

这样看我们的就是有条理的执行啦

但是这样我们的请求就没有全部打在数据库上,这样我们就可以实现限流啦。。。

我们再来看我们的数据库呢?有木有像没有限流的出现超卖的情况呀

哟哟哟,尽然没有出现超卖的情况,那这就算是实现了限流操作

我们再来看订单是不是100个呢

我们是从115开始的,看看是不是214结束呢

哇哦,果然是预想的一样。。。。。

我们再看时间,是在同一时间下的订单。。。

那就证明了我们的限流操作

OK,这里我们的消峰限流就全部完成了,可能不是那么完善,也有很多的漏洞。

就算库存为0,流量还是会分批打到我们的数据上面,有木有办法,直接舍弃这些流量呢?

这就需要自己思考了,要化为自己的东西才算是真正理解。。。。

总结

从某种意义上说,消费者的限流策略有助于那么处理消息效率高的消费者多消费一些消息,效率低一些的消费者少推送一些消息,从而可以达到能者多劳的目的,尽可能发挥消费者处理消息的能力。在项目中,为了缓解生产者和消费者两边效率不平衡的影响,通常会对消费者进行限流处理,保证消费者端正常消费消息,尽可能避免服务器崩溃以及宕机现象。

加入Redis缓存实现限流操作

一、为什么要加入Redis缓存

我们在前面实现了简单的限流操作,对用户下订单有了很好的维护及在并发情况下可以撑住。

但是,我们在前面留了一个问题,当我们库存为0时,但是我们的流量还存在很多,虽然是分批打入我们的数据库,这样对我们来说是很不友好的。数据库中根本没有库存了,就不应该将剩余的流量打入到我们的数据层。

那么我我们应该怎么解决呢?

这里我们就加入了 Redis缓存技术,相信你在使用这个技术之前就很了解这个Redis了吧。。。。

二、流程图

流程图不标准,根据自己的理解画的。。。

将就一下吧。。。。

三、代码实现

我们只需要修改部分的代码

3.1 引入新的依赖

相信你猜就知道了吧

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
3.2 修改我们的application.yml
#redis配置
redis:
host: 192.168.2.4 #主机地址
database: 4

使用前需要安装Redis,我们可以在Linux上安装也可以在windows上安装

推荐在Linux上安装,我们的大部分都是部署在Linux上的

Redis的使用操作

3.3 修改我们的OrderListener
package com.example.rabbitmq.listener;

import com.example.rabbitmq.entity.TestGoods;
import com.example.rabbitmq.entity.TestOrder;

import com.example.rabbitmq.service.GoodsService;
import com.example.rabbitmq.service.OrderService;
import com.example.rabbitmq.utils.JsonUtils;
import com.rabbitmq.client.Channel;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.locks.ReentrantLock;


/**
* 订单请求消息生产者
*/
@Component
public class OrderListener {

private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);

@Autowired(required = false)
private OrderService orderService;

@Autowired(required = false)
private GoodsService goodsService;

private static String GOODS_NAME = "goods:id:";

@Autowired(required = false)
private StringRedisTemplate stringRedisTemplate;

private static ReentrantLock lock = new ReentrantLock();

/**
* 创建订单消息监听
*
* @param goodsId
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "ORDER.QUEUE", durable = "true"),

arguments = {@Argument(name = "x-max-length", value = "10"),
@Argument(name = "dead-letter-exchange", value = "reject-publish")
},

exchange = @Exchange(value = "ORDER.EXCHANGE", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),

key = {"order.create"}
))
public void create(Long goodsId, Channel channel, Message message) throws IOException {

try {

if (goodsId == null) {

return;
}

String json = this.getRedisData(goodsId);

// Redis缓存中没有命中
if (StringUtils.isBlank(json)){

/**
* 上锁
*/
if (lock.tryLock()){

// 从数据库库中拿到数据
TestGoods testGoods = this.getDBData(goodsId);

if (testGoods == null || testGoods.getGoodsStock() <= 0){

return;
}

// 创建订单
TestOrder testOrder = new TestOrder();

// 设置参数
testOrder.setOrderUserName("FC");

testOrder.setOrderUserEmail("111@qq.com");

testOrder.setOrderDate(new Date());

// 执行添加
this.orderService.createOrder(testOrder);

// 更新库存
testGoods.setGoodsStock(testGoods.getGoodsStock() - 1);

json = JsonUtils.serialize(testGoods);

this.setRedisDate(goodsId, json);

// 释放锁
lock.unlock();
}
} else {

Thread.sleep(100L);

json = this.getRedisData(goodsId);

// 将Json转化为对象
TestGoods testGoods = JsonUtils.parse(json, TestGoods.class);

if (testGoods == null || testGoods.getGoodsStock() <= 0){

return;
}

// 创建订单
TestOrder testOrder = new TestOrder();

// 设置参数
testOrder.setOrderUserName("FC");

testOrder.setOrderUserEmail("111@qq.com");

testOrder.setOrderDate(new Date());

// 执行添加
this.orderService.createOrder(testOrder);

// 更新库存
testGoods.setGoodsStock(testGoods.getGoodsStock() - 1);

json = JsonUtils.serialize(testGoods);

this.setRedisDate(goodsId, json);
}

} catch (Exception e) {

LOGGER.error("消费失败");

} finally {

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

/**
* 从Redis中获取数据
*
* @param goodsId
*/
private String getRedisData(Long goodsId) {

// 从缓存中获取数据
String json = this.stringRedisTemplate.opsForValue().get(GOODS_NAME + goodsId);

return json;
}

/**
* 设置Redis缓存数据
* @param goodsId
* @param json
*/
private void setRedisDate(Long goodsId, String json){

this.stringRedisTemplate.opsForValue().set(GOODS_NAME + goodsId, json);
}

/**
* 从数据库中取出数据
* @param goodsId
* @return
*/
private TestGoods getDBData(Long goodsId){

TestGoods testGoods = this.goodsService.selectGoodsById(goodsId);

return testGoods;
}
}

主要代码解释

其中写的三个方法就不用说了吧

getRedisData:从Redis中获取数据

setRedisData:更新Redis中的数据

getDBData:从数据库中获取数据

  • 首先请求过来会带着一个Id,先判断这个Id是否为空
if (goodsId == null) {
return;
}

若为空,消息直接丢弃

  • 先从 Redis中获取数据
String json = this.getRedisData(goodsId);
  • 判断获取的数据是否为空,也就是缓存中是否有数据,若没有数据,则上锁,从数据库中取出数据
/**
* 上锁
*/
if (lock.tryLock()){
// 从数据库库中拿到数据
TestGoods testGoods = this.getDBData(goodsId);
  • 从数据库中拿到数据后,将其放入缓存中,并释放锁
json = JsonUtils.serialize(testGoods);

this.setRedisDate(goodsId, json);

// 释放锁
lock.unlock();
  • 若从缓存中命中数据,那么让其休眠
Thread.sleep(100L);

json = this.getRedisData(goodsId);

OK,就这样简单的修改好我们的代码了,是不是很简单呢。

四、截图验证

首先来看看我们的数据库中商品的库存信息

再来看看我们的订单表

接下来,就直接上我们的压力测试,这次要使用6000个请求啦,看你还抵不抵得住

首先看我们的控制台

我们的消息队列直接将全部的瞬时流量收入囊中

然后我们在分批处理我们的流量,也就是下订单啦

OK,我们来看看我们的Redis缓存是否正确呢?预判库存为0

哇哦,那还可以,基本实现了

下面才是我们的重头戏

看看我们的数据库是否有错呢?

先来看库存,预判为100

为什么呢?我们在下单中,基本上就没有和数据库打交道,把它给撇开了

我们直接是在缓存中进行的,根本没有更新数据库的商品库存,那肯定是100啦

哇哦,果真是诶。。。。

那再来看我们订单表呢,你有木有超出100条呢

一张图截不完,我们是从715开始的,看看是否是814结束呢

哇哦,也是诶,那这就对了嘛。。。

我们仔细一看,这一次我们下单的时间就有差别了,没有同一时间将下单。。。

五、总结

我们使用Redis缓存,大大减轻了我们数据库的压力,查询商品只需要访问一次数据库,查询的数据放入缓存。

订单的下单时间也有所优化。

你们有木有什么优化的呢?可以思考哦,其实还有很多方案

OK,在这里我们的实践操作二(消峰限流)就到这里了,不知道你有没有帮助到你呢?

看到这里的人都是技术人才,你的浏览就是我前进的动力

欢迎加入QQ群: