天天看点

kettle-从rocketmq消费消息

用途

本文演示如何通过 kettle 从 rocketmq 消费消息。

技术

  • kettle5.4
  • rocketmq-4.6.0

整体流程

kettle-从rocketmq消费消息

命名参数配置

kettle-从rocketmq消费消息

获取消息源码(get mq message)

import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

// RocketMQ push consumer; assigned on the first processRow call and shut down when the step ends.
private DefaultMQPushConsumer consumer=null;
// Monitor that guards msg_size and on which processRow waits for the message listener.
// Kept per instance (not static): the counters below are per instance, and a class-wide
// monitor would let one instance's notifyAll() wake the waiters of every other instance.
private final Object o = new Object();
private int msg_size=0; // number of messages counted so far
private int msg_max_size=0; // maximum number of messages to accept
/**
 * Converts a batch of RocketMQ messages into output rows.
 *
 * Every message increments msg_size first; once the counter exceeds msg_max_size the
 * method stops and returns false without emitting that message. Otherwise the body is
 * decoded with RemotingHelper.DEFAULT_CHARSET, logged, and written to field 0 of a new
 * output row.
 *
 * Any exception raised while handling the batch is logged and the method still returns true.
 *
 * @param msgs messages delivered to the listener
 * @return false when the message limit was exceeded during this batch, true otherwise
 */
public boolean parseMsgs(List<MessageExt> msgs){
	try {
		for (MessageExt message : msgs) {
			// Count first, then compare: the message that crosses the limit is not emitted.
			msg_size = msg_size + 1;
			if (msg_size > msg_max_size) {
				return false;
			}

			byte[] payload = message.getBody();
			String text = new String(payload, RemotingHelper.DEFAULT_CHARSET);
			logBasic(">>>>>接收消息:" + text);

			int fieldCount = data.outputRowMeta.size();
			Object[] row = createOutputRow(new Object[0], fieldCount);
			row[0] = text;
			putRow(data.outputRowMeta, row);
		}
	} catch (UnsupportedEncodingException encodingFailure) {
		logError("UnsupportedEncodingException", encodingFailure);
	} catch (Exception failure) {
		logError("Exception", failure);
	}
	return true;
}
/**
 * Initializes the step: reads the message limit from the "msg_max_size" variable
 * (default "10") and then delegates to parent.initImpl.
 *
 * @param stepMetaInterface step metadata, passed through to the parent
 * @param stepDataInterface step data, passed through to the parent
 * @return false when "msg_max_size" is not a valid integer, otherwise the result of
 *         parent.initImpl
 */
public boolean init(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface) {
	logBasic(">>>>>call init");
	String v = getVariable("msg_max_size","10");
	try {
		// parseInt avoids the boxing of Integer.valueOf; surrounding whitespace is tolerated.
		msg_max_size = Integer.parseInt(v == null ? "" : v.trim());
	} catch (NumberFormatException e) {
		// Report a bad configuration value as an initialization failure instead of
		// letting the runtime exception escape from init.
		logError("Invalid value for msg_max_size: " + v, e);
		return false;
	}
	return parent.initImpl(stepMetaInterface, stepDataInterface);
}
/**
 * Stop hook for the step: shuts down the RocketMQ consumer (if one was created), wakes
 * any thread waiting on the monitor o, and then delegates to parent.stopRunningImpl.
 *
 * @param stepMetaInterface step metadata, passed through to the parent
 * @param stepDataInterface step data, passed through to the parent
 * @throws KettleException declared by the step contract
 */
public void stopRunning(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface)
    throws KettleException
{
	try {
		if(consumer !=null){
			logBasic(">>>>>关闭转换任务!");
			consumer.shutdown();
		}
		// processRow blocks on this monitor; once the consumer is shut down no listener
		// will ever notify it, so wake it here to let the step finish.
		synchronized (o) {
			o.notifyAll();
		}
	} finally {
		// Always run the parent's stop handling, even if the consumer shutdown fails.
		parent.stopRunningImpl(stepMetaInterface, stepDataInterface);
	}
}
/**
 * Step entry point. On the first call it creates and starts a RocketMQ push consumer
 * (group "g1", name server "localhost:9876", topic "TopicTest", all tags) whose listener
 * forwards each batch to parseMsgs. It then blocks until msg_size reaches msg_max_size,
 * the step is stopped, or the thread is interrupted; finally it shuts the consumer down
 * and marks the output as done.
 *
 * @param smi step metadata (unused here)
 * @param sdi step data (unused here)
 * @return always false: the step has no further rows to process once the wait ends
 * @throws KettleException if the consumer cannot be subscribed or started
 */
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
	if (first){
		first = false;
		logBasic(">>>>>消息上限:"+msg_max_size);
		try {
			// Instantiate with specified consumer group name.
			consumer = new DefaultMQPushConsumer("g1");
			// Specify name server addresses.
			consumer.setNamesrvAddr("localhost:9876");
			// Subscribe to the topic, all tags.
			consumer.subscribe("TopicTest", "*");
			// Register callback to execute on arrival of messages fetched from brokers.
			consumer.registerMessageListener(new MessageListenerConcurrently() {
				public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
					logBasic(">>>>>接到mq消息,总数:"+msgs.size());
					synchronized (o) {
						if (parseMsgs(msgs)) {
							if (msg_size >= msg_max_size)
								o.notifyAll();
							return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
						} else {
							// NOTE(review): the whole batch is handed back for redelivery even if
							// parseMsgs already emitted rows for its first messages — confirm
							// whether duplicates on a later run are acceptable.
							o.notifyAll();
							return ConsumeConcurrentlyStatus.RECONSUME_LATER;
						}
					}
				}
			});
			// Launch the consumer instance.
			consumer.start();
			logBasic(">>>>>启动Consumer!");
		} catch (MQClientException e) {
			// Without a running consumer nothing would ever notify the monitor, so fail
			// the step instead of going on to wait forever.
			logError("MQClientException", e);
			throw new KettleException("Unable to start the RocketMQ consumer", e);
		}
	}

	boolean interrupted = false;
	boolean limitReached;
	synchronized (o) {
		logBasic(">>>>>等待接收mq消息!");
		try {
			// Wait on a condition, not on a bare wait(): the listener may have notified
			// before this thread got here, and wait() may wake up spuriously. The timeout
			// lets the loop notice a stop request even if nobody notifies.
			// isStopped() is assumed to come from the step base class — TODO confirm.
			while (msg_size < msg_max_size && !isStopped()) {
				o.wait(1000L);
			}
		} catch (InterruptedException e) {
			logError("InterruptedException", e);
			interrupted = true;
		}
		limitReached = msg_size >= msg_max_size;
		if (!limitReached) {
			// Leaving early (stop or interrupt): close the quota so that a late listener
			// callback is rejected by parseMsgs instead of emitting rows after
			// setOutputDone().
			msg_size = msg_max_size;
		}
	}

	// The grace period and shutdown run outside the monitor so listener threads are not
	// blocked for five seconds while trying to report their batch status.
	if (limitReached && !interrupted) {
		logBasic(">>>>>消息接收完毕,5秒后将关闭consumer!");
		try {
			TimeUnit.SECONDS.sleep(5);
		} catch (InterruptedException e) {
			logError("InterruptedException", e);
			interrupted = true;
		}
	}
	consumer.shutdown();
	logBasic(">>>>>消息接收完毕,停止Consumer!");

	setOutputDone();
	if (interrupted) {
		// Preserve the interrupt for whoever owns this thread.
		Thread.currentThread().interrupt();
	}
	return false;
}
           

执行日志

kettle-从rocketmq消费消息

继续阅读