一.rabbitmq的几种工作模式
常量工具类
1 | package org.yh.mode; |
1.简单模式
即一个生产者一个消费者
1 | package org.yh.mode.simple; |
简单模式-测试类
1 | package simple; |
结果:绑定simpleMode的队列会收到消息
2.工作模式
即一个生产者,多个消费者,每个消息只会被消费一次,消费者负载均衡消费(当a阻塞时,他会去找b消费)。
1 | package org.yh.mode.work; |
work模式-测试
1 | package work; |
结果:consumer-a和b一共会处理5条消息
3.广播模式fanout
即绑定到此交换机的队列都会收到消息,不管什么路由键。
1 | package org.yh.mode.fanout; |
fanout-测试
1 | package fanout; |
结果:所有的绑定此EXCHANGE_FANOUT交换机的队列均会收到消息
4.direct路由模式
即创建交换机时绑定路由键routingkey,生产者发送消息时指定交换机和路由键,只有绑定对应路由键和交换机的队列才会收到消息
1 | package org.yh.mode.direct; |
direct路由模式-测试
1 | package direct; |
结果:此时只有ROUTING_A_KEY收到消息….
5.topic模式
即direct模式的另一种版本,唯一区别就是路由键模糊匹配规则,“#”匹配一个词或多个词,“*”只匹配一个词。
1 | package org.yh.mode.topic; |
topic模式-测试1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30package topic;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.yh.App;
import org.yh.mode.fanout.FanoutListener;
import org.yh.mode.topic.TopicListener;
import static org.yh.mode.MqContants.*;
/**
* @description:
* @author: yh
* @time: 2020/1/10 14:24
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = App.class)
public class Test {
@Autowired
AmqpTemplate amqpTemplate;
@org.junit.Test
public void test() {
//第二个是routing key
amqpTemplate.convertAndSend(EXCHANGE_TOPIC, "teacher.xzxcx.a", "放假了");
}
}
结果:只有队列d收到消息,原因是:#多匹配规则,*只能匹配一个单词
二.延迟队列的使用
安装插件-rabbitmq中的插件x-delay-message
下载网址(插件版本要和rabbitmq版本匹配):传送门
安装参考:传送门
配置1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56package org.yh.mode.dlx.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
import static org.yh.mode.MqContants.*;
/**
* @description:
* @author: yh
* @time: 2020/1/15 17:30
*/
@Configuration
public class DlxConfig {
@Bean
public Queue immediatelyQueue() {
return new Queue(IMMEDIATELY_QUEUE, true);
}
@Bean
public Queue delayQueue() {
Map<String, Object> params = new HashMap<>(2);
// x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
params.put("x-dead-letter-exchange", IMMEDIATELY_EXCHANGE);
// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
params.put("x-dead-letter-routing-key", IMMEDIATELY_ROUTING_KEY);
return new Queue(DELAY_A_QUEUE, true, false, false, params);
}
@Bean
public DirectExchange immediatelyExchange() {
return new DirectExchange(IMMEDIATELY_EXCHANGE, true, false);
}
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(DELAY_TEST_EXCHANGE, true, false);
}
@Bean
public Binding immediatelyBinding() {
return BindingBuilder.bind(immediatelyQueue()).to(immediatelyExchange()).with(IMMEDIATELY_ROUTING_KEY);
}
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue()).to(deadLetterExchange()).with(DELAY_ROUTING_KEY_A);
}
}
消息监听
1 | package org.yh.mode.dlx.config; |
延迟队列测试
1 | package delay; |
结果:单位为毫秒,延迟10s。10s后会收到消息