天天看点

rocketmq-client 查询手动发送消息异常

今天处理rocketmq的后台的一些问题

下面这个问题是你也用网上4.x的rocketmq版本的监控后台,才会出现的

MQClientException: CODE: 208 DESC: query message by id finished, but no message.

这个是我们用rocketmq-client手动发送消息的时候,再去查看消息详情的时候遇到的问题,会报错,消息找不到

这是由于查询出来的mq消息,这些消息的msgId,不是真的msgId,而是UniqueKey

为什么会这样呢,我们来看一段源码

public static List<MessageExt> decodes(java.nio.ByteBuffer byteBuffer, final boolean readBody) {
        List<MessageExt> msgExts = new ArrayList<MessageExt>();
        while (byteBuffer.hasRemaining()) {
            MessageExt msgExt = clientDecode(byteBuffer, readBody);
            if (null != msgExt) {
                msgExts.add(msgExt);
            } else {
                break;
            }
        }
        return msgExts;
    }
           

1.以上是解析从broker获取到的消息

public static MessageExt clientDecode(java.nio.ByteBuffer byteBuffer, final boolean readBody) {
        return decode(byteBuffer, readBody, true, true);
    }
           

2.注意第四个参数是true,它代表是否是client端,就是监控端

public static MessageExt decode(
        java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody, final boolean isClient) {
        try {

            MessageExt msgExt;
            //1.重点看这里
            if (isClient) {
                msgExt = new MessageClientExt();
            } else {
                msgExt = new MessageExt();
            }

            //此处省略
            ...........
            ByteBuffer byteBufferMsgId = ByteBuffer.allocate(MSG_ID_LENGTH);
            String msgId = createMessageId(byteBufferMsgId, msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset());
            msgExt.setMsgId(msgId);

            //2.重点看这里
            if (isClient) {
                ((MessageClientExt) msgExt).setOffsetMsgId(msgId);
            }

            return msgExt;
        } catch (Exception e) {
            byteBuffer.position(byteBuffer.limit());
        }

        return null;
    }
           

3.MessageClientExt主要是这个对象惹的祸,老的版本mq就是一个 MessageExt

我们再来看看这个对象为什么惹祸了

public class MessageClientExt extends MessageExt {

    public String getOffsetMsgId() {
        return super.getMsgId();
    }

    public void setOffsetMsgId(String offsetMsgId) {
        super.setMsgId(offsetMsgId);
    }

    /**
    * 没错就是这里,默认拿的是属性里的UNIQ_KEY
    **/
    @Override
    public String getMsgId() {
        String uniqID = MessageClientIDSetter.getUniqID(this);
        if (uniqID == null) {
            return this.getOffsetMsgId();
        } else {
            return uniqID;
        }
    }

    public void setMsgId(String msgId) {
        //DO NOTHING
        //MessageClientIDSetter.setUniqID(this);
    }
}
           

所以知道了问题,就好解决了

我是在显示类里面做相应的处理,贴下我的处理方式,我是在  MessageView.class里处理

public static MessageView fromMessageExt(MessageExt messageExt) {
        MessageView messageView = new MessageView();
        BeanUtils.copyProperties(messageExt, messageView);
        if (messageExt.getBody() != null) {
            messageView.setMessageBody(new String(messageExt.getBody(), Charsets.UTF_8));
        }
        //主要是这里判断下,是否是这个类,是就把原来的msgId拿出来
        if(messageExt instanceof MessageClientExt){
            MessageClientExt ext =   (MessageClientExt) messageExt;
            messageView.setMsgId(ext.getOffsetMsgId());
        }

        return messageView;
    }
           

好了,这里就总结下这个问题