知识杂货铺 知识杂货铺
首页
后端(1本书)
  • 主题初衷与诞生
  • 介绍
  • 快速上手
  • 目录结构
  • 核心配置和约定
  • 自动生成front matter
  • Markdown 容器
  • Markdown 中使用组件
  • 相关文章

    • 使目录栏支持h2~h6标题
    • 如何让你的笔记更有表现力
    • 批量操作front matter工具
    • 部署
    • 关于写文章和H1标题
    • 关于博客搭建与管理
    • 在线编辑和新增文章的方法
  • 主题配置
  • 首页配置
  • front matter配置
  • 目录页配置
  • 添加摘要
  • 修改主题颜色和样式
  • 评论栏
  • 快速开始
  • 代码集成_TODO
  • 框架初探
  • 在GitHub上贡献代码
  • 使用K8s部署系统
  • Seata分布式事务
GitHub (opens new window)

Kevin Zhang

爱凑热闹的高龄程序猿
首页
后端(1本书)
  • 主题初衷与诞生
  • 介绍
  • 快速上手
  • 目录结构
  • 核心配置和约定
  • 自动生成front matter
  • Markdown 容器
  • Markdown 中使用组件
  • 相关文章

    • 使目录栏支持h2~h6标题
    • 如何让你的笔记更有表现力
    • 批量操作front matter工具
    • 部署
    • 关于写文章和H1标题
    • 关于博客搭建与管理
    • 在线编辑和新增文章的方法
  • 主题配置
  • 首页配置
  • front matter配置
  • 目录页配置
  • 添加摘要
  • 修改主题颜色和样式
  • 评论栏
  • 快速开始
  • 代码集成_TODO
  • 框架初探
  • 在GitHub上贡献代码
  • 使用K8s部署系统
  • Seata分布式事务
GitHub (opens new window)
  • Spring Boot 培训教程
  • Spring Boot介绍

  • 开发环境配置

  • 原理剖析

  • Web开发

  • 数据访问

  • 事务

  • 集成Redis

  • 集成MongoDB

  • 异步消息

    • 消息队列
    • MQ介绍
    • 集成ActiveMQ
    • 集成RabbitMQ
      • 9.3 Spring Boot 集成 RabbitMQ
        • 9.3.1 安装配置
        • 9.3.1.1 安装 Erlang
        • 9.3.1.2 安装 RabbitMQ
        • 9.3.2 与 Spring Boot 集成
        • 9.3.2.1 一对一模式
        • 9.3.2.2 一对多模式
        • 9.3.2.3 ACK 消息确认
    • 集成RocketMQ
    • 集成Kafka
    • 课后作业
  • 异常处理

  • 单元测试与热部署

  • 安全控制

  • 应用监控

  • 企业级开发

  • 多环境配置与部署

  • 综合示例

  • 前后端分离的vue急速入门

  • Spring Boot配置大全

  • 在Docker中部署Spring Boot应用

  • 开发前后端分离应用

  • 前进到Spring Cloud

  • 规则引擎

  • 流程引擎

  • 后记
  • 后端
  • 异步消息
Kevin Zhang
2024-10-30
目录

集成RabbitMQ

# 9.3 Spring Boot 集成 RabbitMQ

RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息中间件,基于 erlang 开发,并发能力很强,性能极好,延时很低,是当前大量部署使用的消息中间件。

image-20191209102156424

RabbitMQ 发送消息时,是先把消息发送给 Exchange(交换器),然后再分发给有相应 RoutingKey(路由键)关系的 Queue(队列)。

在 RabbitMQ 中消息交换器有四种:

  1. Direct模式:消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配。
  2. Topic模式:主题交换器通过模式匹配消息的路由键属性,然后将消息分配到绑定到该模式的队列。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。该交换器会识别两个通配符:“#”和“*”。#匹配0个或多个单词,*匹配一个单词。
  3. Fanout模式:每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上(就像广播一样)。fanout 类型转发消息是最快的。
  4. Header模式: 和主题交换器有点相似,头交换器的路由值基于消息的 header 数据。主题交换器路由键只有是字符串,而头交换器可以是整型和哈希值。

Headers 交换器允许你匹配 AMQP 消息的 header 而非路由键,除此之外 headers 交换器和 direct 交换器完全一致,但性能却很差,几乎用不到。

# 9.3.1 安装配置

到 RabbitMQ官网 (opens new window) 下载最新版 rabbitmq-server-3.8.2.exe (opens new window)。

安装运行 RabbitMQ 需要 64 位的 Erlang,针对 3.8.2 版本的 RabbitMQ 官方推荐 Erlang 22.1 版本。

到 Erlang官网 (opens new window) 下载 otp_win64_22.1.exe (opens new window)。

# 9.3.1.1 安装 Erlang

使用默认配置安装 Erlang。

image-20191210175929079

# 9.3.1.2 安装 RabbitMQ

使用默认配置安装 RabbitMQ。

image-20191212015556148

RabbitMQ 提供了 Web 管理界面。

打开命令提示符窗口,进入C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.2\sbin目录,执行rabbitmq-plugins.bat enable rabbitmq_management 开启 Web 管理插件。

Microsoft Windows [版本 10.0.18362.476]
(c) 2019 Microsoft Corporation。保留所有权利。

C:\Users\Kevin>cd C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.2\sbin

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.2\sbin>rabbitmq-plugins.bat enable rabbitmq_management
Enabling plugins on node rabbit@NOTEBOOK-KEVIN:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@NOTEBOOK-KEVIN...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch

started 3 plugins.

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.2\sbin>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

打开浏览器访问 http://localhost:15672/ (opens new window) ,使用guest用户,密码guest登录管理控制台。

image-20191212015951847

# 9.3.2 与 Spring Boot 集成

新建 Spring Boot 项目,选择 Spring for RabbitMQ+Spring Web 启动器依赖。

image-20191210183720495

pom 文件的主要依赖为spring-boot-starter-amqp。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
1
2
3
4
5
6
7
8

在配置文件 application.yml 中添加 RabbitMQ 连接等配置信息。

spring:
  application:
    name: spring-boot-rabbitmq
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
# 开启发送确认
    publisher-confirm-type: correlated
# 开启发送失败退回
    publisher-returns: true
# 开启ACK
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 9.3.2.1 一对一模式

一个生产者对一个消费者模式,生产者将消息投送到队列中,消费者从队列中消费消息,然后就结束了。

创建配置类 RabbitMQConfig,在其中配置消息队列kevin。

package com.example.rabbitmq.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitMQConfig {

    @Bean
    public Queue kevinQueue() {
        return new Queue("kevin");
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

创建消息生产者 RabbitMQProducer,使用注入的 AmqpTemplate 对象向消息队列kevin发送消息。

package com.example.rabbitmq.producer;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQProducer {
	
	@Autowired 
	AmqpTemplate amqpTemplate;
	
	public void sendMessage(String msg) {
		amqpTemplate.convertAndSend("kevin", msg);
	}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

创建消息消费者 RabbitMQConsumer,监听kevin消息队列的消息,并在管理控制台上输出信息。

package com.example.rabbitmq.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "kevin")
public class RabbitMQConsumer {
	
	@RabbitHandler
    public void receive(String msg) {
		System.out.println("RabbitMQ Consumer consume message: " + msg);
	}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

创建与用户交互的控制器类 RabbitMQController,在/rabbitmq/send路径上接收用户输入。

package com.example.rabbitmq.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.example.rabbitmq.producer.RabbitMQProducer;

@RestController
@RequestMapping("/rabbitmq/")
public class RabbitMQController {
	
	@Autowired
	RabbitMQProducer rabbitMQProducer;
	
	@RequestMapping("/send")
	public String sendMsg(String msg) {
		rabbitMQProducer.sendMessage(msg);
		return msg + " Sended to kevin.";
	}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

运行 Spring Boot 应用,打开浏览器访问 [http://localhost:8080/rabbitmq/send?msg=Kevin is a GOODMAN.](http://localhost:8080/rabbitmq/send?msg=Kevin is a GOODMAN.) ,向 RabbitMQ 发送消息。

image-20191212023410382

RabbitMQ 消费者(RabbitMQConsumer 类)消费消息,并在控制台打印信息。

image-20191212023624170

通过 管理控制台 (opens new window),可以看到消息队列的相关情况。

image-20191212024813785

# 9.3.2.2 一对多模式

一个生产者对多个消费者,该模式下可以是一个生产者将消息投递到一个队列,该队列对应多个消费者,此时每条消息只会被消费一次,多个消费者循环处理。另外也可以是一个生产者将消息投递到多个队列里,此时消息是被复制处理。

在配置类com.example.rabbitmq.config.RabbitMQConfig中添加一个新的消息队列 roy。

@Bean
public Queue royQueue() {
    return new Queue("roy");
}
1
2
3
4

创建消息生产者 RabbitMQProducerMore,使用注入的 AmqpTemplate 对象向消息队列roy发送消息。

package com.example.rabbitmq.producer;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQProducerMore {
	
	@Autowired 
	AmqpTemplate amqpTemplate;
	
	public void sendMessage(String msg) {
		amqpTemplate.convertAndSend("roy", msg);
	}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

创建消息消费者 RabbitMQConsumer1,监听roy消息队列的消息,并在管理控制台上输出信息。

package com.example.rabbitmq.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "roy")
public class RabbitMQConsumer1 {
	
	@RabbitHandler
    public void receive(String msg) {
		System.out.println("RabbitMQ Consumer1 consume message: " + msg);
	}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

创建消息消费者 RabbitMQConsumer2,监听roy消息队列的消息,并在管理控制台上输出信息。

package com.example.rabbitmq.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "roy")
public class RabbitMQConsumer2 {
	
	@RabbitHandler
    public void receive(String msg) {
		System.out.println("RabbitMQ Consumer2 consume message: " + msg);
	}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

在控制器类 RabbitMQController 中,添加/rabbitmq/sendMore路径上接收用户输入,并调用 RabbitMQProducerMore(消息生产者类)中的 sendMessage 方法循环发送5条消息到 roy 消息队列上。

@Autowired
RabbitMQProducerMore rabbitMQProducerMore;

@RequestMapping("/sendMore")
public String sendMoreMsg(String msg) {
    for (int i = 0; i < 5; i++) {
        rabbitMQProducerMore.sendMessage(msg + "No." + i);
    }
    return msg + " Sended 5 Messages to roy.";
}
1
2
3
4
5
6
7
8
9
10

运行 Spring Boot 应用,打开浏览器访问 [http://localhost:8080/rabbitmq/sendMore?msg=Roy is a GOODMAN.](http://localhost:8080/rabbitmq/sendMore?msg=Roy is a GOODMAN.),向 RabbitMQ 发送消息。

image-20191212231517513

RabbitMQ 消费者(RabbitMQConsumer1 类和 RabbitMQConsumer2 类)轮流消费消息,并在控制台打印信息。

image-20191212231456659

通过 管理控制台 (opens new window),可以看到消息队列的相关情况。

image-20191212231735588

一个生产者将消息投递到多个队列里的场景,请读者自行完成练习。

# 9.3.2.3 ACK 消息确认

消息确认 ACK 机制,是指消费者从 RabbitMQ 收到消息并处理完成后,反馈给 RabbitMQ,RabbitMQ 收到反馈后才将此消息从队列中删除的一种保证消息能被正确消费的一种处理方法。

如果一个消费者在处理消息的过程中出现了网络不稳定、服务器异常等现象,那么就不会有 ACK 反馈,RabbitMQ 会认为这个消息没有被正常消费,会将消息重新放入队列中。

ACK:acknowledgement,确认。

在配置类com.example.rabbitmq.config.RabbitMQConfig中添加一个新的消息队列lily,并绑定到一个名为“fanoutExchange”的 Fanout 交换器上。

@Bean
public Queue ackQueue() {
    return new Queue("lily");
}

@Bean
public FanoutExchange fanoutExchange() {
    return new FanoutExchange("fanoutExchange");
}

@Bean
Binding bindingAckQueue2Exchange(Queue ackQueue, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(ackQueue).to(fanoutExchange);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

创建消息生产者 RabbitMQProducerAck,使用注入的 RabbitTemplate 对象向消息队列lily发送消息,并编写对应的回调实现。

package com.example.rabbitmq.producer;

import java.time.LocalDateTime;
import java.time.ZoneId;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQProducerAck implements RabbitTemplate.ReturnCallback {

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@Override
	public void returnedMessage(Message message, int i, String s, String s1, String s2) {
		System.out.println("RabbitMQProducerAck Returned Message: " + message.toString() + " , " + i + " , " + s1 + " , " + s2);
	}

	public void sendMessage(String msg) {
		rabbitTemplate.setReturnCallback(this);
		rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
			if (ack) {
				System.out.println("Send Message Success.");
			} else {
				System.out.println("Send Message Failure:" + cause + correlationData.toString());
			}
		});
		rabbitTemplate.convertAndSend("lily", msg + ", " + LocalDateTime.now(ZoneId.systemDefault()));
	}
	
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

创建消息消费者 RabbitMQConsumerAck,监听lily消息队列的消息,消费消息并确认(channel.basicAck),然后在管理控制台上输出信息。

package com.example.rabbitmq.consumer;

import java.time.LocalDateTime;
import java.time.ZoneId;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

@Component
@RabbitListener(queues = {"lily"})
public class RabbitMQConsumerAck {

    @RabbitHandler
    public void process(String sendMsg, Channel channel, Message message) {
    	System.out.println("RabbitMQ ConsumerAck consume message: " + sendMsg + " @" + LocalDateTime.now(ZoneId.systemDefault()));
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            System.out.println("Process Success");
        } catch (Exception e) {
            System.out.println("Process Failure.");
            e.printStackTrace();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

在控制器类 RabbitMQController 中,添加/rabbitmq/sendAck路径,接收用户输入。

@Autowired
RabbitMQProducerAck rabbitMQProducerAck;

@RequestMapping("/sendAck")
public String sendAckMsg(String msg) {
    rabbitMQProducerAck.sendMessage(msg);
    return msg + " Sended to lily.";
}
1
2
3
4
5
6
7
8

运行 Spring Boot 应用,打开浏览器访问 [http://localhost:8080/rabbitmq/sendAck?msg=Lily is a GOODMAN.](http://localhost:8080/rabbitmq/sendAck?msg=Lily is a GOODMAN.),向 RabbitMQ 发送消息。

image-20191213000914694

RabbitMQ 消费者 RabbitMQConsumerAck 消费消息,然后告诉服务器收到的这条消息已经被当前消费者(RabbitMQConsumerAck)消费了,消息服务器就可以删除这条消息了。

RabbitMQ 生产者 RabbitMQProducerAck 收到确认回调信息后,在控制台输出Send Message Success.的信息。

请仔细查看在控制台打印的信息并结合上述代码理解消息生产、消费、确认的过程。

image-20191213001003779

RabbitMQ ConsumerAck consume message: Lily is a GOODMAN., 2019-12-13T00:07:51.750 @2019-12-13T00:07:51.782:根据代码,前面的时间为消息的生产时间,后面的时间为消息的消费时间。

通过 管理控制台 (opens new window),可以看到消息队列的相关情况。

image-20191213001141397

kevin 队列有1条消息没有确认,roy 队列有5条消息没有确认,所以这次 Spring Boot 应用启动的时候还能看到RabbitMQ Consumer1 consume message: Roy is a GOODMAN.No.3之类的信息就是因为没有确认消息已被消费(服务器没有收到确认就不会删除消息,下次连接到队列,就会继续发送消息给消费者)。

lily 队列 Unacked 消息数量为 0,是因为消息已经确认消费了。

本小节示例项目代码:

https://github.com/gyzhang/SpringBootCourseCode/tree/master/spring-boot-rabbitmq (opens new window)

编辑 (opens new window)
上次更新: 2024/11/17, 16:29:23
集成ActiveMQ
集成RocketMQ

← 集成ActiveMQ 集成RocketMQ→

最近更新
01
PNG图片处理C++
02-07
02
PNG图片处理
01-24
03
离线安装Docker
12-24
更多文章>
Theme by Vdoing | Copyright © 2008-2025 Kevin Zhang | MIT License | 蜀ICP备20013663号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式