RabbitMQ基础教程之Spring使用篇 相关博文,推荐查看:
RabbitMq基础教程之安装与测试
RabbitMq基础教程之基本概念
RabbitMQ基础教程之基本使用篇
RabbitMQ基础教程之使用进阶篇
RabbitMQ基础教程之Spring&JavaConfig使用篇
在前面的一篇演示了如何使用Spring来进行RabbitMQ的消息投递和消费,虽然可以实现基本的需求场景,但是使用起来却并不是特别顺手,首先是不同的消费者,得添加好多不同的配置项,加上有较多的配置(QueueName, ExchangeName, RoutingKey, autoAck…)
那么有没有可能借助工厂方式,来简化消费者这边的大多数配置呢?
I. 工厂类定义消费者信息 目标比较清晰了,我们希望有一个工厂类,可以承载所有的关心的配置信息,然后在实际使用的地方,通过这个工厂类生成一个Consumer即可
1. 消费接口定义 首先需要定义一个公共的消费者接口,主要用来接收并处理消息
1 2 3 4 5 6 7 public interface IMqConsumer extends ChannelAwareMessageListener { void setContainer (SimpleMessageListenerContainer container) ; default void shutdown () {} }
对于ChannelAwareMessageListener
前面就以及用到,当有消息后,触发的监听器,这里我们增加了两个方法,其实主要就是干一件事情,优雅的关闭消费
当应用需要停止或者重启时,我们希望先优雅的关闭消息消费,那么就会用到 org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer#stop()
因此针对这个功能,可以实现一个公共的抽象类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public abstract class AbsMQConsumer implements IMqConsumer { private volatile boolean end = false ; private SimpleMessageListenerContainer container; private boolean autoAck; public void setContainer (SimpleMessageListenerContainer container) { this .container = container; autoAck = container.getAcknowledgeMode().isAutoAck(); } public void shutdown () { end = true ; } protected void autoAck (Message message, Channel channel, boolean success) throws IOException { if (autoAck) { return ; } if (success) { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); } else { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false , true ); } } public void onMessage (Message message, Channel channel) throws Exception { try { autoAck(message, channel, process(message, channel)); } catch (Exception e) { autoAck(message, channel, false ); throw e; } finally { if (end) { container.stop(); } } } public abstract boolean process (Message message, Channel channel) ; }
上面的实现中,前面两个方法比较清晰,没有什么二意,需要关注的是onMessage
方法的实现,我们默认封装了ack的逻辑,设计思路如下:
当开启了手动ack之后,要求实际消费方实现 process
方法,并返回boolean,表示是否消费成功
消费成功,则ack
消费失败,则将消息重新丢回到队列
若开启自动ack,则不需要关注
每次消费一条消息之后,需要关注下是否关闭这个状态,从而实现mq的停止消费
所以每个实际消费者,实现这个抽象类的 process
方法即可,在内部实现自己的消息消费逻辑
2. 工厂类 前面主要定义了消费的实体可以怎么玩,接下来就是重头戏了,如何声明队列,如何绑定交换器等,如何注册消息监听器(即上面的Consumer)?
根据前面的实现,我们需要关注的几个参数依然是下面几个:
1 2 3 4 5 6 7 8 9 10 11 12 private String exchange;private String queue;private String routingKey;private Boolean autoDeleted;private Boolean durable;private Boolean autoAck;private ConnectionFactory connectionFactory;private RabbitAdmin rabbitAdmin;
我们最终的目标就是给每个Consumer创建一个SimpleMessageListenerContainer
的Bean交给Spring来托管,所以可以利用Spring的FactoryBean来实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 @Data @Builder public class MQContainerFactory implements FactoryBean <SimpleMessageListenerContainer > { private ExchangeType exchangeType; private String directExchange; private String topicExchange; private String fanoutExchange; private String queue; private String routingKey; private Boolean autoDeleted; private Boolean durable; private Boolean autoAck; private Integer concurrentNum; private ConnectionFactory connectionFactory; private RabbitAdmin rabbitAdmin; private IMqConsumer consumer; private Exchange buildExchange () { if (directExchange != null ) { exchangeType = ExchangeType.DIRECT; return new DirectExchange(directExchange); } else if (topicExchange != null ) { exchangeType = ExchangeType.TOPIC; return new TopicExchange(topicExchange); } else if (fanoutExchange != null ) { exchangeType = ExchangeType.FANOUT; return new FanoutExchange(fanoutExchange); } else { if (StringUtils.isEmpty(routingKey)) { throw new IllegalArgumentException("defaultExchange's routingKey should not be null!" ); } exchangeType = ExchangeType.DEFAULT; return new DirectExchange("" ); } } private Queue buildQueue () { if (StringUtils.isEmpty(queue)) { throw new IllegalArgumentException("queue name should not be null!" ); } return new Queue(queue, durable == null ? false : durable, false , autoDeleted == null ? true : autoDeleted); } private Binding bind (Queue queue, Exchange exchange) { return exchangeType.binding(queue, exchange, routingKey); } private void check () { if (rabbitAdmin == null || connectionFactory == null ) { throw new IllegalArgumentException("rabbitAdmin and connectionFactory should not be null!" ); } if (consumer == null ) { throw new IllegalArgumentException("rabbit msg consumer should not be null!" ); } } @Override public SimpleMessageListenerContainer getObject () throws Exception { check(); Queue queue = buildQueue(); Exchange exchange = buildExchange(); Binding binding = bind(queue, exchange); rabbitAdmin.declareQueue(queue); rabbitAdmin.declareExchange(exchange); rabbitAdmin.declareBinding(binding); SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setRabbitAdmin(rabbitAdmin); container.setConnectionFactory(connectionFactory); container.setQueues(queue); container.setPrefetchCount(20 ); container.setConcurrentConsumers(concurrentNum == null ? 1 : concurrentNum); container.setAcknowledgeMode(autoAck ? AcknowledgeMode.AUTO : AcknowledgeMode.MANUAL); container.setMessageListener(consumer); consumer.setContainer(container); return container; } @Override public Class<?> getObjectType() { return SimpleMessageListenerContainer.class ; } }
具体的实现代码如上,接下来进行分块分析,首先是Exchange, 我们直到常用的有三种 Exchange:
DirectExchange
TopicExchange
FanoutExchange
因此,我们自定义了一个枚举,来实现不同的Exchange的绑定姿势,注意下面的实现姿势,利用了抽象类的思路
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public enum ExchangeType { DIRECT { @Override public Binding binding (Queue queue, Exchange exchange, String routingKey) { return BindingBuilder.bind(queue).to((DirectExchange) exchange).with(routingKey); } }, TOPIC { @Override public Binding binding (Queue queue, Exchange exchange, String routingKey) { return BindingBuilder.bind(queue).to((TopicExchange) exchange).with(routingKey); } }, FANOUT { @Override public Binding binding (Queue queue, Exchange exchange, String routingKey) { return BindingBuilder.bind(queue).to((FanoutExchange) exchange); } }, DEFAULT { @Override public Binding binding (Queue queue, Exchange exchange, String routingKey) { return BindingBuilder.bind(queue).to((DirectExchange) exchange).with(queue.getName()); } }; public abstract Binding binding (Queue queue, Exchange exchange, String routingKey) ; }
剩下的就是 com.git.hui.rabbit.spring.component.MQContainerFactory#getObject
的逻辑了,基本上和前面的思路一样
定义queue
定义exchange
创建绑定
创建SimpleMessageListenerContainer
,设置各种参数
3. 配置类 不可避免的需要一些配置,如何RabbitMQ的连接工厂,RabbitAmdin,这些是可以作为多个Consumer的公共Bean来使用的,因此就放在了配置类中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Configuration public class FacSpringConfig { @Bean public ConnectionFactory connectionFactory () { CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setHost("127.0.0.1" ); factory.setPort(5672 ); factory.setUsername("admin" ); factory.setPassword("admin" ); factory.setVirtualHost("/" ); return factory; } @Bean public RabbitAdmin rabbitAdmin (ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } }
II. 测试验证 从代码实现角度来看,就几个类,还是比较简单的,接下来就看实际使用的姿势,是不是变简单了
新建一个消费类
1 2 3 4 5 6 7 8 public class FacMQConsumer extends AbsMQConsumer { @Override public boolean process (Message message, Channel channel) { String data = new String(message.getBody()); System.out.println(" fac mq consumer: " + data); return true ; } }
然后定义这个消费类的配置信息,主要是两个Bean的定义,一个是定义上面的FactoryBean,内部通过Builder模式设置了各种参数(借助lombok实现);另外一个就是获取SimpleMessageListenerContainer
容器了
1 2 3 4 5 6 7 8 9 10 11 12 13 @Bean public MQContainerFactory mqContainerFactory (ConnectionFactory connectionFactory, RabbitAdmin rabbitAdmin) { return MQContainerFactory.builder().queue("fac.direct" ).directExchange("fac.direct.exchange" ).durable(true ) .autoDeleted(false ).autoAck(false ).connectionFactory(connectionFactory).rabbitAdmin(rabbitAdmin) .routingKey("fac-routing" ).consumer(new FacMQConsumer()).build(); } @Bean public SimpleMessageListenerContainer facContainer (ConnectionFactory connectionFactory, RabbitAdmin rabbitAdmin) throws Exception { MQContainerFactory fac = mqContainerFactory(connectionFactory, rabbitAdmin); return fac.getObject(); }
对应的测试类可以如下实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @RunWith (SpringJUnit4ClassRunner.class ) @ContextConfiguration (classes = FacSpringConfig.class ) public class FactoryComponentUnit { @Autowired private AmqpProducer amqpProducer; @Test public void testDirectConsumer () throws InterruptedException { String[] routingKey = new String[]{"hello.world" , "fac-routing" , "test1" }; for (int i = 0 ; i < 10 ; i++) { amqpProducer.publishMsg("fac.direct.exchange" , routingKey[i % 3 ], ">>> hello " + routingKey[i % 3 ] + ">>> " + i); } System.out.println("-------over---------" ); Thread.sleep(1000 * 60 * 10 ); } }
然后就可以愉快的玩耍了
III. 其他 项目地址
一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛
声明 尽信书则不如,已上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激
扫描关注