[b]spring-integration-kafka簡單應用[/b]
[b]pom.xml[/b]
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.3.3.RELEASE</version>
</parent>
<groupId>com.sunney</groupId>
<artifactId>kafka-demo</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>kafka-demo</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>1.3.0.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.7</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.0</version>
</dependency>
</dependencies>
<build>
<finalName>kafka-demo</finalName>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>1.3.3.RELEASE</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
<configuration>
<!-- do not enable it, this will create a non-standard jar and cause
autoconfig to fail -->
<executable>false</executable>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>2.6</version>
<configuration>
<delimiters>
<delimiter>@</delimiter>
</delimiters>
</configuration>
</plugin>
</plugins>
</build>
</project>
[b]applicationContext.xml[/b]
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.2.xsd"
default-autowire="byName">
<!-- <context:annotation-config />
<context:component-scan base-package="com.*" /> -->
<!-- 導入Spring配置檔案 -->
<!-- <import resource="spring-kafka-consumer.xml" /> -->
<import resource="spring-kafka-producer.xml" />
</beans>
[b]spring-kafka-consumer.xml[/b]
[b]spring-kafka-producer.xml[/b]
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<!-- commons config -->
<bean id="stringSerializer" class="org.apache.kafka.common.serialization.StringSerializer" />
<bean id="kafkaEncoder"
class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
<constructor-arg value="java.lang.String" />
</bean>
<!-- producer參數配置 -->
<bean id="producerProperties"
class="org.springframework.beans.factory.config.PropertiesFactoryBean">
<property name="properties">
<props>
<prop key="topic.metadata.refresh.interval.ms">3600000</prop><!-- metadata重新整理間隔時間,如果負值則失敗的時候才會重新整理,如果0則每次發送後都重新整理,正值則是一種周期行為 -->
<prop key="message.send.max.retries">5</prop> <!-- 發送失敗的情況下,重試發送的次數 -->
<prop key="serializer.class">kafka.serializer.StringEncoder</prop><!-- 消息序列化類實作方式 -->
<prop key="request.required.acks">1</prop><!-- 參與消息确認的broker數量控制,0代表不需要任何确認 1代表需要leader
replica确認 -1代表需要ISR中所有進行确認 -->
</props>
</property>
</bean>
<!-- channel配置 -->
<int:channel id="messageChannel">
<int:queue />
</int:channel>
<!-- outbound-channel-adapter配置 -->
<int-kafka:outbound-channel-adapter
id="kafkaOutboundChannelAdapterTopicTest" kafka-producer-context-ref="producerContextTopicTest"
auto-startup="true" channel="messageChannel" order="3">
<int:poller fixed-delay="1000" time-unit="MILLISECONDS"
receive-timeout="1" task-executor="taskExecutor" />
</int-kafka:outbound-channel-adapter>
<!-- 線程池配置 -->
<task:executor id="taskExecutor" pool-size="5"
keep-alive="120" queue-capacity="500" />
<!-- pool-size:線程池活躍的線程數,keep-alive:線程沒有任務執行時最多保持多久時間會終止(s),queue-capacity:任務隊列的最大容量 -->
<!-- producer清單配置 -->
<int-kafka:producer-context id="producerContextTopicTest"
producer-properties="producerProperties">
<int-kafka:producer-configurations>
<!-- 多個topic配置 -->
<int-kafka:producer-configuration
broker-list="114.55.72.173:9092" key-serializer="stringSerializer"
value-class-type="java.lang.String" value-serializer="stringSerializer"
topic="test-yongxing" />
</int-kafka:producer-configurations>
<!-- <int-kafka:producer-configuration broker-list="114.55.72.173:9092"
kafka叢集伺服器清單 key-serializer="stringSerializer" key序列化方式 value-class-type="java.lang.String"
value資料類型 value-serializer="stringSerializer" key序列化方式 topic="test-xing"
/> 主題名字 </int-kafka:producer-configurations> -->
</int-kafka:producer-context>
</beans>
package com.SpringKafka;
import java.util.HashSet;
import java.util.Set;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;
import com.SpringKafka.service.KafkaConsumer;
import com.SpringKafka.service.KafkaSendMessageService;
/**
 * Boot entry point for the Kafka demo: starts the Spring context (producer
 * wiring imported from applicationContext.xml), launches the raw-client
 * consumer thread, then publishes a few numbered test messages.
 */
@SpringBootApplication
@EnableScheduling
@ComponentScan
@EnableAutoConfiguration
@ServletComponentScan
public class ApplicationMain {

    /** Topic shared by the demo producer and the consumer thread. */
    private static String topic = "test-yongxing";

    /** Exposed so static helpers can look up beans after startup. */
    public static ApplicationContext applicationContext;

    public static void main(String[] args) throws Exception {
        SpringApplication app = new SpringApplication(ApplicationMain.class);
        // Pure console app: no embedded servlet container needed.
        app.setWebEnvironment(false);
        Set<Object> sources = new HashSet<Object>();
        sources.add("classpath:applicationContext.xml");
        app.setSources(sources);
        applicationContext = app.run(args);
        new KafkaConsumer(topic).start(); // raw kafka-clients consumer thread
        send();
    }

    /** Sends ten numbered test messages to the demo topic. */
    public static void send() {
        KafkaSendMessageService kafkaSendMessageService = ApplicationMain.applicationContext
                .getBean("kafkaSendMessageService", KafkaSendMessageService.class);
        if (kafkaSendMessageService == null) {
            // getBean normally throws rather than returning null, but keep the
            // guard and actually bail out instead of falling through to an NPE
            // (the original printed the warning and then dereferenced anyway).
            System.out.println("kafkaSendMessageService == null");
            return;
        }
        String message = "yongxing i == ";
        for (int i = 0; i < 10; i++) {
            // Use the shared topic field instead of re-hardcoding the name
            // (the original declared an unused local and passed a literal).
            kafkaSendMessageService.sendMessageToKafka(topic, message + i);
            System.out.println("sendMessageToKafka i == " + i);
        }
    }
}
package com.SpringKafka.service.impl;
import javax.annotation.Resource;
import org.springframework.integration.kafka.support.KafkaHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;
import com.SpringKafka.service.KafkaSendMessageService;
/**
 * Publishes string payloads to Kafka by dropping them onto the Spring
 * Integration "messageChannel", which feeds the Kafka outbound adapter.
 */
@Service("kafkaSendMessageService")
public class KafkaSendMessageServiceImpl implements KafkaSendMessageService {

    @Resource(name = "messageChannel")
    MessageChannel messageChannel;

    public void sendMessageToKafka(String topic, String message) {
        // Stamp the destination topic on the message header; the outbound
        // channel adapter routes on KafkaHeaders.TOPIC.
        MessageBuilder<String> builder = MessageBuilder.withPayload(message);
        builder = builder.setHeader(KafkaHeaders.TOPIC, topic);
        messageChannel.send(builder.build());
    }
}
package com.SpringKafka.service.impl;
import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.kafka.support.KafkaHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;
import com.SpringKafka.service.KafkaService;
/**
 * Generic Kafka publisher: accepts any payload object and sends it to the
 * requested topic through the shared Spring Integration message channel.
 */
@Service("kafkaService")
public class KafkaServiceImpl implements KafkaService {

    @Resource(name = "messageChannel")
    MessageChannel messageChannel;

    public void sendUserInfo(String topic, Object obj) {
        // Topic travels in the header; the payload is forwarded untouched.
        MessageBuilder<Object> builder = MessageBuilder.withPayload(obj);
        messageChannel.send(builder.setHeader(KafkaHeaders.TOPIC, topic).build());
    }
}
package com.SpringKafka.service;
/**
 * Contract for publishing plain string messages to Kafka.
 */
public interface KafkaSendMessageService {
/**
 * Sends {@code message} to the given Kafka topic.
 *
 * @param topic   destination Kafka topic name
 * @param message payload to publish
 */
public void sendMessageToKafka(String topic, String message);
}
package com.SpringKafka.service.impl;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.SpringKafka.service.UserDto;
import com.alibaba.fastjson.JSON;
/**
 * KafkaConsumerService: message-receiving callback for the Spring
 * Integration consumer configuration. Receives consumed records batched
 * by topic and then by partition.
 */
public class KafkaConsumerService {

    static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);

    /**
     * Handles one polled batch of messages.
     *
     * @param msgs map of topic name to (partition number to message payload)
     */
    public void processMessage(Map<String, Map<Integer, String>> msgs) {
        logger.info("===============================================processMessage===============");
        for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) {
            logger.info("============Topic:{}", entry.getKey());
            // Walk partition/message pairs together via entrySet() instead of
            // the original's two separate passes over keySet() and values(),
            // and drop the unchecked cast to LinkedHashMap — the Map interface
            // is all that is needed here.
            for (Map.Entry<Integer, String> partitionAndMessage : entry.getValue().entrySet()) {
                logger.info("======Partition:{}", partitionAndMessage.getKey());
                String message = "[" + partitionAndMessage.getValue() + "]";
                logger.info("=====message:{}", message);
                // List<UserDto> userList = JSON.parseArray(message, UserDto.class);
                // NOTE(review): the original logged "userList.size:" followed by
                // the raw message — misleading while the parse above is
                // commented out, so that line was removed.
            }
        }
    }
}
[b]注意:[/b]這裡有一個是Spring的消費者程式(spring-kafka-consumer.xml、KafkaConsumerService.java),有一個是原生應用的消費者程式(KafkaConsumer.java)
原生應用例子:
package com;
import java.util.ArrayList;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
 * Stand-alone consumer built on the raw kafka-clients API: runs on its own
 * thread, polls the configured topic forever and prints each record value.
 */
public class KafkaConsumerRecords extends Thread {

    // Instance field. The original declared this static yet assigned it from
    // the constructor, so creating a second instance would silently retarget
    // every existing one.
    private final String topic;

    public KafkaConsumerRecords(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public void run() {
        KafkaConsumer<String, String> kafkaConsumer = kafkaConsumer();
        ArrayList<String> topics = new ArrayList<>();
        topics.add(this.topic);
        kafkaConsumer.subscribe(topics);
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                String json = record.value();
                System.out.println("json == " + json);
            }
            // Commit AFTER processing the batch. The original committed before
            // the loop, marking records as consumed before they were handled —
            // a crash mid-batch would lose them. commitAsync is non-blocking
            // and reports failures through an optional OffsetCommitCallback.
            kafkaConsumer.commitAsync();
        }
    }

    /**
     * Builds a string-deserializing consumer with auto-commit disabled;
     * offsets are committed manually in {@link #run()}.
     */
    public KafkaConsumer<String, String> kafkaConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "114.55.72.173:9092");
        props.put("group.id", "cebigdata_engin_dev");
        props.put("enable.auto.commit", false);
        props.put("auto.commit.interval.ms", 1000); // ignored while auto-commit is off
        props.put("auto.offset.reset", "latest");
        props.put("session.timeout.ms", 30000);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("heartbeat.interval.ms", "12000");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        return consumer;
    }
}
XML
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.1.RELEASE</version>
</dependency>
參考原文(原生應用例子):[url]https://my.oschina.net/cloudcoder/blog/299215[/url]