Spring Boot对RabbitMQ进行了很好的支持,今天简单的从源码层面分析RabbitMQ推送消息后如何找到我们监听方法的,以及Spring Cloud Stream中如何找到的。
首先引入pom文件,版本基于spring boot 2.2.7。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>配置类RabbitmqConfig
@Configuration public class RabbitmqConfig { public static final String QUEUE_INFORM_EMAIL = "queue_email_test"; public static final String QUEUE_INFORM_SMS = "queue_sms_test"; public static final String QUEUE_DEAD="queue_dead"; public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_test"; public static final String EXCHANGE_DIRECT_INFORM="exchange_direct_test"; private static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange"; private static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key"; /** * 交换机配置 * ExchangeBuilder提供了fanout、direct、topic、header交换机类型的配置 * @return the exchange */ @Bean(EXCHANGE_TOPICS_INFORM) public Exchange EXCHANGE_TOPICS_INFORM() { //durable(true)持久化,消息队列重启后交换机仍然存在 return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); } /** * 交换机配置 * ExchangeBuilder提供了fanout、direct、topic、header交换机类型的配置 * @return the exchange */ @Bean(EXCHANGE_DIRECT_INFORM) public Exchange EXCHANGE_DIRECT_INFORM() { //durable(true)持久化,消息队列重启后交换机仍然存在 return ExchangeBuilder.directExchange(EXCHANGE_DIRECT_INFORM).durable(true).build(); } //声明队列 @Bean(QUEUE_INFORM_SMS) public Queue QUEUE_INFORM_SMS() { Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 声明 死信交换机 args.put(DEAD_LETTER_QUEUE_KEY, EXCHANGE_DIRECT_INFORM); // x-dead-letter-routing-key 声明 死信路由键 args.put(DEAD_LETTER_ROUTING_KEY, "queue_dead"); // Queue queue = new Queue(QUEUE_INFORM_SMS,true,false,false,args); Queue queue = QueueBuilder.durable(QUEUE_INFORM_SMS).withArguments(args).build(); return queue; } //声明队列 @Bean(QUEUE_INFORM_EMAIL) public Queue QUEUE_INFORM_EMAIL() { Queue queue = new Queue(QUEUE_INFORM_EMAIL); return queue; } //声明队列 @Bean(QUEUE_DEAD) public Queue QUEUE_DEAD() { Queue queue = new Queue(QUEUE_DEAD); return queue; } /** channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#"); * 绑定队列到交换机 . * * @param queue the queue * @param exchange the exchange * @return the binding */ @Bean public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs(); } @Bean public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs(); } @Bean public Binding BINDING_QUEUE_INFORM_SMS2(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_DIRECT_INFORM) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("inform.sms.test").noargs(); } @Bean public Binding BINDING_QUEUE_DEAD(@Qualifier(QUEUE_DEAD) Queue queue, @Qualifier(EXCHANGE_DIRECT_INFORM) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(QUEUE_DEAD).noargs(); } @Bean public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, EXCHANGE_DIRECT_INFORM,QUEUE_DEAD); } }监听类:
@Component public class ReceiveHandler { //监听email队列 @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL}) public void receive_email(String msg, Message message, Channel channel) throws Exception { System.out.println("email队列接收消息:"+msg); } }发送类:
@Component public class RabbitmqSend { @Autowired RabbitTemplate rabbitTemplate; public void testSendByTopics(){ DlxMsg messagePostProcessor = new DlxMsg(5000l); for (int i=0;i<1;i++){ String message = "sms email inform to user"+i; rabbitTemplate.convertAndSend(RabbitmqConfig.QUEUE_INFORM_EMAIL,"inform.sms.email",message); System.out.println("Send Message is:'" + message + "'"); } } }启动类:
@SpringBootApplication public class RabbitmqDemoApplication { @Autowired RabbitmqSend rabbitmqSend; public static void main(String[] args) throws Exception{ ConfigurableApplicationContext run = SpringApplication.run(RabbitmqDemoApplication.class, args); RabbitmqSend rabbitmqSend = (RabbitmqSend)run.getBean("rabbitmqSend"); rabbitmqSend.testSendByTopics(); } }application.yml:
server: port: 44001 spring: application: name: test-rabbitmq-producer rabbitmq: host: 127.0.0.1 port: 5672 username: guest passowrd: guest virtualHost: /这些启动完就可以直接运行了,现在直接分析源码。
我们要分析的就是RabbitMQ如何直接找到标有@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})注解的方法。
在spring-boot-autoconfigure.jar包中的spring.factories文件中,我们可以看到有自动注入RabbitAutoConfiguration类。
@Configuration(proxyBeanMethods = false) @ConditionalOnClass({ RabbitTemplate.class, Channel.class }) @EnableConfigurationProperties(RabbitProperties.class) @Import(RabbitAnnotationDrivenConfiguration.class) public class RabbitAutoConfiguration { .................... }我们直接看RabbitAnnotationDrivenConfiguration类。
@Configuration(proxyBeanMethods = false) @ConditionalOnClass(EnableRabbit.class) class RabbitAnnotationDrivenConfiguration { ............... @Bean(name = "rabbitListenerContainerFactory") @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory") @ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true) SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory; } @Bean(name = "rabbitListenerContainerFactory") @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory") @ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "direct") DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory( DirectRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory; } }该类中SimpleRabbitListenerContainerFactory和DirectRabbitListenerContainerFactory比较重要。两个类都继承自AbstractRabbitListenerContainerFactory类,这两个类创建的SimpleMessageListenerContainer和DirectMessageListenerContainer主要是用来负责从RabbitMQ接收消息并转发到我们自己的方法上。大家先记住这两个类。
RabbitAnnotationDrivenConfiguration类上有EnableRabbit。
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(RabbitListenerConfigurationSelector.class) public @interface EnableRabbit { } public class RabbitListenerConfigurationSelector implements DeferredImportSelector { @Override public String[] selectImports(AnnotationMetadata importingClassMetadata) { return new String[] { RabbitBootstrapConfiguration.class.getName() }; } } public class RabbitBootstrapConfiguration implements ImportBeanDefinitionRegistrar { @Override public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { if (!registry.containsBeanDefinition( RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) { registry.registerBeanDefinition(RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME, new RootBeanDefinition(RabbitListenerAnnotationBeanPostProcessor.class)); } if (!registry.containsBeanDefinition(RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) { registry.registerBeanDefinition(RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME, new RootBeanDefinition(RabbitListenerEndpointRegistry.class)); } } }EnableRabbit导入了RabbitBootstrapConfiguration,RabbitBootstrapConfiguration会注册进RabbitListenerAnnotationBeanPostProcessor和RabbitListenerEndpointRegistry两个类。
我们首先看RabbitListenerAnnotationBeanPostProcessor类的postProcessAfterInitialization方法。
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException { Class<?> targetClass = AopUtils.getTargetClass(bean); final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata); for (ListenerMethod lm : metadata.listenerMethods) { for (RabbitListener rabbitListener : lm.annotations) { processAmqpListener(rabbitListener, lm.method, bean, beanName); } } if (metadata.handlerMethods.length > 0) { processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName); } return bean; }在创建每一个Bean对象之后,会检测该Bean中是否有包含@RabbitListener注解的方法,如果有则会执行processAmqpListener方法。
protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) { Method methodToUse = checkProxy(method, bean); MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint(); endpoint.setMethod(methodToUse); processListener(endpoint, rabbitListener, bean, methodToUse, beanName); }方法会创建一个MethodRabbitListenerEndpoint(Rabbit监听方法终端)对象。
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean, Object target, String beanName) { endpoint.setBean(bean); endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory); endpoint.setId(getEndpointId(rabbitListener)); endpoint.setQueueNames(resolveQueues(rabbitListener)); endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency")); endpoint.setBeanFactory(this.beanFactory); endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions())); Object errorHandler = resolveExpression(rabbitListener.errorHandler()); if (errorHandler instanceof RabbitListenerErrorHandler) { endpoint.setErrorHandler((RabbitListenerErrorHandler) errorHandler); } else if (errorHandler instanceof String) { String errorHandlerBeanName = (String) errorHandler; if (StringUtils.hasText(errorHandlerBeanName)) { endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class)); } } else { throw new IllegalStateException("error handler mut be a bean name or RabbitListenerErrorHandler, not a " + errorHandler.getClass().toString()); } String group = rabbitListener.group(); if (StringUtils.hasText(group)) { Object resolvedGroup = resolveExpression(group); if (resolvedGroup instanceof String) { endpoint.setGroup((String) resolvedGroup); } } String autoStartup = rabbitListener.autoStartup(); if (StringUtils.hasText(autoStartup)) { endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup)); } endpoint.setExclusive(rabbitListener.exclusive()); String priority = resolve(rabbitListener.priority()); if (StringUtils.hasText(priority)) { try { endpoint.setPriority(Integer.valueOf(priority)); } catch (NumberFormatException ex) { throw new BeanInitializationException("Invalid priority value for " + rabbitListener + " (must be an integer)", ex); } } resolveExecutor(endpoint, rabbitListener, target, beanName); resolveAdmin(endpoint, rabbitListener, target); resolveAckMode(endpoint, rabbitListener); resolvePostProcessor(endpoint, rabbitListener, target, beanName); RabbitListenerContainerFactory<?> factory = resolveContainerFactory(rabbitListener, target, beanName); this.registrar.registerEndpoint(endpoint, factory); }endpoint经过一系列设置后,执行this.registrar.registerEndpoint(endpoint, factory);方法。registrar=RabbitListenerEndpointRegistrar(),在该类创建时定义。
RabbitListenerEndpointRegistrar.registerEndpoint方法
public void registerEndpoint(RabbitListenerEndpoint endpoint, @Nullable RabbitListenerContainerFactory<?> factory) { Assert.notNull(endpoint, "Endpoint must be set"); Assert.hasText(endpoint.getId(), "Endpoint id must be set"); Assert.state(!this.startImmediately || this.endpointRegistry != null, "No registry available"); // Factory may be null, we defer the resolution right before actually creating the container AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory); synchronized (this.endpointDescriptors) { if (this.startImmediately) { // Register and start immediately this.endpointRegistry.registerListenerContainer(descriptor.endpoint, // NOSONAR never null resolveContainerFactory(descriptor), true); } else { this.endpointDescriptors.add(descriptor); } } }因为startImmediately=false,此处只是将endpoint加入到list中。
继续看RabbitListenerAnnotationBeanPostProcessor类的afterSingletonsInstantiated方法。
public void afterSingletonsInstantiated() { this.registrar.setBeanFactory(this.beanFactory); if (this.beanFactory instanceof ListableBeanFactory) { Map<String, RabbitListenerConfigurer> instances = ((ListableBeanFactory) this.beanFactory).getBeansOfType(RabbitListenerConfigurer.class); for (RabbitListenerConfigurer configurer : instances.values()) { configurer.configureRabbitListeners(this.registrar); } } if (this.registrar.getEndpointRegistry() == null) { if (this.endpointRegistry == null) { Assert.state(this.beanFactory != null, "BeanFactory must be set to find endpoint registry by bean name"); this.endpointRegistry = this.beanFactory.getBean( RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME, RabbitListenerEndpointRegistry.class); } this.registrar.setEndpointRegistry(this.endpointRegistry); } if (this.defaultContainerFactoryBeanName != null) { this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName); } // Set the custom handler method factory once resolved by the configurer MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory(); if (handlerMethodFactory != null) { this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory); } // Actually register all listeners this.registrar.afterPropertiesSet(); // clear the cache - prototype beans will be re-cached. this.typeCache.clear(); }直接进入registrar.afterPropertiesSet()。
public void afterPropertiesSet() { registerAllEndpoints(); } protected void registerAllEndpoints() { Assert.state(this.endpointRegistry != null, "No registry available"); synchronized (this.endpointDescriptors) { for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) { this.endpointRegistry.registerListenerContainer(// NOSONAR never null descriptor.endpoint, resolveContainerFactory(descriptor)); } this.startImmediately = true; // trigger immediate startup } }这里的endpointRegistry就是RabbitListenerEndpointRegistry类。执行registerListenerContainer方法。
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory, boolean startImmediately) { Assert.notNull(endpoint, "Endpoint must not be null"); Assert.notNull(factory, "Factory must not be null"); String id = endpoint.getId(); Assert.hasText(id, "Endpoint id must not be empty"); synchronized (this.listenerContainers) { Assert.state(!this.listenerContainers.containsKey(id), "Another endpoint is already registered with id '" + id + "'"); MessageListenerContainer container = createListenerContainer(endpoint, factory); this.listenerContainers.put(id, container); ......................................... } }继续进入createListenerContainer方法
protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) { MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint); .................... } public C createListenerContainer(RabbitListenerEndpoint endpoint) { C instance = createContainerInstance(); JavaUtils javaUtils = JavaUtils.INSTANCE .acceptIfNotNull(this.connectionFactory, instance::setConnectionFactory) .acceptIfNotNull(this.errorHandler, instance::setErrorHandler); if (this.messageConverter != null && endpoint != null) { endpoint.setMessageConverter(this.messageConverter); } javaUtils .acceptIfNotNull(this.acknowledgeMode, instance::setAcknowledgeMode) .acceptIfNotNull(this.channelTransacted, instance::setChannelTransacted) .acceptIfNotNull(this.applicationContext, instance::setApplicationContext) .acceptIfNotNull(this.taskExecutor, instance::setTaskExecutor) .acceptIfNotNull(this.transactionManager, instance::setTransactionManager) .acceptIfNotNull(this.prefetchCount, instance::setPrefetchCount) .acceptIfNotNull(this.defaultRequeueRejected, instance::setDefaultRequeueRejected) .acceptIfNotNull(this.adviceChain, instance::setAdviceChain) .acceptIfNotNull(this.recoveryBackOff, instance::setRecoveryBackOff) .acceptIfNotNull(this.mismatchedQueuesFatal, instance::setMismatchedQueuesFatal) .acceptIfNotNull(this.missingQueuesFatal, instance::setMissingQueuesFatal) .acceptIfNotNull(this.consumerTagStrategy, instance::setConsumerTagStrategy) .acceptIfNotNull(this.idleEventInterval, instance::setIdleEventInterval) .acceptIfNotNull(this.failedDeclarationRetryInterval, instance::setFailedDeclarationRetryInterval) .acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher) .acceptIfNotNull(this.autoStartup, instance::setAutoStartup) .acceptIfNotNull(this.phase, instance::setPhase) .acceptIfNotNull(this.afterReceivePostProcessors, instance::setAfterReceivePostProcessors) .acceptIfNotNull(this.deBatchingEnabled, instance::setDeBatchingEnabled); if (this.batchListener && this.deBatchingEnabled == null) { // turn off container debatching by default for batch listeners instance.setDeBatchingEnabled(false); } if (endpoint != null) { // endpoint settings overriding default factory settings javaUtils .acceptIfNotNull(endpoint.getAutoStartup(), instance::setAutoStartup) .acceptIfNotNull(endpoint.getTaskExecutor(), instance::setTaskExecutor) .acceptIfNotNull(endpoint.getAckMode(), instance::setAcknowledgeMode); javaUtils .acceptIfNotNull(this.batchingStrategy, endpoint::setBatchingStrategy); instance.setListenerId(endpoint.getId()); endpoint.setBatchListener(this.batchListener); endpoint.setupListenerContainer(instance); } if (instance.getMessageListener() instanceof AbstractAdaptableMessageListener) { AbstractAdaptableMessageListener messageListener = (AbstractAdaptableMessageListener) instance .getMessageListener(); javaUtils .acceptIfNotNull(this.beforeSendReplyPostProcessors, messageListener::setBeforeSendReplyPostProcessors) .acceptIfNotNull(this.retryTemplate, messageListener::setRetryTemplate) .acceptIfCondition(this.retryTemplate != null && this.recoveryCallback != null, this.recoveryCallback, messageListener::setRecoveryCallback) .acceptIfNotNull(this.defaultRequeueRejected, messageListener::setDefaultRequeueRejected) .acceptIfNotNull(endpoint.getReplyPostProcessor(), messageListener::setReplyPostProcessor); } initializeContainer(instance, endpoint); if (this.containerCustomizer != null) { this.containerCustomizer.configure(instance); } return instance; }这里的createContainerInstance方法就是会根据之前创建的SimpleMessageListenerContainer和DirectMessageListenerContainer其中一个。
之后会执行到endpoint.setupListenerContainer(instance);这行。
public void setupListenerContainer(MessageListenerContainer listenerContainer) { AbstractMessageListenerContainer container = (AbstractMessageListenerContainer) listenerContainer; ...................... setupMessageListener(listenerContainer); } private void setupMessageListener(MessageListenerContainer container) { MessageListener messageListener = createMessageListener(container); Assert.state(messageListener != null, () -> "Endpoint [" + this + "] must provide a non null message listener"); container.setupMessageListener(messageListener); } protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) { Assert.state(this.messageHandlerMethodFactory != null, "Could not create message listener - MessageHandlerMethodFactory not set"); MessagingMessageListenerAdapter messageListener = createMessageListenerInstance(); messageListener.setHandlerAdapter(configureListenerAdapter(messageListener)); String replyToAddress = getDefaultReplyToAddress(); if (replyToAddress != null) { messageListener.setResponseAddress(replyToAddress); } MessageConverter messageConverter = getMessageConverter(); if (messageConverter != null) { messageListener.setMessageConverter(messageConverter); } if (getBeanResolver() != null) { messageListener.setBeanResolver(getBeanResolver()); } return messageListener; }createMessageListener方法会创建一个MessagingMessageListenerAdapter对象,之后configureListenerAdapter(messageListener)方法会创建一个HandlerAdapter对象用于关联对应的Bean和方法。
protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter messageListener) { InvocableHandlerMethod invocableHandlerMethod = this.messageHandlerMethodFactory.createInvocableHandlerMethod(getBean(), getMethod()); return new HandlerAdapter(invocableHandlerMethod); }此处的步骤就是
1、通过SimpleMessageListenerContainer或DirectMessageListenerContainer,找到MessagingMessageListenerAdapter
2、通过MessagingMessageListenerAdapter找到对应的HandlerAdapter
3、通过HandlerAdapter找到对应的Bean和方法。
好了,那么重点就在于如何通过RabbitMq找到SimpleMessageListenerContainer或DirectMessageListenerContainer了。
继续回到RabbitBootstrapConfiguration类,我们知道该类还注册了一个RabbitListenerEndpointRegistry。
public class RabbitListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> { ............... }该类实现了SmartLifecycle方法。那么我们就看看它的start方法。
public void start() { for (MessageListenerContainer listenerContainer : getListenerContainers()) { startIfNecessary(listenerContainer); } } private void startIfNecessary(MessageListenerContainer listenerContainer) { if (this.contextRefreshed || listenerContainer.isAutoStartup()) { listenerContainer.start(); } } public void start() { if (isRunning()) { return; } if (!this.initialized) { synchronized (this.lifecycleMonitor) { if (!this.initialized) { afterPropertiesSet(); } } } try { logger.debug("Starting Rabbit listener container."); configureAdminIfNeeded(); checkMismatchedQueues(); doStart(); } catch (Exception ex) { throw convertRabbitAccessException(ex); } finally { this.lazyLoad = false; } }前面都是大同小异,我们主要看doStart方法。拿DirectMessageListenerContainer做例子,SimpleMessageListenerContainer类似。
protected void doStart() { if (!this.started) { actualStart(); } } protected void actualStart() { .............. if (queueNames.length > 0) { doRedeclareElementsIfNecessary(); getTaskExecutor().execute(() -> { // NOSONAR never null here startConsumers(queueNames); }); } else { this.started = true; this.startedLatch.countDown(); } ................. }看名字就知道开始消费者,直接看startConsumers方法。
private void startConsumers(final String[] queueNames) { while (!DirectMessageListenerContainer.this.started && isRunning()) { this.cancellationLock.reset(); try { for (String queue : queueNames) { consumeFromQueue(queue); } } ................ DirectMessageListenerContainer.this.started = true; DirectMessageListenerContainer.this.startedLatch.countDown(); } } } } private void consumeFromQueue(String queue) { List<SimpleConsumer> list = this.consumersByQueue.get(queue); // Possible race with setConsumersPerQueue and the task launched by start() if (CollectionUtils.isEmpty(list)) { for (int i = 0; i < this.consumersPerQueue; i++) { doConsumeFromQueue(queue); } } }进入doConsumeFromQueue方法
private void doConsumeFromQueue(String queue) { .............. Connection connection = null; // NOSONAR (close) try { connection = getConnectionFactory().createConnection(); } SimpleConsumer consumer = consume(queue, connection); ........... } private SimpleConsumer consume(String queue, Connection connection) { Channel channel = null; SimpleConsumer consumer = null; try { channel = connection.createChannel(isChannelTransacted()); channel.basicQos(getPrefetchCount()); consumer = new SimpleConsumer(connection, channel, queue); channel.queueDeclarePassive(queue); consumer.consumerTag = channel.basicConsume(queue, getAcknowledgeMode().isAutoAck(), (getConsumerTagStrategy() != null ? getConsumerTagStrategy().createConsumerTag(queue) : ""), // NOSONAR never null isNoLocal(), isExclusive(), getConsumerArguments(), consumer); } ....................... return consumer; }终于看到点端倪了。
1、首先创建一个channel,用于和rabbitmq进行交互。
2、创建一个SimpleConsumer,用户消费。SimpleConsumer是DirectMessageListenerContainer的内部类,并继承DefaultConsumer。如果是SimpleMessageListenerContainer创建的就是InternalConsumer。
3、声明该channel交互哪个队列channel.queueDeclarePassive(queue)。
public Queue.DeclareOk queueDeclarePassive(String queue) throws IOException { validateQueueNameLength(queue); return (Queue.DeclareOk) exnWrappingRpc(new Queue.Declare.Builder() .queue(queue) .passive() .exclusive() .autoDelete() .build()) .getMethod(); }4、建立channel和SimpleConsumer映射关系。在channel.basicConsume方法中,consumer是作为最后一个入参。
public String basicConsume(String queue, final boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, final Consumer callback) throws IOException { final Method m = new Basic.Consume.Builder() .queue(queue) .consumerTag(consumerTag) .noLocal(noLocal) .noAck(autoAck) .exclusive(exclusive) .arguments(arguments) .build(); BlockingRpcContinuation<String> k = new BlockingRpcContinuation<String>(m) { @Override public String transformReply(AMQCommand replyCommand) { String actualConsumerTag = ((Basic.ConsumeOk) replyCommand.getMethod()).getConsumerTag(); _consumers.put(actualConsumerTag, callback); .................. } } }可以看到根据消费tag,找到对应的consumer。
这里简单说一下创建connection的步骤,有兴趣的同学可以自己看源码。
1、会通过com.rabbitmq.client.ConnectionFactory类的newConnection方法进行创建。
2、创建FrameHandler对象,主要用于和RabbitMq的Socket连接。
3、创建AMQConnection对象,封装FrameHandler。
4、启动AMQConnection.start方法。会调用SocketFrameHandler.initialize方法,最终会调用AMQConnection.run方法
public void run() { boolean shouldDoFinalShutdown = true; try { while (_running) { Frame frame = _frameHandler.readFrame(); readFrame(frame); } } catch (Throwable ex) { ............ } finally { if (shouldDoFinalShutdown) { doFinalShutdown(); } } }其中readFrame就是有消息来了之后会执行。
读取消息:
1、AMQChannel的handleCompleteInboundCommand方法,并执行processAsync,再执行processDelivery。
protected void processDelivery(Command command, Basic.Deliver method) { Basic.Deliver m = method; Consumer callback = _consumers.get(m.getConsumerTag()); ...................... try { .................. this.dispatcher.handleDelivery(callback, m.getConsumerTag(), envelope, (BasicProperties) command.getContentHeader(), command.getContentBody()); } catch (WorkPoolFullException e) { // couldn't enqueue in work pool, propagating throw e; } }2、根据刚才放入的consumer标签,找到消费者Consumer。执行dispatcher.handleDelivery方法
public void handleDelivery(final Consumer delegate, final String consumerTag, final Envelope envelope, final AMQP.BasicProperties properties, final byte[] body) throws IOException { executeUnlessShuttingDown( new Runnable() { @Override public void run() { try { delegate.handleDelivery(consumerTag, envelope, properties, body); } catch (Throwable ex) { connection.getExceptionHandler().handleConsumerException( channel, ex, delegate, consumerTag, "handleDelivery"); } } }); }3、再执行SimpleConsumer.handleDelivery方法。刚才说了SimpleConsumer是DirectMessageListenerContainer内部类,其实也是执行DirectMessageListenerContainer类的handleDelivery方法。
PS:这里说一下
①:如果是 DirectMessageListenerContainer,则通过SimpleConsumer执行callExecuteListener。
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) { MessageProperties messageProperties = getMessagePropertiesConverter().toMessageProperties(properties, envelope, "UTF-8"); messageProperties.setConsumerTag(consumerTag); messageProperties.setConsumerQueue(this.queue); Message message = new Message(body, messageProperties); long deliveryTag = envelope.getDeliveryTag(); if (this.logger.isDebugEnabled()) { this.logger.debug(this + " received " + message); } updateLastReceive(); if(){ ............. }else { try { callExecuteListener(message, deliveryTag); } catch (Exception e) { // NOSONAR } } }
②:如果是SimpleMessageListenerContainer,则InternalConsumer执行handleDelivery,往队列中放入消息。
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { try { if (BlockingQueueConsumer.this.abortStarted > 0) { if (!BlockingQueueConsumer.this.queue.offer( new Delivery(consumerTag, envelope, properties, body, this.queueName), BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) { .................. } } else { //网队列中放入消息 BlockingQueueConsumer.this.queue .put(new Delivery(consumerTag, envelope, properties, body, this.queueName)); } } catch (@SuppressWarnings("unused") InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { BlockingQueueConsumer.logger.warn("Unexpected exception during delivery", e); } }之后会在 SimpleMessageListenerContainer的run方法中执行下面代码取出。
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) { mainLoop(); }4、执行callExecuteListener方法。一路跟进后,会进入到AbstractMessageListenerContainer的actualInvokeListener方法。并最后进入到doInvokeListener方法。
protected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Object data) { ........................... // Actually invoke the message listener... try { if (data instanceof List) { listener.onMessageBatch((List<Message>) data, channelToUse); } else { message = (Message) data; listener.onMessage(message, channelToUse); } } catch (Exception e) { throw wrapToListenerExecutionFailedExceptionIfNeeded(e, data); } } finally { cleanUpAfterInvoke(resourceHolder, channelToUse, boundHere); } }5、找到之前创建的实现类MessagingMessageListenerAdapter。
public void onMessage(org.springframework.amqp.core.Message amqpMessage, Channel channel) throws Exception { // NOSONAR Message<?> message = toMessagingMessage(amqpMessage); invokeHandlerAndProcessResult(amqpMessage, channel, message); } protected void invokeHandlerAndProcessResult(@Nullable org.springframework.amqp.core.Message amqpMessage, Channel channel, Message<?> message) throws Exception { // NOSONAR InvocationResult result = null; try { if (this.messagingMessageConverter.method == null) { amqpMessage.getMessageProperties() .setTargetMethod(this.handlerAdapter.getMethodFor(message.getPayload())); } result = invokeHandler(amqpMessage, channel, message); } catch (ListenerExecutionFailedException e) { } }6、invokeHandler方法就是之前创建的handlerAdapter,就能执行对应Bean的方法了。
public InvocationResult invoke(Message<?> message, Object... providedArgs) throws Exception { // NOSONAR if (this.invokerHandlerMethod != null) { return new InvocationResult(this.invokerHandlerMethod.invoke(message, providedArgs), null, this.invokerHandlerMethod.getMethod().getGenericReturnType(), this.invokerHandlerMethod.getBean(), this.invokerHandlerMethod.getMethod()); } else if (this.delegatingHandler.hasDefaultHandler()) { // Needed to avoid returning raw Message which matches Object Object[] args = new Object[providedArgs.length + 1]; args[0] = message.getPayload(); System.arraycopy(providedArgs, 0, args, 1, providedArgs.length); return this.delegatingHandler.invoke(message, args); } else { return this.delegatingHandler.invoke(message, providedArgs); } }SimpleMessageListenerContainer和DirectMessageListenerContainer是AbstractMessageListenerContainer子类
1、通过SimpleMessageListenerContainer或DirectMessageListenerContainer,找到MessagingMessageListenerAdapter
2、通过MessagingMessageListenerAdapter找到对应的HandlerAdapter
3、通过HandlerAdapter找到对应的Bean和方法
1、会通过com.rabbitmq.client.ConnectionFactory类的newConnection方法进行创建。
2、创建FrameHandler对象,主要用于和RabbitMq的Socket连接。
3、创建AMQConnection对象,封装FrameHandler。
4、启动AMQConnection.start方法。会调用SocketFrameHandler.initialize方法,最终会调用AMQConnection.run准备进行接收
5、AMQChannel的handleCompleteInboundCommand方法,并执行processAsync,再执行processDelivery。
6、根据刚才放入的consumer标签,找到消费者Consumer。执行dispatcher.handleDelivery方法。
7、再执行SimpleConsumer.handleDelivery方法。刚才说了SimpleConsumer是DirectMessageListenerContainer内部类,其实也是执行DirectMessageListenerContainer类的handleDelivery方法。
8、执行callExecuteListener方法。一路跟进后,会进入到AbstractMessageListenerContainer的actualInvokeListener方法。并最后进入到doInvokeListener方法
9、找到之前创建的实现类MessagingMessageListenerAdapter。
其实读者只需要记住前面标蓝的重要三部分内容就行了,后续创建并接收的可以忽略。知道怎么建立对应关系即可。
Spring Stream 接收 RabbitMQ消息找对应类的源码解析点这里