RabbitMQ
RabbitMQ 环境搭建建议参照Docker部署
# 权限控制
用户管理
# 查看用户列表
$ rabbitmqctl list_users
# 添加新用户
$ rabbitmqctl add_user <username> <password>
# 修改用户密码
$ rabbitmqctl change_password <username> <password>
# 删除用户
$ rabbitmqctl delete_user <username>
# 为用户分配角色,可分配多个角色,administrator为超管,后面留空可清除所有角色
$ rabbitmqctl set_user_tags <username> <tag> <tag> ...
虚拟机管理
每个虚拟机之间数据都是相互隔离的,可以为每个项目创建一个虚拟机
# 查看虚拟机列表
$ rabbitmqctl list_vhosts
# 创建虚拟机
$ rabbitmqctl add_vhost <vhost_name>
# 删除虚拟机
$ rabbitmqctl delete_vhost <vhost_name>
虚拟机授权
# 查看用户都有哪些虚拟机权限
$ rabbitmqctl list_user_permissions <username>
# 查看有哪些用户拥有该虚拟机权限
$ rabbitmqctl list_permissions -p <vhost_name>
# 为用户分配虚拟机权限,后面三个分别是[配置权限、发布权限、消费权限],.* 代表全部权限
$ rabbitmqctl set_permissions <vhost_name> <username> ".*" ".*" ".*"
# 清除用户权限
$ rabbitmqctl clear_permissions -p <vhost_name> <username>
# 基础信息
依赖
<!-- AMPQ,包含RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件总览
spring:
rabbitmq:
host: localhost # 主机名
port: 5672 # 端口号
username: guest # 用户名
password: guest # 密码
virtual-host: / # 虚拟主机名
listener:
simple:
prefetch: 1 # 预取数量设置为1
# 模型介绍
简单队列 Simplest:消息发送到队列后,消费者接收并消费消息,生产者消费者一对一,每条消息仅被消费一次
工作队列 WorkQueue:消息发送到队列后,多个消费者接收并消费消息,生产者消费者一对多,每条消息仅被消费一次
- 默认消费者优先取出消息,然后慢慢消费,不利于负载均衡的平衡时间,可以调节预取数量
扇形交换机 Fanout:绑定 Fanout 交换机的队列,每个消费者都会消费到所有消息
直连交换机 Direct:为每个绑定到 Direct 交换机的队列,设置一个或多个 key,通过 key 实现精准消息发布
主题交换机 Topic:效果同 Direct 但功能更强大,routingKey 支持以
.分割为多级别,并支持通配符*表示单个级别通配符,#表示零个或多个级别通配符,使用示例:
- **china.beijing.news:**代表中国北京的新闻
- **china.beijing.weather:**代表中国北京的天气
- china.shenzhen.news:代表中国深圳的新闻
- **china.shenzhen.weather:**代表中国深圳的天气
- china.*.news:代表中国所有城市的新闻
- china.#:代表中国所有城市的所有内容
# 代码示例
生产消费模型 代码示例
// 发送消息
public class SpringAmqpTest {
@Resource
private RabbitTemplate rabbitTemplate;
@Test
void sendQueue() {
String queueName = "queue1";
String message = "Hello AMQP for Spring";
rabbitTemplate.convertAndSend(queueName, message);
}
}
// 接收消息
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "queue1")
public void listenerSimpleQueue(String message) {
System.out.println("接收到队列消息:【" + message + "】");
}
}
发布订阅模型 交换机绑定 代码示例
绑定关系配置类一般都声明在消费者端
// 配置类声明
@Configuration
public class FanoutConfig {
// 声明交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("itcast.fanout");
}
// 声明队列
@Bean
public Queue fanoutQueue() {
return new Queue("fanout.queue");
}
// 添加交换机与队列的绑定关系
@Bean
public Binding fanoutBinding1(FanoutExchange fanoutExchange, Queue fanoutQueue) {
return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
// return BindingBuilder.bind(queue).to(directExchange).with(routingKey);
// return BindingBuilder.bind(queue).to(topicExchange).with(routingKey);
}
}
// 注解声明
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(type = ExchangeTypes.FANOUT),
value = @Queue(name = "fanout.queue")
))
public void fanoutQueue(String msg) {
System.out.println("fanout.queue 接收到队列消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(type = ExchangeTypes.DIRECT), // 默认值
value = @Queue(name = "direct.queue"),
key = {"vip"}
))
public void directQueue(String msg) {
System.out.println("direct.queue 接收到队列消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(type = ExchangeTypes.TOPIC),
value = @Queue(name = "topic.queue"),
key = {"china.beijing.news"} // china.*.news china.#
))
public void topicQueue(String msg) {
System.out.println("topic.queue 接收到队列消息:【" + msg + "】");
}
消息转换器
将发布订阅的 Java 对象序列化为 JSON 数据进行传输
@Bean
public MessageConverter messageConverter() {
// jackson提供的通用消息转换器
return new Jackson2JsonMessageConverter();
}
上次更新: 2025-09-19, 17:53:05