Sorry, your browser cannot access this site
This page requires browser support (enable) JavaScript
Learn more >

RabbitMQ入门使用

1、同步、异步通信

我们服务之间的通信,一般分为同步和异步两种方式。

同步就是:当两个服务间通信时,必须要每一步按照顺序进行之后再进行下一步的内容。看上去同步可以立即得到结果,好像很好。但是这样的通信方式往往就会产生一些问题。

  1. 耦合度较高。当我们加入新的需求时,需要修改很多的代码。
  2. 性能不好。因为每次都要等待上一个服务完成,所以会消耗很多时间,导致性能下降.
  3. 资源浪费。在等待服务完成的时候,不能释放资源,也不能做事情,就导致了资源的浪费。
  4. 级联失败。如果服务调用过程中一个环节出现错误,就会导致其他环节跟着出现错误。就像是多米诺骨牌一样。

异步就是:当两个服务间通信时,A将资料传给B的时候,就可以继续去做自己的事情,而不用一直等待B传送消息回来,在B有空的时候处理就可以。同时,异步通信解决了一些同步通信存在的问题:耦合度低吞吐量提升故障隔离流量削峰
但是同时也带来了一些新的问题:

  1. 依赖于Broker的可靠性、安全性、吞吐能力
  2. 架构复杂了,业务没有明显的流程线,不好追踪管理

所以我们在使用的时候,应该根据实际情况来使用异步或者同步通信。

2、消息队列(MQ)简介

MQ (MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题。实现高性能、高可用、可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

2.1、常见的MQ

以下就是四种我们常用的MQ以及它们之间的对比:
This is a picture without description

接下来我们就以RabbitMQ来作为我们学习的工具。

2.2、RabbitMQ介绍以及安装

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com/

由于它是基于Erlang语言开发的,所以我们必须安装Erlang语言的开发环境。
为了省时间,我们这里直接在Dockers里面进行安装并做一个映射。
步骤如下:

  1. 下载RabbitMQ的镜像
    1
    docker pull rabbitmq:3-management
  2. 安装mq
    1
    2
    3
    4
    5
    6
    7
    8
    9
    docker run \
    -e RABBITMQ_DEFAULT_USER=chenyicai \
    -e RABBITMQ_DEFAULT_PASS=123456 \
    --name mq \
    --hostname mq1 \
    -p 15672:15672 \
    -p 5672:5672 \
    -d \
    rabbitmq:3-management
    然后我们的RabbitMQ就安装完成了,现在我们可以在浏览器输入服务器地址+15672进入RabbitMQ的控制台
    This is a picture without description
    账号密码就是我们前面设置好的chenyicai123456
    登录之后:
    This is a picture without description

以下是RabbitMQ中的几个基本概念:

  • channel:操作MQ的工具
  • exchange:路由消息到队列中
  • queue:缓存消息
  • virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组

3、常见的消息模型

3.1、官方给出的消息模型

官方一共给出的五个MQ的Demo,分别对应了几种不同的用法:

  1. 基本消息队列(BasicQueue)
  2. 工作消息队列(WorkQueue)
  3. 发布订阅(Publish、Subscribe),又根据交换机类型不同分为三种:
    1. Fanout Exchange:广播
    2. Direct Exchange:路由
    3. Topic Exchange:主题

This is a picture without description
This is a picture without description

3.2、Hello World

现在我们就在SpringBoot里面实现以下官方示例中的Hello World这个例子。
官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:

  1. publisher:消息发布者,将消息发送到队列queue
  2. queue:消息队列,负责接受并缓存消息
  3. consumer:订阅队列,处理队列中的消息

所以我们实现这个例子的步骤如下:
基本消息队列的消息发送流程:

  1. 建立connection
    1
    2
    3
    4
    5
    6
    7
    8
    9
    ConnectionFactory factory = new ConnectionFactory();
    //设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
    factory.setHost("8.129.212.155");
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("chenyicai");
    factory.setPassword("123456");
    //建立连接
    Connection connection = factory.newConnection();
  2. 创建通道channel
    1
    Channel channel = connection.createChannel();
  3. 利用channel声明队列
    1
    2
    3
    4
    //队列名
    String queueName = "simple.queue";
    //队列的属性
    channel.queueDeclare(queueName, false, false, false, null);
  4. 利用channel向队列发送消息
    1
    2
    3
    String message = "hello, rabbitmq!";
    channel.basicPublish("", queueName, null, message.getBytes());
    System.out.println("发送消息成功:【" + message + "】");

基本消息队列的消息接收流程:

  1. 建立connection
  2. 创建通道channel
  3. 利用channel声明队列
  4. 定义consumer的消费行为handleDelivery()
    利用channel将消费者与队列绑定
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    channel.basicConsume(queueName, true, new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
    AMQP.BasicProperties properties, byte[] body) throws IOException {
    // 5.处理消息
    String message = new String(body);
    System.out.println("接收到消息:【" + message + "】");
    }
    });
    System.out.println("等待接收消息。。。。");

很显然如果每次使用消息队列都这样开关连接,那么是非常不好的一种现象,所以我们接下来就要使用Spring的一个框架,来更好地帮助我们对消息队列进行使用。

4、SpringAMQP

4.1、什么是SpringAMQP

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp
This is a picture without description

AMQP:是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

SpringAMQP:是基于AMQP协议定义的一套API规范,提供了模板来发送和接受消息。包含两部分,其中Spring-AMQP是基础抽象,Spring-rabbit是底层的默认实现。

4.2、SpringAMQP实现Hello World

Spring中已经集成了AMQP的依赖,所以我们只需要引入依赖就可以实现自动注入,具体步骤如下:

  1. 引入SpringAMQP的依赖
    1
    2
    3
    4
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  2. 在publisher中编写一个yml配置文件,添加mq连接的信息
    1
    2
    3
    4
    5
    6
    7
    spring:
    rabbitmq:
    host: 8.129.212.155 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: chenyicai
    password: 123456
    virtual-host: /
  3. 在publisher服务中新建一个测试类,编写测试方法发送消息:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue() {
    String queueName = "simple.queue";
    String message = "hello, spring amqp!";
    rabbitTemplate.convertAndSend(queueName, message);
    }
    }
    如果此时我们没有接受信息,信息就会存在mq的缓存区里面,如下:
    This is a picture without description
    This is a picture without description
  4. 在consumer中接受消息(consumer中也要创建连接配置文件)
    1
    2
    3
    4
    5
    6
    7
    @Component
    public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
    System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
    }
    }
    现在我们就完成了一个Hello World的例子的实现。
    结果如下:
    This is a picture without description
    消息队列是阅后即焚的,是一个不可逆的过程。
    现在我们看看剩下的几个例子。

4.3、Work Queue 工作队列

工作队列,可以提高消息处理速度,避免队列消息堆积
他的逻辑图如下:
This is a picture without description

现在我们要来实现一个工作队列

  1. 在publisher服务中添加一个测试方法,循环发送50条消息到simple.queue队列
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2WorkQueue() throws InterruptedException {
    String queueName = "simple.queue";
    String message = "hello, message__";
    for (int i = 1; i <= 50; i++) {
    rabbitTemplate.convertAndSend(queueName, message + i);
    Thread.sleep(20);
    }
    }
    }
  2. 在consumer服务中添加一个消费者,也监听simple.queue:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @Component
    public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
    }

    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
    }
    }
  3. 然后我们修改yml配置文件,添加以下内容
    1
    2
    3
    4
    5
    spring:
    rabbitmq:
    listener:
    simple:
    prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息
  4. 然后我们进行测试,结果如下
    This is a picture without description

两个消费者就会交替获取信息,而睡眠20毫秒的用户会比睡眠200毫秒的用户获取更多的信息。

4.4、发布( Publish )、订阅( Subscribe )

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。
常见exchange类型包括:

  • Fanout:广播
  • Direct:路由
  • Topic:话题

This is a picture without description

注意:exchange负责消息路由,而不是存储,路由失败则消息丢失

4.4.1、发布订阅-Fanout Exchange

Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue

具体实现步骤如下:

  1. consumer服务声明ExchangeQueueBinding
    consumer服务常见一个类,添加@Configuration 注解,并声明FanoutExchangeQueue和绑定关系对象Binding
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    @Configuration
    public class FanoutConfig {
    // itcast.fanout
    @Bean
    public FanoutExchange fanoutExchange(){
    return new FanoutExchange("itcast.fanout");
    }

    // fanout.queue1
    @Bean
    public Queue fanoutQueue1(){
    return new Queue("fanout.queue1");
    }

    // 绑定队列1到交换机
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
    return BindingBuilder
    .bind(fanoutQueue1)
    .to(fanoutExchange);
    }

    // fanout.queue2
    @Bean
    public Queue fanoutQueue2(){
    return new Queue("fanout.queue2");
    }

    // 绑定队列2到交换机
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
    return BindingBuilder
    .bind(fanoutQueue2)
    .to(fanoutExchange);
    }
    }
  2. consumer服务的SpringRabbitListener类中,添加两个方法,分别监听fanout.queue1fanout.queue2
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @Component
    public class SpringRabbitListener {
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) {
    System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】");
    }
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) {
    System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】");
    }
    }
  3. publisher服务发送消息到FanoutExchange
    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Test
    public void testSendFanoutExchange() {
    // 交换机名称
    String exchangeName = "itcast.fanout";
    // 消息
    String message = "hello, every one!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "", message);
    }
  4. 测试,此时两个消费者都会受到这条消息
    This is a picture without description

4.4.2、发布订阅-DirectExchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
    This is a picture without description

具体实现步骤如下:

  1. consumer服务中,编写两个消费者方法,分别监听direct.queue1direct.queue2。并利用@RabbitListener声明ExchangeQueueRoutingKey
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}
  1. publisher服务发送消息到DirectExchange
    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Test
    public void testSendDirectExchange() {
    // 交换机名称
    String exchangeName = "itcast.direct";
    // 消息
    String message = "hello, red!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
    }

测试如下:
This is a picture without description
两个都能收到red的消息,而我们现在换一种发送消息的方式看看,发送一个blue的消息,按照规定direct.queue2应该是没有消息的。
This is a picture without description
确实是这个样子。

4.4.3、发布订阅-TopicExchange

TopicExchangeDirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割。
QueueExchange指定BindingKey时可以使用通配符:

  • #:代指0个或多个单词
  • *:代指一个单词

This is a picture without description

实现步骤如下:

  1. consumer服务中,编写两个消费者方法,分别监听topic.queue1topic.queue2
    并利用@RabbitListener声明ExchangeQueueRoutingKey
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    @RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
    System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
    ))
    public void listenTopicQueue2(String msg){
    System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
    }
  2. publisher服务发送消息到TopicExchange
    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Test
    public void testSendTopicExchange() {
    // 交换机名称
    String exchangeName = "itcast.topic";
    // 消息
    String message = "中国新闻";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
    }

进行测试,此时两个消费者都能接收到消息
This is a picture without description

如果修改key的值,后面不为news时,topic.queue2将无法收到消息

1
2
3
4
5
6
7
8
9
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "itcast.topic";
// 消息
String message = "中国新闻,一条";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.new", message);
}

结果如下:
This is a picture without description

4.5、SpringAMQP-消息转换器

SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。

我们在consumer中利用@Bean声明一个队列:

1
2
3
4
5
6
7
@Configuration
public class FanoutConfig {
@Bean
public Queue objectQueue(){
return new Queue("object.queue");
}
}

publisher中发送消息以测试:

1
2
3
4
5
6
7
8
9
@Test
public void testSendMap() throws InterruptedException {
// 准备消息
Map<String,Object> msg = new HashMap<>();
msg.put("name", "Jack");
msg.put("age", 21);
// 发送消息
rabbitTemplate.convertAndSend("object.queue", msg);
}

会发现在消息队列中我们传过去的是一堆乱码
This is a picture without description

其实就是java将我们的消息进行了一个序列化来进行传输,但是这样子十分不具有可读性,而且字符串较长,会影响到传输的效率。

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDKObjectOutputStream完成序列化。
如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:

  1. 首先我们需要引入json的依赖
    1
    2
    3
    4
    <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    </dependency>
  2. publisher服务声明MessageConverter(可以放在启动类上)
    1
    2
    3
    4
    @Bean
    public MessageConverter messageConverter(){
    return new Jackson2JsonMessageConverter();
    }
    然后我们进行测试
    This is a picture without description

此时显示的消息就很有可读性了。

5、总结

以上就是我对RabbirMQ入门学习的一个总结,还有很多的内容需要去学习,以后学到了会继续补充。s