RabbitMQ实践二(消峰限流)

Hello,我是一名在互联网捡破烂的程序员,最近破烂还挺好捡的,每天都是东逛逛西逛逛,收了很多的破烂呢。。。

收废铁了,十块一斤。快拿来卖哦,什么烂电冰箱,烂电视机,不管什么破烂我都要。。。

每天骑着我的烂三轮车,每天都是活的苟且偷生的,我好可怜。。。

呜呜呜呜

不管有钱木钱,都进来看一看瞧一瞧哦。。

好了~

废话我们就不多说了,我最近在收废铁的时候收到一本武功秘籍,发现了新大陆,今天我就来和你们分享一下。

我们一起成为武林霸主,来吧。氛围搞起来。。。。。

首先呢,我们在上一期中对 RabbitMQ做了一个小小的实践,主要是对 RabbitMQ的异步特性进行了分析。

如果小伙伴还没有修炼的,赶紧去修炼去吧,我们要慢慢的成长起来,要慢慢的修炼武功秘籍,不是一天两天就可以练成的,让我们一起成为世界的主宰吧,嗯哈哈哈哈哈

RabbitMQ实践一(异步解耦)

如果你还是一个一个刚刚入门的小伙伴呢,那你的加紧你的步伐,赶快去从武功秘籍的第一页开始吧,你也不要慌张,我们都是从小白开始的,只不过你要多花一点时间来完成前面的修炼,终有一天你会超过的,哈哈哈

RabbitMQ基本使用一(简单介绍)

RabbitMQ基本使用二(简单队列)

RabbitMQ基本使用三(工作队列)

RabbitMQ基本使用四(发布/订阅模式)

RabbitMQ基本使用五(路由模式)

RabbitMQ基本使用六(主题模式)

RabbitMQ入门呢?就是上面的这么多啦。自己根据官网来理解的知识点,写的不是那么很好,不过很通俗易懂。代码有详细介绍。如有问题,请留言,多多包涵。

好吧,我们就废话不多说了,那就开始我们的表演吧!!!!

一、开篇前提

本文篇幅比较长,请耐心阅读,你会收获很多的。

今天给大家带来的是 RabbitMQ的消峰限流,我们知道现在互联网越来越强大,我们的系统也是越来越完善,在高峰期呢,系统将要承受巨大的压力,那么也是我们程序员的压力。像淘宝、京东大型网站购物系统每逢双十一,那就是程序员最忙的时候,当天要承受千万级别的流量冲击,不得不抵住压力啊。

想必大家都知道哦我要说什么了吧,没错,就是你想的那样,我们就是要对这千万级别的流量打交道。我们要抵住流量的冲击。让它能够缓解我们的系统的压力,系统压力小了,我们自身的压力就小了。OK

1. 场景介绍

  • 场景一

    我们知道在我们双十一中都会有秒杀的商品,我们所秒杀的商品价格都是非常低的,并且商品也是非常好,比如华为Mate30只要999,、苹果12只要99等等这些秒杀商品,不管是谁,看了都会心动。但是呢,商品的数量是有限的,不是每个用户都会抢到,全国14亿用户,就拿一半的人来抢,那个流量冲击是我们无法想像的。那要是全部流量打在我们的数据库中,那就只有说再见。。凉凉,最后还是我们程序员扛下了所有,那么我们应该怎么办呢?

  • 场景二

    相信大多数人都抢过火车票吧,肯定有没有抢到票的吧。比如在我们的12306抢票网站,每次到一定的时间段都会有大量用户涌入抢票,可能我会遇到过服务器忙、或者加载失败等情况。那么在这么大的流量下,我们是怎么抗住的呢?

2. 问题描述

在我们面对瞬时流量的情况下,全部的流量都打在我们的数据库中,那是很难受的。

那么我们应该怎么来解决这种瞬时流量下的并发情况?

在我们秒杀中,库存只有一份,所有人会集中在时间读和写写这些数据,多人读取同一个数据

3. 优化方案

我们无论在抢票或者秒杀商品中时,为什么我没有抢到,别人却抢到了?

下面带你打开这扇门

  • 我们将请求尽量拦截在系统的上游(不要让锁冲突到数据上)。为什么那些传统的秒杀系统会挂,那是因为所有的请求都压到了数据层上,导致数据读取锁冲突,并发的响应数据慢,巨几乎都是所有请求超时。流量虽大,下单成功的有效流量去很少。如果我们秒杀商品库存有100件,那么有1000W人来抢,那么那时的瞬时流量很大,但是基本没有人秒杀成功,原因在于所有的流浪都打在我们的数据层上,到时响应速度慢,请求的有效率为0。那么这是一次很失败的秒杀活动。

    那用户不得吐槽啊,还不得上热搜啊,某某什么秒杀系统,垃圾的很,没人秒杀成功,都是请求超时,哎,溜了溜了

    那程序员就遭殃了啊,老板还不得直接暴扣到头上啊,那是很难受的,薪水都要大打折扣。

  • 充分利用缓存来实现这种瞬时流量大的情况,秒杀买票,这就是一个典型的读多写少的应用场景,大部分请求是车次的查询,票查询,下单和支付才是写请求。一趟火车其实只有2000张票,200W人来买,最多只有2000人下单成功,其他人都是查询库存,写比例只有0.1%,读的比例占99.9%,非常适合用缓存来优化。

这下知道为什么有时候抢不到商品了吧

这样的设计就是为了牺牲用户流量来换系统稳定

4. 架构

这个架构我是根据自己的理解来的,可能不怎么符合理念,知道大概意思就是了哈

OK,说了这么多,我们就开始我们的代码设计吧

二、实践操作

1. 项目开始阶段

下面的所有代码都是本人独立完成,可能不是那么完美,不过应该问题不大吧

如有错误之处,还请包涵,多多指出

这里我们加入我们的 Redis缓存技术,有时间会更新的。。

首先我们要有 RabbitMQ的客户端,可以在Linux上安装,也可以在Windows上安装。

我使用的是Linux上安装的,可以去参考 RabbitMQ在Linux下安装

1.1 创建SpringBoot项目

这个就不用多说了吧,这个都是我们的家常便饭了

1.2 导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.4.RELEASE</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>

主要是导入我们我们的 MQ依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.4.RELEASE</version>
</dependency>
1.3 修改application.yml
server:
port: 9000
spring:
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/test_rabbitmq?useUnicode=true&characterEncoding=utf-8&serverTimezone=GMT%2B8
username: root
password: root
rabbitmq:
addresses: 192.168.2.2
virtual-host: /
username: guest
password: guest
template:
exchange: ORDER.EXCHANGE
listener:
simple:
#指定最小的消费者数量
concurrency: 1
retry:
enabled: false #是否支持重试
prefetch: 100
acknowledge-mode: manual
logging:
level:
com.example.rabbitmq.mapper: debug
mybatis:
mapper-locations: classpath:mapper/*.xml

主要配置的说明

1.3 数据库连接
spring:
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/test_rabbitmq?useUnicode=true&characterEncoding=utf-8&serverTimezone=GMT%2B8
username: root
password: root

将用户名和密码修改为自己的

如果你的数据库是在Linux上,那么你需要将localhost修改为你的IP地址,并开启3306端口

1.4 RabbitMQ基本配置
  • 基本配置
rabbitmq:
addresses: 192.168.2.2
virtual-host: /
username: guest
password: guest

注意,我这里没有列出格式

rabbitmq这是是在spring下的

address:这个是你的IP地址,同样的如果你的 RabbitMQ是在Linux上,使用Linux的IP地址。若是windows版本,直接使用localhost

virtual-host:这个是你rabbitmq上的一个虚拟主机

username:用户名,如果你没有添加用户:默认的是guest

password:密码,同样,如果你没有添加用户:默认是guest

具体说明请详见 RabbitMQ在Linux下安装

  • 默认交换机
template:
exchange: ORDER.EXCHANGE

这里我们可以定义默认的交换机名称,那么在代码中我们就需要设置这样的名称

  • 消费者监听配置
listener:
simple:
#指定最小的消费者数量
concurrency: 1
retry:
enabled: false #是否支持重试
prefetch: 100 #每次处理的消息
acknowledge-mode: manual #手动确认

上面的注释已经很清楚明白了吧,具体需要配置自己可以自行配置。

1.5 日志打印配置(主要打印我们的SQL语句)
logging:
level:
com.example.rabbitmq.mapper: debug
1.6 mybatis配置文件位置
mybatis:
mapper-locations: classpath:mapper/*.xml

2. 数据库(test_rabbitmq)

2.1 Order(订单表)
DROP TABLE IF EXISTS `test_order`;

CREATE TABLE `test_order` (
`order_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '订单Id',
`order_user_name` varchar(255) DEFAULT NULL COMMENT '订单人的名称',
`order_user_email` varchar(255) DEFAULT NULL COMMENT '订单人的邮箱',
`order_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '订单时间',
PRIMARY KEY (`order_id`)
) ENGINE=InnoDB AUTO_INCREMEN
2.2 Googs(商品表)
DROP TABLE IF EXISTS `test_goods`;

CREATE TABLE `test_goods` (
`goods_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '商品Id',
`goods_name` varchar(255) DEFAULT NULL COMMENT '商品名称',
`goods_stock` int(100) DEFAULT NULL COMMENT '商品库存',
PRIMARY KEY (`goods_id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
insert into `test_goods`(`goods_id`,`goods_name`,`goods_stock`) values (1,'商品',0);

3. 基本类的创建

3.1 数据响应类

ApiResponse

package com.example.rabbitmq.common;
import java.util.HashMap;

/**
* 数据响应返回
*/
public class ApiResponse extends HashMap<String, Object> {
/**
* 状态码
*/
private Integer code;
/**
* 信息
*/
private String msg;

@Override
public Object put(String key, Object value) {

super.put(key, value);

return this;
}

public ApiResponse code(Integer code){

this.put("code", code);

return this;
}

public ApiResponse msg(String msg){

this.put("msg",msg);

return this;
}
}

用户下单响应数据

抢单成功,返回给用户信息code=200,msg='抢单成功'

抢单失败,返回给用户信息code=400,msg='抢单失败,或者是请求超时'

3.2 订单类

Order

package com.example.rabbitmq.entity;

import lombok.Data;

import java.util.Date;

/**
* 基本类
*/
@Data
public class TestOrder {

/**
* 订单Id
*/
private Long orderId;

/**
* 订单人名称
*/
private String orderUserName;

/**
* 订单人邮箱
*/
private String orderUserEmail;

/**
* 订单时间
*/
private Date orderDate;
}

这里我们使用的是一个注解@Data,这个注解来源于依赖lombok,要使用这个依赖不仅需要添加这个依赖,而且在IDEA中还要下载这个插件

我相信现在绝大多数人都是使用的IDEA编译器吧~

如果还有小伙伴不知道这个编译器或者没有使用的,那就赶快行动起来吧!!!

IDEA这款编译器是真的很友好

哇哦,扯到一边去了。。。

OK,我们回归正题

3.3 商品类

Goods

package com.example.rabbitmq.entity;

import lombok.Data;

/**
* 商品
*/
@Data
public class TestGoods {

/**
* 商品ID
*/
private Long goodsId;

/**
* 商品名称
*/
private String goodsName;

/**
* 商品库存
*/
private Integer goodsStock;
}

首先呢我们要模拟一个没有限流的场景

我们先要将goods表中的库存修改为100(自己自定义),这里我们就模拟只有100个库存的商品

我的习惯是从Controller层到持久层,这个因人而异吧。。没有什么强制要求

4. 无限流操作(正常操作)

4.1订单Controller层

OrderController

package com.example.rabbitmq.controller;

import com.example.rabbitmq.common.ApiResponse;
import com.example.rabbitmq.entity.TestOrder;
import com.example.rabbitmq.service.OrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;


/**
* 订单Controller
*/
@RestController
@RequestMapping("order")
public class OrderController {

@Autowired(required = false)
private OrderService orderService;

private static Integer count = 0;

private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);
/**
* 无RabbitMQ创建订单
* @return
*/
@GetMapping("/save/{goodsId}")
public ApiResponse save(@PathVariable("goodsId") Long goodsId){

ApiResponse apiResponse = this.orderService.save(goodsId);

LOGGER.info("流量请求:" + count++);

return apiResponse;
}
}

上面代码可能会报错,不过不要慌,心不乱,手不抖,我们跟着感觉走

那是因为我们有些类还没有创建

接下里就是我们的Service类创建

4.2 订单Service层

OrderService

package com.example.rabbitmq.service;

import com.example.rabbitmq.common.ApiResponse;
import com.example.rabbitmq.entity.TestOrder;

import java.util.List;

public interface OrderService {
/**
* 无RabbitMQ消峰限流
* @return
*/
ApiResponse save(Long goodsId);
}
4.3 订单Service实现类

OrderServiceImpl

package com.example.rabbitmq.service.impl;

import com.example.rabbitmq.common.ApiResponse;
import com.example.rabbitmq.entity.TestGoods;
import com.example.rabbitmq.entity.TestOrder;
import com.example.rabbitmq.mapper.GoodsMapper;
import com.example.rabbitmq.mapper.OrderMapper;
import com.example.rabbitmq.service.OrderService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.UUID;

/**
* OrderService
*/
@Service
public class OrderServiceImpl implements OrderService {


@Autowired(required = false)
private AmqpTemplate amqpTemplate;

@Autowired(required = false)
private OrderMapper orderMapper;

@Autowired(required = false)
private GoodsMapper goodsMapper;


/**
* 无RabbitMQ消峰限流
*
* @return
*/
@Override
public ApiResponse save(Long goodsId) {

if (goodsId == null){

return new ApiResponse().code(400).msg("参数错误");
}

// 根据商品Id查询商品
TestGoods testGoods = this.goodsMapper.selectStockById(goodsId);

// 商品不存在或者商品库存为0
if (testGoods == null || testGoods.getGoodsStock() <= 0){

return new ApiResponse().code(400).msg("商品不存在或者库存为0");
}

// 直接添加
// 创建订单
TestOrder testOrder = new TestOrder();

// 设置参数
testOrder.setOrderUserEmail("111@qq.com");

testOrder.setOrderUserName("FC");

testOrder.setOrderDate(new Date());

this.orderMapper.save(testOrder);

// 更新库存
this.goodsMapper.updateGoodsStock(goodsId);

return new ApiResponse().code(200).msg("订单创建成功");
}
}

我们来解释一下我们的正常逻辑

  • 判断参数是否有效
if (goodsId == null){
return new ApiResponse().code(400).msg("参数错误");
}
  • 查询商品信息
// 根据商品Id查询商品
TestGoods testGoods = this.goodsMapper.selectStockById(goodsId);

// 商品不存在或者商品库存为0
if (testGoods == null || testGoods.getGoodsStock() <= 0){

return new ApiResponse().code(400).msg("商品不存在或者库存为0");
}

先从数据库中查询商品信息,返回查询结果

判断是否为空或者是库存是否为0

  • 创建订单
 // 直接添加
// 创建订单
TestOrder testOrder = new TestOrder();

// 设置参数
testOrder.setOrderUserEmail("111@qq.com");

testOrder.setOrderUserName("FC");

testOrder.setOrderDate(new Date());

this.orderMapper.save(testOrder);

上面条件都满足,那么我们可以下订单了

  • 更新库存
// 更新库存
this.goodsMapper.updateGoodsStock(goodsId);

当然呢我们下单成功了,当然要更新我们的库存呐。

那这就是我们下单的一般逻辑啦

这里肯定是有问题的啦,这样我们所有的请求都打在我们的数据库上了,那数据库肯定是罩不住的,那要崩啊

稍后会给出解决方案

4.4 商品Service层

GoodsService

package com.example.rabbitmq.service;

import com.example.rabbitmq.entity.TestGoods;

public interface GoodsService {

/**
* 根据商品Id查询商品库存
* @param goodsId
* @return
*/
TestGoods selectGoodsById(Long goodsId);

/**
* 更新库存
* @param goodsId
*/
void updateGoodsStock(Long goodsId);
}
4.5 商品Service实现类

GoodsServiceImpl

package com.example.rabbitmq.service.impl;

import com.example.rabbitmq.entity.TestGoods;
import com.example.rabbitmq.mapper.GoodsMapper;
import com.example.rabbitmq.service.GoodsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
* 商品Service
*/
@Service
public class GoodsServiceImpl implements GoodsService {
@Autowired(required = false)
private GoodsMapper goodsMapper;
/**
* 根据商品Id查询商品
* @param goodsId
* @return
*/
@Override
public TestGoods selectGoodsById(Long goodsId) {

if (goodsId == null){

return null;
}

TestGoods testGoods = this.goodsMapper.selectStockById(goodsId);

return testGoods;
}

/**
* 更新库存
*
* @param goodsId
*/
@Override
public void updateGoodsStock(Long goodsId) {

if (goodsId == null){

return;
}

try {

this.goodsMapper.updateGoodsStock(goodsId);

return;

}catch (Exception e) {

return;
}
}
}
4.6 商品Mapper层

GoodsMapper

package com.example.rabbitmq.mapper;

import com.example.rabbitmq.entity.TestGoods;
import org.apache.ibatis.annotations.Mapper;
import org.springframework.stereotype.Repository;

@Mapper
@Repository
public interface GoodsMapper {

/**
* 根据商品Id查询商品
* @param goodsId
* @return
*/
TestGoods selectStockById(Long goodsId);

/**
* 更新库存
* @param goodsId
*/
void updateGoodsStock(Long goodsId);
}
4.7商品Mapper的xml文件

GoodsMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >

<mapper namespace="com.example.rabbitmq.mapper.GoodsMapper">


<resultMap id="BaseGoodsMap" type="com.example.rabbitmq.entity.TestGoods">

<id property="goodsId" column="goods_id"/>

<result property="goodsName" column="goods_name"/>

<result property="goodsStock" column="goods_stock"/>

</resultMap>


<!-- 根据商品Id查询商品 -->
<select id="selectStockById" resultMap="BaseGoodsMap" parameterType="java.lang.Long">

select goods_stock from test_goods where goods_id = #{goodsId};

</select>

<!-- 更新库存 -->
<update id="updateGoodsStock" parameterType="java.lang.Long">

update test_goods set goods_stock = goods_stock - 1 where goods_id = #{goodsId} ;

</update>

</mapper>
4.8 订单Mapper层

OrderMapper

package com.example.rabbitmq.mapper;

import com.example.rabbitmq.entity.TestOrder;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import org.springframework.stereotype.Repository;

/**
*
*/
@Mapper
@Repository
public interface OrderMapper {
/**
* 无RabbitMQ消峰限流保存订单
*/
void save(TestOrder testOrder);
}
4.9 订单Mapper的xml文件

OrderMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >

<mapper namespace="com.example.rabbitmq.mapper.OrderMapper">


<resultMap id="BaseOrderMap" type="com.example.rabbitmq.entity.TestOrder">

<id property="orderId" column="order_id"/>

<result property="orderUserName" column="order_user_name"/>

<result property="orderUserEmail" column="order_user_email"/>

<result property="orderDate" column="order_date"/>

</resultMap>
<insert id="save" parameterType="com.example.rabbitmq.entity.TestOrder">

insert into test_order (order_user_name, order_user_email, order_date) values (

#{orderUserName},

#{orderUserEmail},

#{orderDate}
);
</insert>
</mapper>

OK,这里我们基本操作就完成了,这些都是我们的正常操作

那么我们启动项目来测试一下

4.10 测试

这里我们先用postman来测试我们的接口

我们可以在控制台看见

我们的全部请求都到了数据库上,我们这里只是发起了两次请求,数据库肯定不会啊,数据库应该每秒能够撑起2000次请求吧,那么要是超过了2000,那就要出现问题了呀。。。

我们来看数据库中的数据

库存信息

订单信息

这样我们就可以轻松的创建订单了,哇哈哈哈哈

但是事实不撩人啊,咋个这么轻松哦,,那是不可能的

那么我们就需要压力测试来啦,这里我们使用的是jmeter

这里我们创建了4000个线程来请求我们接口

这样看我们的数据库压力还大不大,哼

那就来瞧一瞧我们数据库怎么来解决吧

妈呀,这是什么哦,怎么可以乱来哦。

这里我们看见,虽然有查询,有添加,还有更新库存的操作

但是呢,别个还在执行的时候,我还在下订单的时候,另外一个就在更新库存,Are You Sure?

这。。。。

这我看不下去了

那么我们再来看看绝望的时刻

哇哦,这就很尴尬了,这样下去不得了啊,我们不得亏死啊,尽然还超卖了。。。。

为什么会出现这样的情况呢?

当我们的请求流量瞬时就来了,而且一般还是同一时间来的,这样我们的全部流量就打到我们的数据库上,

当我们一个线程还在查询这一步,查询出来哦,还有1个库存,可以下订单,那么另外一个线程也来了,

查询出来,哦,我也还有1个库存,可以下订单。

这样两个线程都去更新库存信息,这样就会出现超卖的情况那。

在我们秒杀活动中,如果这样去实现,那不得亏死哦。

那么我们应该怎么办呢?左想想,右想想

哦,我们不是学过消息队列吗?我们可以用这个来进行优化啊。。。

消息队列中不是可以消峰限流吗?

鼠标往上一划,哇哦,原来写了这么多啦,那我们就先这样结束吗?

不会,我们在下一节会继续讲到的,我们下期再见啦

RabbitMQ实践二(消峰限流补充)

OK,在这里我们的实践操作二(消峰限流)就到这里了,不知道你有没有帮助到你呢?

看到这里的人都是技术人才,你的浏览就是我前进的动力

欢迎加入QQ群: