承接上一篇,我們在寫好了producer之後,來寫consumer
Consumer接到data之後,寫入file中。
vi不顯示高亮,不爽,改一下設定
vi /etc/vimrc
添加
syntax on
建立了一個maven project FileWriterConsumer,建maven project方式參見本系列第一篇。
vi /root/kafka/kafka-examples/FileWriterConsumer/src/main/java/FileWriterConsumer/FileWriterConsumer.java
寫code,參見前幾篇moving average的pom.xml,把需要的dependency copy過來,放到我們自己project的pom.xml中
然後maven install
建立一個腳本,run_param.sh來運作consumer,運作時候會顯示permission denied,要修改一下運作權限,
chmod +x run_param.sh
./run_param.sh localhost:2181 avg interfacei /root/kafka/Copyinterfacei.traffic 120000
args[3] 我改成了要寫入的檔案的file path
代碼貼出來:
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package FileWriterConsumer;
import kafka.consumer.*;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import org.apache.commons.collections.buffer.CircularFifoBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.io.File;
import java.io.InputStreamReader;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileInputStream;
import java.io.FileWriter;
/**
 * Command-line Kafka consumer that reads String messages from a single topic
 * through the ZooKeeper-based high-level consumer API and appends each message
 * as one line to a file.
 *
 * <p>Usage: {@code FileWriterConsumer {zookeeper} {group.id} {topic} {file-path} {wait-time}}.
 * The {@code wait-time} argument is passed to Kafka as {@code consumer.timeout.ms};
 * consumption stops the first time no message arrives within that period.
 */
public class FileWriterConsumer {
    /** Number of positional command-line arguments that main reads. */
    private static final int REQUIRED_ARG_COUNT = 5;

    private final Properties kafkaProps = new Properties();
    private ConsumerConnector consumer;
    private ConsumerConfig config;
    private KafkaStream<String, String> stream;
    private String waitTime;

    /**
     * Entry point: consumes the topic until the consumer times out, writing
     * every message to the given file, then shuts the consumer down and exits.
     *
     * @param args zookeeper connect string, consumer group id, topic name,
     *             output file path and wait time (value for consumer.timeout.ms)
     */
    public static void main(String[] args) {
        // All five positional arguments are read below, so anything shorter
        // must print the usage instead of failing with an index error.
        if (args.length < REQUIRED_ARG_COUNT) {
            System.out.println("FileWriterConsumer {zookeeper} {group.id} {topic} {file-path} {wait-time}");
            return;
        }

        String zkUrl = args[0];
        String groupId = args[1];
        String topic = args[2];
        String filePath = args[3];

        FileWriterConsumer fwcon = new FileWriterConsumer();
        // waitTime must be set before configure(), which copies it into the Kafka properties.
        fwcon.waitTime = args[4];

        try {
            fwcon.configure(zkUrl, groupId);
            fwcon.start(topic);
            fwcon.writeMessages(filePath);
        } finally {
            // Release the connector even when start() or the write loop failed.
            if (fwcon.consumer != null) {
                fwcon.consumer.shutdown();
            }
        }
        System.exit(0);
    }

    /**
     * Drains the stream into the file at filePath, one message per line
     * (terminated by CRLF), echoing each message to standard output, and
     * appends a fixed trailer line once the stream times out.
     *
     * <p>Failures are printed and not rethrown. The writer is always closed,
     * so whatever was buffered before a failure still reaches the file.
     *
     * @param filePath path of the file to create or overwrite
     */
    private void writeMessages(String filePath) {
        BufferedWriter out = null;
        try {
            // FileWriter creates the file when it is missing and truncates it otherwise.
            out = new BufferedWriter(new FileWriter(new File(filePath)));
            String next;
            while ((next = getNextMessage()) != null) {
                System.out.println(next);
                out.write(next);
                out.write("\r\n");
            }
            out.write("Here is the file!!!!!\r\n");
            out.flush();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (out != null) {
                try {
                    out.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Fills the Kafka consumer properties and builds the ConsumerConfig.
     *
     * @param zkUrl   value for zookeeper.connect
     * @param groupId value for group.id
     */
    private void configure(String zkUrl, String groupId) {
        kafkaProps.put("zookeeper.connect", zkUrl);
        kafkaProps.put("group.id", groupId);
        kafkaProps.put("auto.commit.interval.ms", "1000");
        kafkaProps.put("auto.offset.reset", "largest");
        // un-comment this if you want to commit offsets manually
        //kafkaProps.put("auto.commit.enable","false");
        // Bound the wait for data: getNextMessage() relies on the resulting
        // ConsumerTimeoutException to detect that no more messages arrived.
        kafkaProps.put("consumer.timeout.ms", waitTime);
        config = new ConsumerConfig(kafkaProps);
    }

    /**
     * Creates the consumer connector and opens the single message stream for the topic.
     *
     * @param topic name of the topic to consume
     */
    private void start(String topic) {
        consumer = Consumer.createJavaConsumerConnector(config);
        /* We tell Kafka how many threads will read each topic. We have one topic and one thread */
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(1));
        /* We will use a decoder to get Kafka to convert messages to Strings
         * valid property will be deserializer.encoding with the charset to use.
         * default is UTF8 which works for us */
        StringDecoder decoder = new StringDecoder(new VerifiableProperties());
        /* Kafka will give us a list of streams of messages for each topic.
           In this case, its just one topic with a list of a single stream */
        stream = consumer.createMessageStreams(topicCountMap, decoder, decoder).get(topic).get(0);
    }

    /**
     * Returns the next message from the stream.
     *
     * @return the message payload, or null when the consumer timed out
     *         waiting for a message
     */
    private String getNextMessage() {
        ConsumerIterator<String, String> it = stream.iterator();
        try {
            return it.next().message();
        } catch (ConsumerTimeoutException e) {
            System.out.println("waited " + waitTime + " and no messages arrived.");
            return null;
        }
    }
}