rabbitmq学习ing

一.rabbitmq的几种工作模式

常量工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package org.yh.mode;

/**
 * @description: Names of the exchanges, queues and routing keys shared by the RabbitMQ examples.
 *               Pure constant holder: declared final with a private constructor so it can be
 *               neither instantiated nor subclassed.
 *               NOTE(review): the listener and test snippets static-import
 *               {@code org.yh.mode.MqContants} (missing "s") - confirm which spelling the
 *               project really uses and align the class name / imports accordingly.
 * @author: yh
 * @time: 2020/1/15 11:28
 */
public final class MqConstants {

    // ---- fanout example ----
    public static final String EXCHANGE_FANOUT = "fanout.exchange.school";
    public static final String CLASS_ONE_QUEUE = "class.one.queue";
    public static final String CLASS_TWO_QUEUE = "class.two.queue";

    // ---- direct example ----
    public static final String EXCHANGE_DIRECT = "direct.exchange.school";
    public static final String STUDENT_A_QUEUE = "student.a.queue";
    public static final String STUDENT_B_QUEUE = "student.b.queue";
    public static final String ROUTING_A_KEY = "teacher.a.key";
    public static final String ROUTING_B_KEY = "teacher.b.key";

    // ---- topic example ----
    public static final String EXCHANGE_TOPIC = "topic.exchange.school";
    public static final String STUDENT_C_QUEUE = "student.c.queue";
    public static final String STUDENT_D_QUEUE = "student.d.queue";

    // ---- delay (TTL + dead-letter) example: queue/exchange/key that receive expired messages ----
    public static final String IMMEDIATELY_QUEUE = "immediately.test.queue";
    public static final String IMMEDIATELY_EXCHANGE = "immediately.test.exchange";
    public static final String IMMEDIATELY_ROUTING_KEY = "immediately.test.key";

    // ---- delay (TTL + dead-letter) example: exchange/queue/key that messages are published to ----
    public static final String DELAY_TEST_EXCHANGE = "dead.test.exchange";
    public static final String DELAY_A_QUEUE = "delay.test.queue";
    public static final String DELAY_ROUTING_KEY_A = "delay.test.key";

    /** Not instantiable: this class only holds constants. */
    private MqConstants() {
        throw new AssertionError("MqConstants is a constant holder and must not be instantiated");
    }
}

1.简单模式

即一个生产者一个消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package org.yh.mode.simple;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @description: Simple mode - a single consumer listening on the "simpleMode" queue.
 * @author: yh
 * @time: 2020/1/10 10:10
 */
@Component
@Slf4j
public class SimpleListener {

    /**
     * Parses the incoming payload as a JSON object and logs the value of its "msg" field.
     * The "simpleMode" queue is declared by the listener itself via {@code queuesToDeclare}.
     *
     * @param msg message body, expected to be a JSON object string
     */
    @RabbitListener(queuesToDeclare = @Queue("simpleMode"))
    public void messageListener(String msg) {
        JSONObject jsonObject = JSON.parseObject(msg);
        if (jsonObject == null) {
            // fastjson returns null for a null/empty payload; skip it instead of
            // failing the listener with a NullPointerException on jsonObject.get(...)
            log.warn("simpleMode listener ignored an empty payload");
            return;
        }
        log.info("result is {}", jsonObject.get("msg"));
    }

}

简单模式-测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package simple;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.yh.App;

/**
 * @description: Producer-side test for simple mode: publishes one JSON message
 *               using "simpleMode" as the routing key.
 * @author: yh
 * @time: 2020/1/10 10:07
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = App.class)
public class Test {

    @Autowired
    AmqpTemplate amqpTemplate;

    /** Builds a two-field JSON payload and sends its string form. */
    @org.junit.Test
    public void test() {
        JSONObject payload = new JSONObject();
        payload.put("name", "tracer");
        payload.put("msg", "hello,how are u ");

        String body = JSON.toJSONString(payload);
        amqpTemplate.convertAndSend("simpleMode", body);
    }
}

结果:监听simpleMode队列的消费者(SimpleListener)会收到消息

2.工作模式

即一个生产者对应多个消费者,多个消费者监听同一个队列,每条消息只会被其中一个消费者消费一次;消息默认以轮询(round-robin)方式在消费者之间负载均衡(当a阻塞时,消息会交给b消费)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package org.yh.mode.work;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @description: Work mode - two listener methods consume from the same "workMode" queue.
 * @author: yh
 * @time: 2020/1/10 10:35
 */
@Slf4j
@Component
public class WorkListener {

    /**
     * First consumer of the "workMode" queue; logs the received payload.
     *
     * @param msg message body
     */
    @RabbitListener(queuesToDeclare = @Queue("workMode"))
    public void workListenerOne(String msg) {
        log.info("work-mode-consumer-a result : {}", msg);
    }

    /**
     * Second consumer of the "workMode" queue; logs the received payload.
     *
     * @param msg message body
     */
    @RabbitListener(queuesToDeclare = @Queue("workMode"))
    public void workListenerTwo(String msg) {
        log.info("work-mode-consumer-b result : {}", msg);
    }
}

work模式-测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package work;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.yh.App;

/**
 * @description: Work mode - one producer, several consumers (when one consumer is blocked
 *               another one consumes; round-robin load balancing).
 * @author: yh
 * @time: 2020/1/10 10:07
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = App.class)
@Slf4j
public class Test {

    @Autowired
    AmqpTemplate amqpTemplate;

    /**
     * Sends the numbers 0..4 with "workMode" as the routing key, pausing one second
     * after each send.
     *
     * @throws InterruptedException if the pause between sends is interrupted
     */
    @org.junit.Test
    public void test() throws InterruptedException {
        int seq = 0;
        while (seq < 5) {
            amqpTemplate.convertAndSend("workMode", seq);
            log.info("测试,{}", seq);
            Thread.sleep(1000);
            seq++;
        }
        log.info("over");
    }
}

结果:consumer-a和b一共会处理5条消息

3.广播模式fanout

即绑定到此交换机的队列都会收到消息,不管什么路由键。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package org.yh.mode.fanout;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import static org.yh.mode.MqContants.*;

/**
 * @description: Fanout mode - two durable queues bound to the same fanout exchange.
 * @author: yh
 * @time: 2020/1/10 14:09
 */
@Slf4j
@Component
public class FanoutListener {

    /**
     * Consumer for CLASS_ONE_QUEUE, bound to EXCHANGE_FANOUT without a binding key.
     *
     * @param msg message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = CLASS_ONE_QUEUE, durable = "true"),
            exchange = @Exchange(
                    value = EXCHANGE_FANOUT,
                    type = ExchangeTypes.FANOUT,
                    ignoreDeclarationExceptions = "true")))
    public void listen1(String msg) {
        log.info("fanout 模式消息:一班级收到的消息 is {}", msg);
    }

    /**
     * Consumer for CLASS_TWO_QUEUE, bound to EXCHANGE_FANOUT with a binding key.
     * A fanout exchange ignores routing keys, so the key is only here to show that
     * it has no effect on delivery.
     *
     * @param msg message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = CLASS_TWO_QUEUE, durable = "true"),
            exchange = @Exchange(
                    value = EXCHANGE_FANOUT,
                    type = ExchangeTypes.FANOUT,
                    ignoreDeclarationExceptions = "true"),
            key = "class.key.two"))
    public void listen2(String msg) {
        log.info("fanout 模式消息:二班级收到的消息 is {}", msg);
    }
}

fanout-测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package fanout;

import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.yh.App;

import static org.yh.mode.MqContants.EXCHANGE_FANOUT;

/**
 * @description: Fanout (broadcast) mode - every queue bound to the exchange receives
 *               each message the exchange gets.
 * @author: yh
 * @time: 2020/1/10 14:24
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = App.class)
public class Test {

    @Autowired
    AmqpTemplate amqpTemplate;

    /** Publishes one message to EXCHANGE_FANOUT with an empty routing key. */
    @org.junit.Test
    public void test() {
        // the second argument is the routing key
        String routingKey = "";
        amqpTemplate.convertAndSend(EXCHANGE_FANOUT, routingKey, "放假啦");
    }
}

结果:所有的绑定此EXCHANGE_FANOUT交换机的队列均会收到消息

4.direct路由模式

即队列绑定到交换机时指定路由键(routing key),生产者发送消息时指定交换机和路由键,只有以相同路由键绑定到该交换机的队列才会收到消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package org.yh.mode.direct;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import static org.yh.mode.MqContants.*;

/**
 * @description: Direct mode - two durable queues bound to the same direct exchange,
 *               each with its own routing key.
 * @author: yh
 * @time: 2020/1/10 15:24
 */
@Slf4j
@Component
public class DirectListener {

    /**
     * Consumer for STUDENT_A_QUEUE, bound to EXCHANGE_DIRECT with ROUTING_A_KEY.
     *
     * @param msg message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = STUDENT_A_QUEUE, durable = "true"),
            exchange = @Exchange(
                    value = EXCHANGE_DIRECT,
                    type = ExchangeTypes.DIRECT,
                    ignoreDeclarationExceptions = "true"),
            key = ROUTING_A_KEY))
    public void test1(String msg) {
        log.info("direct:student-a msg is {}", msg);
    }

    /**
     * Consumer for STUDENT_B_QUEUE, bound to EXCHANGE_DIRECT with ROUTING_B_KEY.
     *
     * @param msg message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = STUDENT_B_QUEUE, durable = "true"),
            exchange = @Exchange(
                    value = EXCHANGE_DIRECT,
                    type = ExchangeTypes.DIRECT,
                    ignoreDeclarationExceptions = "true"),
            key = ROUTING_B_KEY))
    public void test2(String msg) {
        log.info("direct:student-b msg is {}", msg);
    }
}

direct路由模式-测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package direct;

import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.yh.App;

import static org.yh.mode.MqContants.*;

/**
 * @description: Producer-side test for direct mode: publishes one message to
 *               EXCHANGE_DIRECT using ROUTING_A_KEY.
 * @author: yh
 * @time: 2020/1/10 15:24
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = App.class)
public class Test {

    @Autowired
    AmqpTemplate amqpTemplate;

    /** Sends a single text message addressed with ROUTING_A_KEY. */
    @org.junit.Test
    public void test() {
        String text = "你们可以回去嗨啦";
        amqpTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_A_KEY, text);
    }
}

结果:此时只有以ROUTING_A_KEY绑定的队列(STUDENT_A_QUEUE)会收到消息,STUDENT_B_QUEUE不会收到。

5.topic模式

即direct模式的另一种版本,唯一区别就是路由键支持模糊匹配:路由键由“.”分隔的若干个词组成,“#”匹配零个或多个词,“*”只匹配一个词。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package org.yh.mode.topic;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import static org.yh.mode.MqContants.*;

/**
 * @description: Topic mode - two durable queues bound to the same topic exchange
 *               with different wildcard patterns.
 * @author: yh
 * @time: 2020/1/10 15:57
 */
@Slf4j
@Component
public class TopicListener {

    /**
     * Consumer for STUDENT_C_QUEUE, bound with the pattern "teacher.*"
     * ("*" stands for exactly one word).
     *
     * @param msg message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = STUDENT_C_QUEUE, durable = "true"),
            exchange = @Exchange(
                    value = EXCHANGE_TOPIC,
                    type = ExchangeTypes.TOPIC,
                    ignoreDeclarationExceptions = "true"),
            key = "teacher.*"))
    public void test1(String msg) {
        log.info("topic 模式,队列c 收到的信息 {}", msg);
    }

    /**
     * Consumer for STUDENT_D_QUEUE, bound with the pattern "teacher.#"
     * ("#" stands for zero or more words).
     *
     * @param msg message body
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = STUDENT_D_QUEUE, durable = "true"),
            exchange = @Exchange(
                    value = EXCHANGE_TOPIC,
                    type = ExchangeTypes.TOPIC,
                    ignoreDeclarationExceptions = "true"),
            key = "teacher.#"))
    public void test2(String msg) {
        log.info("topic 模式 队列d 收到的信息 {}", msg);
    }

}

topic模式-测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package topic;

import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.yh.App;
import org.yh.mode.fanout.FanoutListener;
import org.yh.mode.topic.TopicListener;

import static org.yh.mode.MqContants.*;

/**
 * @description: Producer-side test for topic mode: publishes one message to
 *               EXCHANGE_TOPIC with a three-word routing key.
 * @author: yh
 * @time: 2020/1/10 14:24
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = App.class)
public class Test {

    @Autowired
    AmqpTemplate amqpTemplate;

    /** Sends a single message whose routing key is "teacher.xzxcx.a". */
    @org.junit.Test
    public void test() {
        // the second argument is the routing key; it has three dot-separated words
        String routingKey = "teacher.xzxcx.a";
        amqpTemplate.convertAndSend(EXCHANGE_TOPIC, routingKey, "放假了");
    }
}

结果:只有队列d收到消息。原因是:路由键“teacher.xzxcx.a”在teacher之后还有两个词,“teacher.#”中的#可以匹配零个或多个词,所以队列d匹配;而“teacher.*”中的*只能匹配一个词,所以队列c不匹配。

二.延迟队列的使用

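安装插件-rabbitmq中的延迟消息插件rabbitmq_delayed_message_exchange(对应的交换机类型为x-delayed-message)。注意:下文示例是通过“消息TTL + 死信交换机(DLX)”的方式实现延迟的,这种方式本身并不依赖该插件。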
下载网址(插件版本要和rabbitmq版本匹配):传送门
安装参考:传送门

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package org.yh.mode.dlx.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

import static org.yh.mode.MqContants.*;

/**
 * @description: Declares the queues, exchanges and bindings for the delay example:
 *               a delay queue whose dead letters are forwarded to a second
 *               ("immediately") exchange and queue.
 * @author: yh
 * @time: 2020/1/15 17:30
 */
@Configuration
public class DlxConfig {

    /** Durable queue named IMMEDIATELY_QUEUE. */
    @Bean
    public Queue immediatelyQueue() {
        boolean durable = true;
        return new Queue(IMMEDIATELY_QUEUE, durable);
    }

    /**
     * Durable, non-exclusive, non-auto-delete queue named DELAY_A_QUEUE whose dead
     * letters are re-published to IMMEDIATELY_EXCHANGE with IMMEDIATELY_ROUTING_KEY.
     */
    @Bean
    public Queue delayQueue() {
        Map<String, Object> arguments = new HashMap<>(2);
        // x-dead-letter-exchange: name of the exchange (DLX) dead letters of this queue are forwarded to
        arguments.put("x-dead-letter-exchange", IMMEDIATELY_EXCHANGE);
        // x-dead-letter-routing-key: routing key the dead letters carry when they are forwarded
        arguments.put("x-dead-letter-routing-key", IMMEDIATELY_ROUTING_KEY);

        boolean durable = true;
        boolean exclusive = false;
        boolean autoDelete = false;
        return new Queue(DELAY_A_QUEUE, durable, exclusive, autoDelete, arguments);
    }

    /** Durable, non-auto-delete direct exchange named IMMEDIATELY_EXCHANGE. */
    @Bean
    public DirectExchange immediatelyExchange() {
        boolean durable = true;
        boolean autoDelete = false;
        return new DirectExchange(IMMEDIATELY_EXCHANGE, durable, autoDelete);
    }

    /**
     * Durable, non-auto-delete direct exchange named DELAY_TEST_EXCHANGE.
     * Despite the method name, this is the exchange the delay queue is bound to;
     * the dead-letter target configured on the delay queue is IMMEDIATELY_EXCHANGE.
     */
    @Bean
    public DirectExchange deadLetterExchange() {
        boolean durable = true;
        boolean autoDelete = false;
        return new DirectExchange(DELAY_TEST_EXCHANGE, durable, autoDelete);
    }

    /** Binds the immediately queue to the immediately exchange with IMMEDIATELY_ROUTING_KEY. */
    @Bean
    public Binding immediatelyBinding() {
        Queue queue = immediatelyQueue();
        DirectExchange exchange = immediatelyExchange();
        return BindingBuilder.bind(queue).to(exchange).with(IMMEDIATELY_ROUTING_KEY);
    }

    /** Binds the delay queue to DELAY_TEST_EXCHANGE with DELAY_ROUTING_KEY_A. */
    @Bean
    public Binding delayBinding() {
        Queue queue = delayQueue();
        DirectExchange exchange = deadLetterExchange();
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_ROUTING_KEY_A);
    }
}

消息监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package org.yh.mode.dlx.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

import static org.yh.mode.MqContants.IMMEDIATELY_QUEUE;


/**
 * @description: Consumer for IMMEDIATELY_QUEUE; logs each message together with
 *               the time it was received.
 * @author: yh
 * @time: 2020/1/15 18:14
 */
@Slf4j
@Component
public class DlxReceiver {

    /**
     * Logs the current time (HH:mm:ss) and the received payload.
     *
     * @param msg message body
     */
    @RabbitListener(queues = IMMEDIATELY_QUEUE)
    public void getMsg(String msg) {
        String receivedAt = DateFormatUtils.format(new Date(), "HH:mm:ss");
        log.info("时间{},延迟消息:{}", receivedAt, msg);
    }

}

延迟队列测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package delay;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.yh.App;

import java.util.Date;

import static org.yh.mode.MqContants.*;

/**
 * @description: Producer-side test for the delay example: publishes one message to
 *               DELAY_TEST_EXCHANGE with a per-message expiration.
 * @author: yh
 * @time: 2020/1/15 17:58
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = App.class)
@Slf4j
public class Test {

    @Autowired
    AmqpTemplate amqpTemplate;

    /**
     * Sends a message with DELAY_ROUTING_KEY_A; the post-processor sets the expiration
     * to "10000" and logs the send time (HH:mm:ss).
     */
    @org.junit.Test
    public void send() {
        String text = "消息:通过交换机和路由键发送到符合的队列";
        amqpTemplate.convertAndSend(DELAY_TEST_EXCHANGE, DELAY_ROUTING_KEY_A, text,
                outgoing -> {
                    outgoing.getMessageProperties().setExpiration("10000");
                    String sentAt = DateFormatUtils.format(new Date(), "HH:mm:ss");
                    log.info("现在时间:{}", sentAt);
                    return outgoing;
                });
    }
}

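结果:setExpiration的单位为毫秒,这里设置为10000,即延迟10s。消息先进入delay.test.queue,过期后成为死信,经immediately.test.exchange转发到immediately.test.queue,约10s后消费者会收到消息。注意:按消息设置TTL时,只有到达队首的消息才会被判定过期,如果队列中各消息的延迟时间不同,延迟短的消息可能被排在前面、延迟更长的消息阻塞。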

-------------笔者水平有限,若有错漏,欢迎指正!-------------

本文标题:rabbitmq学习ing

文章作者:yh

发布时间:2020年01月15日 - 19:01

最后更新:2020年01月15日 - 19:01

原始链接:https://www.yh0729.cn/20200115/rabbitmq_study.html

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

坚持原创技术分享,您的支持将鼓励我继续创作!