RabbitMQ
# 基础信息
依赖
<!-- AMPQ,包含RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件总览
spring:
rabbitmq:
host: localhost # 主机名
port: 5672 # 端口号
username: guest # 用户名
password: guest # 密码
virtual-host: / # 虚拟主机名
listener:
simple:
prefetch: 1 # 预取数量设置为1
# 模型介绍
简单队列 Simplest:消息发送到队列后,消费者接收并消费消息,生产者消费者一对一,每条消息仅被消费一次
工作队列 WorkQueue:消息发送到队列后,多个消费者接收并消费消息,生产者消费者一对多,每条消息仅被消费一次
- 默认消费者优先取出消息,然后慢慢消费,不利于负载均衡的平衡时间,可以调节预取数量
扇形交换机 Fanout:绑定 Fanout 交换机的队列,每个消费者都会消费到所有消息
直连交换机 Direct:为每个绑定到 Direct 交换机的队列,设置一个或多个 key,通过 key 实现精准消息发布
主题交换机 Topic:效果同 Direct 但功能更强大,routingKey 支持以
.
分割为多级别,并支持通配符*
表示单个级别通配符,#
表示零个或多个级别通配符,使用示例:
- **china.beijing.news:**代表中国北京的新闻
- **china.beijing.weather:**代表中国北京的天气
- china.shenzhen.news:代表中国深圳的新闻
- **china.shenzhen.weather:**代表中国深圳的天气
- china.*.news:代表中国所有城市的新闻
- china.#:代表中国所有城市的所有内容
# 代码示例
生产消费模型 代码示例
// 发送消息
public class SpringAmqpTest {
@Resource
private RabbitTemplate rabbitTemplate;
@Test
void sendQueue() {
String queueName = "queue1";
String message = "Hello AMQP for Spring";
rabbitTemplate.convertAndSend(queueName, message);
}
}
// 接收消息
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "queue1")
public void listenerSimpleQueue(String message) {
System.out.println("接收到队列消息:【" + message + "】");
}
}
发布订阅模型 交换机绑定 代码示例
绑定关系配置类一般都声明在消费者端
// 配置类声明
@Configuration
public class FanoutConfig {
// 声明交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("itcast.fanout");
}
// 声明队列
@Bean
public Queue fanoutQueue() {
return new Queue("fanout.queue");
}
// 添加交换机与队列的绑定关系
@Bean
public Binding fanoutBinding1(FanoutExchange fanoutExchange, Queue fanoutQueue) {
return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
// return BindingBuilder.bind(queue).to(directExchange).with(routingKey);
// return BindingBuilder.bind(queue).to(topicExchange).with(routingKey);
}
}
// 注解声明
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(type = ExchangeTypes.FANOUT),
value = @Queue(name = "fanout.queue")
))
public void fanoutQueue(String msg) {
System.out.println("fanout.queue 接收到队列消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(type = ExchangeTypes.DIRECT), // 默认值
value = @Queue(name = "direct.queue"),
key = {"vip"}
))
public void directQueue(String msg) {
System.out.println("direct.queue 接收到队列消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(type = ExchangeTypes.TOPIC),
value = @Queue(name = "topic.queue"),
key = {"china.beijing.news"} // china.*.news china.#
))
public void topicQueue(String msg) {
System.out.println("topic.queue 接收到队列消息:【" + msg + "】");
}
消息转换器
将发布订阅的 Java 对象序列化为 JSON 数据进行传输
@Bean
public MessageConverter messageConverter() {
// jackson提供的通用消息转换器
return new Jackson2JsonMessageConverter();
}
上次更新: 2024-03-22, 00:04:16