天天看点

spring-integration-kafka简单应用

[b]spring-integration-kafka简单应用[/b]

[b]pom.xml[/b]

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.3.3.RELEASE</version>
	</parent>
	<groupId>com.sunney</groupId>
	<artifactId>kafka-demo</artifactId>
	<packaging>jar</packaging>
	<version>1.0-SNAPSHOT</version>
	<name>kafka-demo</name>
	<url>http://maven.apache.org</url>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-kafka</artifactId>
			<version>1.3.0.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.11</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId> org.apache.avro</groupId>
			<artifactId>avro</artifactId>
			<version>1.7.7</version>
		</dependency>
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.7</version>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.10</artifactId>
			<version>0.8.2.0</version>
		</dependency>
	</dependencies>
	<build>
		<finalName>kafak-demo</finalName>
		<resources>
			<resource>
				<directory>src/main/resources</directory>
				<filtering>true</filtering>
			</resource>
		</resources>
		<plugins>
			<plugin>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.3</version>
				<configuration>
					<source>1.7</source>
					<target>1.7</target>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<version>1.3.3.RELEASE</version>
				<executions>
					<execution>
						<goals>
							<goal>repackage</goal>
						</goals>
					</execution>
				</executions>
				<configuration>
					<!-- do not enable it, this will creats a non standard jar and cause
						autoconfig to fail -->
					<executable>false</executable>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-resources-plugin</artifactId>
				<version>2.6</version>
				<configuration>
					<delimiters>
						<delimiter>@</delimiter>
					</delimiters>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>
           

[b]applicationContext.xml[/b]

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:mvc="http://www.springframework.org/schema/mvc"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
						http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
						http://www.springframework.org/schema/context
						http://www.springframework.org/schema/context/spring-context-4.2.xsd
						http://www.springframework.org/schema/aop
						http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
						http://www.springframework.org/schema/tx
						http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
	 					http://www.springframework.org/schema/mvc
                        http://www.springframework.org/schema/mvc/spring-mvc-4.2.xsd"
	default-autowire="byName">

	<!-- <context:annotation-config />
	<context:component-scan base-package="com.*" /> -->

	<!-- 导入Spring配置文件 -->
	<!-- <import resource="spring-kafka-consumer.xml" /> -->
 	<import resource="spring-kafka-producer.xml" />

</beans>
           

[b]spring-kafka-consumer.xml[/b]

[b]spring-kafka-producer.xml[/b]

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
	xmlns:task="http://www.springframework.org/schema/task"
	xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

	<!-- commons config -->
	<bean id="stringSerializer" class="org.apache.kafka.common.serialization.StringSerializer" />
	<bean id="kafkaEncoder"
		class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
		<constructor-arg value="java.lang.String" />
	</bean>

	<!-- producer参数配置 -->
	<bean id="producerProperties"
		class="org.springframework.beans.factory.config.PropertiesFactoryBean">
		<property name="properties">
			<props>
				<prop key="topic.metadata.refresh.interval.ms">3600000</prop><!-- metadata刷新间隔时间,如果负值则失败的时候才会刷新,如果0则每次发送后都刷新,正值则是一种周期行为 -->
				<prop key="message.send.max.retries">5</prop>  <!-- 发送失败的情况下,重试发送的次数 -->
				<prop key="serializer.class">kafka.serializer.StringEncoder</prop><!-- 消息序列化类实现方式 -->
				<prop key="request.required.acks">1</prop><!-- 参与消息确认的broker数量控制,0代表不需要任何确认 1代表需要leader
					replica确认 -1代表需要ISR中所有进行确认 -->
			</props>
		</property>
	</bean>

	<!-- channel配置 -->
	<int:channel id="messageChannel">
		<int:queue />
	</int:channel>

	<!-- outbound-channel-adapter配置 -->
	<int-kafka:outbound-channel-adapter
		id="kafkaOutboundChannelAdapterTopicTest" kafka-producer-context-ref="producerContextTopicTest"
		auto-startup="true" channel="messageChannel" order="3">
		<int:poller fixed-delay="1000" time-unit="MILLISECONDS"
			receive-timeout="1" task-executor="taskExecutor" />
	</int-kafka:outbound-channel-adapter>

	<!-- 线程池配置 -->
	<task:executor id="taskExecutor" pool-size="5"
		keep-alive="120" queue-capacity="500" />
	<!-- pool-size:线程池活跃的线程数,keep-alive:线程没有任务执行时最多保持多久时间会终止(s),queue-capacity:任务队列的最大容量 -->

	<!-- producer列表配置 -->
	<int-kafka:producer-context id="producerContextTopicTest"
		producer-properties="producerProperties">
		<int-kafka:producer-configurations>
			<!-- 多个topic配置 -->
			<int-kafka:producer-configuration
				broker-list="114.55.72.173:9092" key-serializer="stringSerializer"
				value-class-type="java.lang.String" value-serializer="stringSerializer"
				topic="test-yongxing" />
		</int-kafka:producer-configurations>
		<!-- <int-kafka:producer-configuration broker-list="114.55.72.173:9092"
			kafka集群服务器列表 key-serializer="stringSerializer" key序列化方式 value-class-type="java.lang.String"
			value数据类型 value-serializer="stringSerializer" key序列化方式 topic="test-xing"
			/> 主题名字 </int-kafka:producer-configurations> -->
	</int-kafka:producer-context>
</beans>
           
package com.SpringKafka;

import java.util.HashSet;
import java.util.Set;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;

import com.SpringKafka.service.KafkaConsumer;
import com.SpringKafka.service.KafkaSendMessageService;

@SpringBootApplication
@EnableScheduling
@ComponentScan
@EnableAutoConfiguration
@ServletComponentScan
public class ApplicationMain {
	private static String topic = "test-yongxing";
	public static ApplicationContext applicationContext;

	public static void main(String[] args) throws Exception {
		SpringApplication app = new SpringApplication(ApplicationMain.class);
		app.setWebEnvironment(false);
		Set<Object> set = new HashSet<Object>();
		set.add("classpath:applicationContext.xml");
		app.setSources(set);
		applicationContext = app.run(args);

		new KafkaConsumer(topic).start();// 消费者
		send();

	}

	public static void send() {
		KafkaSendMessageService kafkaSendMessageService = ApplicationMain.applicationContext
				.getBean("kafkaSendMessageService", KafkaSendMessageService.class);
		if (kafkaSendMessageService == null) {
			System.out.println("kafkaSendMessageService == null");

		}
		String topic = "test-yongxing";
		String message = "yongxing i == ";

		for (int i = 0; i < 10; i++) {
			kafkaSendMessageService.sendMessageToKafka("test-yongxing", message + i);
			System.out.println("sendMessageToKafka i == " + i);
		}
	}
}
           
package com.SpringKafka.service.impl;

import javax.annotation.Resource;

import org.springframework.integration.kafka.support.KafkaHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;

import com.SpringKafka.service.KafkaSendMessageService;

@Service("kafkaSendMessageService")
public class KafkaSendMessageServiceImpl  implements KafkaSendMessageService{

    @Resource(name = "messageChannel")
    MessageChannel messageChannel;

    public void sendMessageToKafka(String topic, String message) {
    	messageChannel.send(MessageBuilder.withPayload(message)
                                    .setHeader(KafkaHeaders.TOPIC,topic)
                                    .build());
    }

}
           
package com.SpringKafka.service.impl;

import javax.annotation.Resource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.kafka.support.KafkaHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;

import com.SpringKafka.service.KafkaService;

@Service("kafkaService")
public class KafkaServiceImpl  implements KafkaService{
    @Resource(name = "messageChannel")
    MessageChannel messageChannel;

    public void sendUserInfo(String topic, Object obj) {
    	messageChannel.send(MessageBuilder.withPayload(obj)
                                    .setHeader(KafkaHeaders.TOPIC,topic)
                                    .build());
    }

}
           
package com.SpringKafka.service;

public interface KafkaSendMessageService {
    public void sendMessageToKafka(String topic, String message);
}


package com.SpringKafka.service.impl;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.SpringKafka.service.UserDto;
import com.alibaba.fastjson.JSON;

/**
 * 类KafkaConsumerService.java的实现描述:消费接收类
 */
public class KafkaConsumerService {

    static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);

    public void processMessage(Map<String, Map<Integer, String>> msgs) {
        logger.info("===============================================processMessage===============");
        for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) {
            logger.info("============Topic:" + entry.getKey());
            LinkedHashMap<Integer, String> messages = (LinkedHashMap<Integer, String>) entry.getValue();
            Set<Integer> keys = messages.keySet();
            for (Integer i : keys)
                logger.info("======Partition:" + i);
            Collection<String> values = messages.values();
            for (Iterator<String> iterator = values.iterator(); iterator.hasNext();) {
                String message = "["+iterator.next()+"]";
                logger.info("=====message:" + message);
//                List<UserDto> userList = JSON.parseArray(message, UserDto.class);
                logger.info("=====userList.size:" + message);

            }

        }
    }
}
           

[b]注意:[/b]这里有一个是Spring的消费者程序(spring-kafka-consumer.xml、KafkaConsumerService.java),有一个是原生应用的消费者程序(KafkaConsumer.java)

原生应用例子:

package com;

import java.util.ArrayList;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerRecords extends Thread {

	private static String topic;

	public KafkaConsumerRecords(String topic) {
		super();
		this.topic = topic;
	}

	@Override
	public void run() {
		KafkaConsumer<String, String> kafkaConsumer = kafkaConsumer();
		ArrayList<String> topics = new ArrayList<>();
		topics.add(this.topic);
		kafkaConsumer.subscribe(topics);

		while (true) {
			ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
			kafkaConsumer.commitAsync(); // 使用commitAsync则是非阻塞方式,会在成功提交或者失败时,触发OffsetCommitCallback回调函数的执行。

			for (ConsumerRecord<String, String> record : records) {
				String json = record.value();
				System.out.println("json == " + json);
			}
		}
	}

	public KafkaConsumer<String, String> kafkaConsumer() {
		Properties props = new Properties();
		props.put("bootstrap.servers", "114.55.72.173:9092");
		props.put("group.id", "cebigdata_engin_dev");
		props.put("enable.auto.commit", false);
		props.put("auto.commit.interval.ms", 1000);
		props.put("auto.offset.reset", "latest");
		props.put("session.timeout.ms", 30000);
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("heartbeat.interval.ms", "12000");

		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		return consumer;
	}

}
           
XMl
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
	<version>1.1.1.RELEASE</version>
</dependency>
           

参考原文(原生应用例子):[url]https://my.oschina.net/cloudcoder/blog/299215[/url]

继续阅读