问题
最近在使用RabbitMq时遇到了一个问题,明明是转换成json发送到mq中的数据,消费者接收到的却是一串数字也就是byte数组,但是使用mq可视化页面查看数据却是正常的,之前在使用过程中从未遇到过这种情况,遇到的情况如下所示:
生产者发送消息的代码如下所示:
public void sendJsonStrMsg(String jsonStr){ rabbitTemplate.convertAndSend(JSON_QUEUE, jsonStr);}
消费者代码如下所示:
@RabbitHandler@RabbitListener(queuesToDeclare = {@Queue(name=ProducerService.JSON_QUEUE, durable = "true")},containerFactory = "prefetchTenRabbitListenerContainerFactory")public void listenJsonMsg(String msg, Channel channel, Message message){ log.debug("json字符串类型消息>>>>{}",msg);}
引入的containerFactory如下所示:
@Beanpublic RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchTenRabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); MessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); //<x> factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(jackson2JsonMessageConverter); return factory;}
注意代码中标有<x>
的地方,这里就是我们解决问题的关键。
解决方案
我们先说解决方案,再说原因,解决方案其实很简单,在保持上述代码不变的情况下,只需要再注入如下的bean即可:
@Beanpublic MessageConverter jackson2JsonMessageConverter(){ return new Jackson2JsonMessageConverter("*");}
解决方案就是这么简单,只需要在原来的代码的基础上注入Jackson2JsonMessageConverter就可以了,但是原理是什么呢?且往后看。
原理分析
关于原理的解释我们从源码层面来说,毕竟源码面前没有秘密.
生产者源码分析
首先看我们发送消息到mq的方法rabbitTemplate.convertAndSend(JSON_QUEUE, jsonStr)
,从此方法进去后,经过重载的方法后最终到达下面所示的方法:
@Overridepublic void convertAndSend(String exchange, String routingKey, final Object object, @Nullable CorrelationData correlationData) throws AmqpException { send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);}
着重看convertMessageIfNecessary
方法,方法名已经很直白的告诉我们了,如果需要就转换消息,我们点进去看一下这个方法:
protected Message convertMessageIfNecessary(final Object object) { if (object instanceof Message) { //<1> return (Message) object; } return getRequiredMessageConverter().toMessage(object, new MessageProperties()); //<2>}
<1>
处是说如果要发送到mq的对象是Message的实例,那么就直接转换成Message类型返回,否则就获取MessageConverter
后调用toMessage()
方法返回Message对象。
我们先看一下RabbitTemplate#getRequiredMessageConverter()
,如下所示:
private MessageConverter getRequiredMessageConverter() throws IllegalStateException { MessageConverter converter = getMessageConverter(); if (converter == null) { throw new AmqpIllegalStateException( "No 'messageConverter' specified. Check configuration of RabbitTemplate."); } return converter;}public MessageConverter getMessageConverter() { return this.messageConverter; //<1>}
<1>
处的代码表明需要一个messageConverter
对象,我在RabbitTemplate
源码中找到了对应的set方法,由于我们没有调用set方法取设置messageConverter的值,那么就需要取查找默认值,默认值的设置如下代码所示:
/** * Convenient constructor for use with setter injection. Don't forget to set the connection factory. */public RabbitTemplate() { initDefaultStrategies(); // NOSONAR - intentionally overridable; other assertions will check}/** * Set up the default strategies. Subclasses can override if necessary. 设置默认策略,子类在必须的时候可以重写 */protected void initDefaultStrategies() { setMessageConverter(new SimpleMessageConverter());}public void setMessageConverter(MessageConverter messageConverter) { this.messageConverter = messageConverter;}
我们点进去SimpleMessageConverter#toMessage()
方法看一下是如何将一个java对象转换成Message对象的,可惜的是在SimpleMessageConverter中未找到toMessage方法,我们在此先看一下SimpleMessageConverter继承情况,类图如下:
去掉了一些无用的接口和类之后,剩下的类图如下所示,沿着类图向上找,在AbstractMessageConverter
中找到了toMessage方法:
@Overridepublic final Message toMessage(Object object, @Nullable MessageProperties messagePropertiesArg, @Nullable Type genericType) throws MessageConversionException { MessageProperties messageProperties = messagePropertiesArg; if (messageProperties == null) { messageProperties = new MessageProperties(); } Message message = createMessage(object, messageProperties, genericType); //<1> messageProperties = message.getMessageProperties(); if (this.createMessageIds && messageProperties.getMessageId() == null) { messageProperties.setMessageId(UUID.randomUUID().toString()); } return message;}
该方法中没有我们需要的内容,继续看<1>
处的方法,该方法需要返回到SimpleMessageConverter
中:
@Overrideprotected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { byte[] bytes = null; if (object instanceof byte[]) { //<1> bytes = (byte[]) object; messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES); //<1.x> } else if (object instanceof String) { //<2> try { bytes = ((String) object).getBytes(this.defaultCharset); } catch (UnsupportedEncodingException e) { throw new MessageConversionException( "failed to convert to Message content", e); } messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);//<2.x> messageProperties.setContentEncoding(this.defaultCharset); } else if (object instanceof Serializable) { //<3> try { bytes = SerializationUtils.serialize(object); } catch (IllegalArgumentException e) { throw new MessageConversionException( "failed to convert to serialized Message content", e); } messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);//<3.x> } if (bytes != null) { messageProperties.setContentLength(bytes.length); return new Message(bytes, messageProperties); } throw new IllegalArgumentException(getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName()); //<4>}
这个方法就比较有意思了,在<1>
、<2>
、<3>
三处分别判断了发送的消息是否是byte[]
、String
、Serializable
,并且在判断之后将消息的.........