天天看点

MetaQ源码阅读及与Spring结合使用

MetaQ (全称 Metamorphosis )是一个高性能、高可用、可扩展的分布式消息中间件 ,MetaQ 具有消息存储顺序写、吞吐量大和支持本地和XA 事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景, MetaQ在阿里巴巴各个子公司被广泛应用,每天转发 250 亿 + 条消息。主要应用于异步解耦, Mysql 数据复制,收集日志等场景 。。

我是做移动互联网广告系统的,在工作中有很多场景使用到了MetaQ,例如:广告的存储、效果数据的上报,多机房扣费等都需要依赖MetaQ,由于公司已经使用MeatQ作为消息中间件的时间很久了,已经有了模板,所以很多的时候就是直接拿来使用,对里面为什么做这样那样的封装没有去深入的了解,刚好这段时间有空就去看了看源码,给自己总结沉淀一下,做到不仅知道怎么用,还要知道为什么这样做。

一、生产者

发送消息是由生产者MessageProduce触发,MessageProduce从MessageSessionFactory中创建出来具体实现如下:

MetaClientConfig metaClientConfig = new MetaClientConfig();
 ZKConfig zkConfig = new ZkUtils.ZKConfig()
 zkConfig.zkConnect = "127.0.0.1:2181";
 metaClientConfig.setZkConfig(zkConfig)
 MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig );
 // create producer,强烈建议使用单例
 MessageProducer producer = sessionFactory.createProducer();
 // publish topic
 final String topic = "meta-test";
 producer.publish(topic);
 SendResult sendResult = producer.sendMessage(new Message(topic, "xxxx".getBytes()));
           

我们可以看出MessageProduce 是通过工厂创建的 ,MetaMessageSessionFactory需要一个参数就MetaClientConfig这个类,MetaClientConfig里面是什么了?MetaClientConfig中有个工具类ZkUtils,通过名字就知道是和zk交互的类,

和zk交互我们知道创建一个客户端需要几个参数:

1. zkConnect (zk的ip地址)

2. zkSessionTimeoutMs(zk的会话超时时间)

3. zkConnectionTimeoutMs(zk的连接超时时间)

4. zkSyncTimeMs(zk心跳时间)

我们知道Sping IOC容器就是用来创建发现维护类与类之间的关系的,MetaQ团队当然也想到了这个,那他是这么实现的呢?

在com.taobao.metamorphosis.client.extension.spring 中有如下几个类:

1. AbstractMetaqMessageSessionFactory

2. DefaultMessageListener

3. JavaSerializationMessageBodyConverter

4. MessageBodyConverter

5. MessageBuilder

6. MessageListenerContainer

7. MetaqMessage

8. MetaqMessageSessionFactoryBean

9. MetaqTemplate

10. MetaqTopic

11. XAMetaqMessageSessionFactoryBean

先来看看MetaqTemplate这个类,这个类提供发送消息的方法,

public SendResult send(MessageBuilder builder) throws InterruptedException {
        Message msg = builder.build(this.messageBodyConverter);
        final String topic = msg.getTopic();
        MessageProducer producer = this.getOrCreateProducer(topic);
        try {
            return producer.sendMessage(msg);
        }
        catch (MetaClientException e) {
            return new SendResult(false, null, -, ExceptionUtils.getFullStackTrace(e));
        }
    }
           

我们发现使用send方法的时候还要MessageBodyConverter的类,这个类是用来做什么的呢?:

/**
     * Convert a message object to byte array.
     * 
     * @param body
     * @return
     * @throws MetaClientException
     */
    public byte[] toByteArray(T body) throws MetaClientException;

    /**
     * Convert a byte array to message object.
     * 
     * @param bs
     * @return
     * @throws MetaClientException
     */
    public T fromByteArray(byte[] bs) throws MetaClientException;
           

可以看到这里定义了两个方法 用来把消息转换为二进制,以及从二进制中恢复消息,我们知道数据在网络上传输都是二进制的方式进行传输的,这个接口很方便我们做扩展,灵活的实现自己的转换规则,比如采用其他序列化协议,如protobufs,hessian等等,当然如果你不想实现自己的消息转换类,这里提供了一个实现类:JavaSerializationMessageBodyConverter

public class JavaSerializationMessageBodyConverter implements MessageBodyConverter<Serializable> {
    JavaSerializer serializer = new JavaSerializer();
    JavaDeserializer deserializer = new JavaDeserializer();


    @Override
    public byte[] toByteArray(Serializable body) throws MetaClientException {
        try {
            return this.serializer.encodeObject(body);
        }
        catch (IOException e) {
            throw new MetaClientException(e);

        }
    }


    @Override
    public Serializable fromByteArray(byte[] bs) throws MetaClientException {
        try {
            return (Serializable) this.deserializer.decodeObject(bs);
        }
        catch (IOException e) {
            throw new MetaClientException(e);

        }
    }

}
           

JavaSerializationMessageBodyConverter 实现了MessageBodyConverter ,对消息体进行序列化和反序列化。

send方法中还调用了getOrCreateProducer我们来看看这个方法:

public MessageProducer getOrCreateProducer(final String topic) {
        if (!this.shareProducer) {
            FutureTask<MessageProducer> task = this.producers.get(topic);
            if (task == null) {
                task = new FutureTask<MessageProducer>(new Callable<MessageProducer>() {

                    @Override
                    public MessageProducer call() throws Exception {
                        MessageProducer producer = MetaqTemplate.this.messageSessionFactory.createProducer();
                        producer.publish(topic);
                        if (!StringUtils.isBlank(MetaqTemplate.this.defaultTopic)) {
                            producer.setDefaultTopic(MetaqTemplate.this.defaultTopic);
                        }
                        return producer;
                    }

                });
                FutureTask<MessageProducer> oldTask = this.producers.putIfAbsent(topic, task);
                if (oldTask != null) {
                    task = oldTask;
                }
                else {
                    task.run();
                }
            }

            try {
                MessageProducer producer = task.get();
                return producer;
            }
            catch (ExecutionException e) {
                throw ThreadUtils.launderThrowable(e.getCause());
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        else {
            if (this.sharedProducer == null) {
                synchronized (this) {
                    if (this.sharedProducer == null) {
                        this.sharedProducer = this.messageSessionFactory.createProducer();
                        if (!StringUtils.isBlank(this.defaultTopic)) {
                            this.sharedProducer.setDefaultTopic(this.defaultTopic);
                        }
                    }
                }
            }
            this.sharedProducer.publish(topic);
            return this.sharedProducer;
        }
        throw new IllegalStateException("Could not create producer for topic '" + topic + "'");
    }
           

看到熟悉的 messageSessionFactory ,创建生产者的时候就需要这个工厂类来创建,我们在回过头来看看MetaqTemplate这个类:

private MessageSessionFactory messageSessionFactory;
    private String defaultTopic;
    private MessageBodyConverter<?> messageBodyConverter;
    private boolean shareProducer = false;
    private volatile MessageProducer sharedProducer;
           

有好几个属性,我们只要传入一个MessageSessionFactory 及MessageBodyConverter对象即可:

到此我们就可以创建MetaqTemplate这个类了:

先来创建MessageSessionFactory ,这里使用MetaqMessageSessionFactoryBean这个实现类:

<!--  message session factory -->  
    <bean id="sessionFactory" class="com.taobao.metamorphosis.client.extension.spring.MetaqMessageSessionFactoryBean">  
        <property name="zkConnect" value="127.0.0.1:2181"/>  
        <property name="zkSessionTimeoutMs" value="30000"/>  
        <property name="zkConnectionTimeoutMs" value="30000"/>  
        <property name="zkSyncTimeMs" value="5000"/>  
</bean>  
           

这样我们就创建了一个工厂类了,然后我们需要创建一个消息转换类,这里使用默认实现类:

JavaSerializationMessageBodyConverter

<!--  message body converter using java serialization. -->  
   <bean id="messageBodyConverter"    
  class="com.taobao.metamorphosis.client.extension.spring.JavaSerializationMessageBodyConverter"/>  
           

我们需要创建MetaqTemplate元素都准备好了,可以创建MetaqTemplate类了:

<!--  template to send messages. -->  
   <bean id ="metaqTemplate" class="com.taobao.metamorphosis.client.extension.spring.MetaqTemplate">      
       <property name="messageSessionFactory" ref="sessionFactory"/>  
       <property name="messageBodyConverter" ref="messageBodyConverter"/>  
   </bean>  
           

可以发送消息了:

final String topic = "date";  
final SendResult sendResult =  
template.send(MessageBuilder.withTopic(topic).withBody(new Date()); 
           

二、消费者

看完生产者我在来看看消费者,接受消息是由消费者MessageConsume触发,MessageConsume从MessageSessionFactory中创建出来具体实现如下:

MetaClientConfig metaClientConfig = new MetaClientConfig();
   ZKConfig zkConfig = new ZkUtils.ZKConfig()
   zkConfig.zkConnect = "127.0.0.1:2181";
   metaClientConfig.setZkConfig(zkConfig)
   MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig );
        // subscribed topic
        final String topic = "meta-test";
        // consumer group
        final String group = "meta-example";
        // create consumer,强烈建议使用单例
        MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group));
        // subscribe topic
        consumer.subscribe(topic,  * , new MessageListener() {

            public void recieveMessages(Message message) {
                System.out.println("Receive message " + new String(message.getData()));
            }


            public Executor getExecutor() {
                // Thread pool to process messages,maybe null.
                return null;
            }
        });
        // complete subscribe
        consumer.completeSubscribe();

    }
           

消费者也是通过MetaMessageSessionFactory 去创建的,然后调用subscribe 实现消息的订阅接受及处理,我们来看看这个类MessageListenerContainer:

@Override
    public void afterPropertiesSet() throws Exception {
        log.info("Start to initialize message listener container.");
        if (this.subscribers != null) {
            Set<MessageConsumer> consumers = new HashSet<MessageConsumer>();
            for (Map.Entry<MetaqTopic, ? extends DefaultMessageListener<?>> entry : this.subscribers.entrySet()) {
                final MetaqTopic topic = entry.getKey();
                final DefaultMessageListener<?> listener = entry.getValue();
                if (topic == null) {
                    throw new IllegalArgumentException("Topic is null");
                }
                if (StringUtils.isBlank(topic.getTopic())) {
                    throw new IllegalArgumentException("Blank topic");
                }
                MessageConsumer consumer = this.getMessageConsumer(topic);
                if (consumer == null) {
                    throw new IllegalStateException("Get or create consumer failed");
                }
                log.info("Subscribe topic=" + topic.getTopic() + " with group=" + topic.getGroup());
                if (listener.getMessageBodyConverter() == null) {
                    listener.setMessageBodyConverter(this.messageBodyConverter);
                }
                consumer.subscribe(topic.getTopic(), topic.getMaxBufferSize(), listener);
                consumers.add(consumer);
            }
            for (MessageConsumer consumer : consumers) {
                consumer.completeSubscribe();
            }
        }
        log.info("Initialize message listener container successfully.");
    }
           

可以看到这个类在初始完成后会创建一个消费者,然后调用消费者的subscribe方法订阅和处理消息,创建这个类需要下面这个几个类:MetaqTopic 、DefaultMessageListener或者其子类,下面我来分别看看这个两个类:

MetaqTopic 主要有如下几个属性:

private ConsumerConfig consumerConfig = new ConsumerConfig();
    private String topic;
    private int maxBufferSize =  * ;
           

我们知道创建消费者的时候需要指定topic及每次消费的大小,MetaqTopic 这个就是用来指定这些属性值的

再来看看 DefaultMessageListener:

@Override
    public void recieveMessages(Message message) throws InterruptedException {
        if (this.messageBodyConverter != null) {
            try {
                T body = (T) this.messageBodyConverter.fromByteArray(message.getData());
                this.onReceiveMessages(new MetaqMessage<T>(message, body));
            }
            catch (Exception e) {
                log.error("Convert message body from byte array failed,msg id is " + message.getId() + " and topic is "
                        + message.getTopic(), e);
                message.setRollbackOnly();
            }
        }
        else {
            this.onReceiveMessages(new MetaqMessage<T>(message, null));
        }
    }

    public abstract void onReceiveMessages(MetaqMessage<T> msg);
           

这个类实现了recieveMessages处理消息的方法,在方法中我们调用了MessageBodyConverter 这个类转换消息,然后调用了onReceiveMessages 这个方法,需要我们自己来实现真正的消息处理,也就是我们需要实现DefaultMessageListener这个类中onReceiveMessages 方法来处理消息就可以了。

这里简单进行一个实现:

import com.taobao.metamorphosis.client.extension.spring.DefaultMessageListener;  
import com.taobao.metamorphosis.client.extension.spring.MetaqMessage;  
import java.util.Date;  


/** 
 * Process date messages listener. 
 *  
 * @author dennis 
 *  
 */  
public class DateMessageListener extends DefaultMessageListener<Date> {  

    @Override  
    public void onReceiveMessages(MetaqMessage<Date> msg) {  
        Date date = msg.getBody();  
        System.out.println("receive date message:" + date);  
    }  

}  
           

这样我们所需的要素就都有了,现在看看怎么用spring来配置:

<!--  topics to be subscribed. -->  
    <bean id = "dateTopic" class="com.taobao.metamorphosis.client.extension.spring.MetaqTopic">  
        <!-- consumer group -->  
        <property name="group" value="testGroup"/>  
        <!--  topic -->  
        <property name="topic" value="date"/>  
        <!--  max buffer size to fetch messages -->  
        <property name="maxBufferSize" value="16384"/>  
    </bean>  
           
<!--  message listener -->  
    <bean id= "messageListener" class="com.taobao.metamorphosis.example.spring.DateMessageListener">  
        <!--  threads to process these messages. -->  
        <property name="processThreads" value="10"/>  
    </bean>  
           
<!--  listener container to subscribe topics -->  
  <bean id ="listenerContainer" class="com.taobao.metamorphosis.client.extension.spring.MessageListenerContainer">   
       <property name="messageSessionFactory" ref="sessionFactory"/>  
       <property name="messageBodyConverter" ref="messageBodyConverter"/>  
       <property name="subscribers">  
           <map>  
               <entry key-ref="dateTopic" value-ref="messageListener"/>  
           </map>  
       </property>  
  </bean>  
           

只要配置好这些后就可以通过我们实现的监听器DateMessageListener 自动处理消息了。

写到这差不多就整理完了,代码比较多,只找了几个关键的地方进行分析,着重点落在了这么结合Spring使用。由于能力有限制,写到不到位的地方多多见谅。。

我是一只小蜗牛,虽然速度慢,但我一直在努力向前爬。。