
Kafka Study Notes (10): Writing Consumer Output to a File

Picking up from the previous post: now that the producer is written, let's write the consumer.

The consumer receives the data and writes it into a file.

As an aside: vi doesn't show syntax highlighting by default, which is annoying, so tweak the settings:

vi /etc/vimrc

and add:

syntax on 

Create a Maven project named FileWriterConsumer; see the first post in this series for how to create a Maven project.
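As a quick reminder, the standard quickstart archetype command looks something like this (the groupId/artifactId values here are assumptions chosen to match the package name used below):

mvn archetype:generate -DgroupId=FileWriterConsumer -DartifactId=FileWriterConsumer -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false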

 vi  /root/kafka/kafka-examples/FileWriterConsumer/src/main/java/FileWriterConsumer/FileWriterConsumer.java

Write the code. For dependencies, refer to the pom of the moving average example from the earlier posts and copy the dependencies we need into our own project's pom.
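As a sketch, the copied block should pull in the old high-level consumer client; the Scala suffix and version below are assumptions and should match whatever the moving average project's pom declared:

<!-- version is an assumption; match the moving average project's pom -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
</dependency>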

Then run mvn install.

Create a script, run_param.sh, to run the consumer. The first run fails with "permission denied", so fix the execute permission:

chmod +x run_param.sh

 ./run_param.sh localhost:2181 avg interfacei /root/kafka/Copyinterfacei.traffic 120000

I changed args[3] to be the path of the file to write to.
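A minimal sketch of what run_param.sh could contain, assuming mvn install produced a shaded uber jar (the jar name is an assumption; use whatever actually lands in target/):

#!/bin/bash
# Sketch only: the jar name below is an assumption; point it at the
# artifact that mvn install actually produced in target/.
java -cp target/uber-FileWriterConsumer-1.0-SNAPSHOT.jar FileWriterConsumer.FileWriterConsumer "$@"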

Here is the code:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package FileWriterConsumer;

import kafka.consumer.*;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import java.io.File;
import java.io.InputStreamReader;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileInputStream;
import java.io.FileWriter;

public class FileWriterConsumer {

    private Properties kafkaProps = new Properties();
    private ConsumerConnector consumer;
    private ConsumerConfig config;
    private KafkaStream<String, String> stream;
    private String waitTime;


    public static void main(String[] args) {
        if (args.length < 5) {
            System.out.println("FileWriterConsumer {zookeeper} {group.id} {topic} {file-path} {wait-time}");
            return;
        }

        String next;
        FileWriterConsumer fwcon = new FileWriterConsumer();
        String zkUrl = args[0];
        String groupId = args[1];
        String topic = args[2];
        String filePath = args[3];
        fwcon.waitTime = args[4];

        fwcon.configure(zkUrl,groupId);

        fwcon.start(topic);

 
        try {
            File writename = new File(filePath);
            writename.createNewFile();
            BufferedWriter out = new BufferedWriter(new FileWriter(writename));
            try {
                // read until getNextMessage() times out and returns null
                while ((next = fwcon.getNextMessage()) != null) {
                    System.out.println(next);
                    out.write(next);
                    out.write("\r\n");
                }
                out.write("Here is the file!!!!!\r\n");
                out.flush();
            } finally {
                out.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }


        fwcon.consumer.shutdown();
        System.exit(0);

    }

    private void configure(String zkUrl, String groupId) {
        kafkaProps.put("zookeeper.connect", zkUrl);
        kafkaProps.put("group.id",groupId);
        kafkaProps.put("auto.commit.interval.ms","1000");
        kafkaProps.put("auto.offset.reset","largest");

        // un-comment this if you want to commit offsets manually
        //kafkaProps.put("auto.commit.enable","false");

        // give up waiting for new data after waitTime ms;
        // getNextMessage() handles the resulting ConsumerTimeoutException
        kafkaProps.put("consumer.timeout.ms",waitTime);

        config = new ConsumerConfig(kafkaProps);
    }

    private void start(String topic) {
        consumer = Consumer.createJavaConsumerConnector(config);

        /* We tell Kafka how many threads will read each topic. We have one topic and one thread */
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic,new Integer(1));

        /* We will use a decoder to get Kafka to convert messages to Strings
        * valid property will be deserializer.encoding with the charset to use.
        * default is UTF8 which works for us */
        StringDecoder decoder = new StringDecoder(new VerifiableProperties());

        /* Kafka will give us a list of streams of messages for each topic.
        In this case, its just one topic with a list of a single stream */
        stream = consumer.createMessageStreams(topicCountMap, decoder, decoder).get(topic).get(0);
    }

    private String getNextMessage() {
        ConsumerIterator<String, String> it = stream.iterator();

        try {
            return it.next().message();
        } catch (ConsumerTimeoutException e) {
            System.out.println("waited " + waitTime + " and no messages arrived.");
            return null;
        }
    }


}
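A closing note: this code uses the old Scala high-level consumer (kafka.consumer.*) that ships with 0.8-era Kafka, matching the rest of this series. On Kafka 0.9 or later, the same read-until-timeout loop can be written against the new Java consumer. Below is a minimal sketch, not part of the project above; the class name, bootstrap address, and hard-coded topic/path are assumptions for illustration:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.Collections;
import java.util.Properties;

public class NewApiFileWriterConsumer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // the new consumer talks to the broker directly, not to ZooKeeper;
        // localhost:9092 is an assumed broker address
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "avg");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList("interfacei"));

        BufferedWriter out = new BufferedWriter(new FileWriter("/root/kafka/Copyinterfacei.traffic"));
        try {
            while (true) {
                // poll() replaces the blocking iterator; 120000 ms mirrors the wait-time arg
                ConsumerRecords<String, String> records = consumer.poll(120000);
                if (records.isEmpty()) break; // nothing arrived within the timeout
                for (ConsumerRecord<String, String> record : records) {
                    out.write(record.value());
                    out.write("\r\n");
                }
            }
            out.flush();
        } finally {
            out.close();
            consumer.close();
        }
    }
}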