Flume NG Programming in Practice

Preface

Flume already ships with several commonly used sources, but in certain situations they cannot meet every requirement, so custom components have to be developed.

In the course of using Flume we ran into cases that required developing our own source and sink, so below we use those two as examples.

Our requirements were mainly functional, so we only wrote source and sink code and did no custom development on the channel side, using the built-in file channel directly; a minimal example of that configuration is sketched below. Meituan's write-up on their use of Flume covers custom channel development quite well and is worth reading if you are interested (see the references at the end).
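For completeness, this is roughly what using the built-in file channel looks like in the agent properties file. It is a minimal sketch: the agent name a1, the channel name c1, and the directory paths are placeholders rather than our actual configuration.

# Minimal file channel configuration; names and paths are placeholders
a1.channels = c1
a1.channels.c1.type = file
# Directory where the file channel stores its checkpoint
a1.channels.c1.checkpointDir = /data/flume/file-channel/checkpoint
# One or more comma-separated directories where event data is persisted
a1.channels.c1.dataDirs = /data/flume/file-channel/data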

Custom Source

Simple Source Example

Below is a simple PollableSource example, copied from the official Flume Developer Guide.

The key methods are configure, start, stop, and process; the inline comments are fairly self-explanatory.

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;

public class MySource extends AbstractSource implements Configurable, PollableSource {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation, convert to another type, ...)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external client
  }

  @Override
  public void stop () {
    // Disconnect from external client and do any additional cleanup
    // (e.g. releasing resources or nulling-out field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    try {
      // This try clause includes whatever Channel/Event operations you want to do

      // Receive new data
      Event e = getSomeData();

      // Store the Event into this Source's associated Channel(s)
      getChannelProcessor().processEvent(e);

      status = Status.READY;
    } catch (Throwable t) {
      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    }
    // Note: unlike a Sink, a Source does not manage the channel transaction itself;
    // the ChannelProcessor handles it, so there is no transaction to close here.
    return status;
  }
}           
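To actually run a custom source like this, its fully qualified class name is given as the source type in the agent properties file, and any custom settings (such as myProp above) are passed as context properties. A minimal sketch, assuming the class is packaged as com.example.flume.MySource:

# Register the custom source by its fully qualified class name (package is a placeholder)
a1.sources = r1
a1.sources.r1.type = com.example.flume.MySource
# Read in configure() via context.getString("myProp", "defaultValue")
a1.sources.r1.myProp = someValue
a1.sources.r1.channels = c1

The jar containing the class also needs to be on the agent's classpath, for example by dropping it into a plugins.d/<plugin-name>/lib/ directory or adding it to FLUME_CLASSPATH in flume-env.sh.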

Custom Kafka Source

The Kafka source shipped with Flume consumes a single topic, but when setting up our data-stream processing system we needed to consume multiple topics.

The general idea is as follows:

  • the topics to be consumed are registered in a metadata system;
  • the Kafka source initializes the topic list, consumes all of those topics, and hands the events to the channel;
  • the Kafka source runs a background thread that periodically re-reads the topic list from the metadata system, so when a new topic is added, Flume picks it up and starts consuming it within one refresh interval.
public class MultiKafkaSource extends AbstractPollableSource implements Configurable {

    // (Imports, field declarations, and some helpers are omitted in this excerpt.)

    // Initialize configuration, including the REST endpoint of the metadata system.
    @Override
    public void doConfigure(Context context) {
        ......
    }

    /**
     * Start or refresh the Kafka subscription based on the current topic list.
     */
    private void startSubscribe() {
        logger.info("startSubscribe");
        initTopics();
        if (consumer == null) {
            consumer = new KafkaConsumer<>(parameters);
            consumer.subscribe(topics);
        } else {
            consumer.unsubscribe();
            consumer.subscribe(topics);
        }

    }

    /**
     * Initialize the topic list from the metadata system.
     */
    private void initTopics() {
        if (topics == null) {
            topics = new ArrayList<>();
        } else {
            topics.clear();
        }
        try {
            List<Message> msgList = getter.getAllMsgBasicInfo();

            for (Message message : msgList) {
                topics.add(TopicUtils.getTopic(message.getMsgType()));
            }
        } catch (IOException e) {
            logger.error("initTopics error");
            logger.error(e.getMessage());
        }

        logger.info("topics:" + Arrays.toString(topics.toArray()));
    }

    /**
     * Periodically trigger a re-subscribe, since the metadata system may add new topics.
     */
    class ReSubscribe implements Runnable {

        @Override
        public void run() {
            while (!isStop) {
                try {
                    TimeUnit.SECONDS.sleep(subscribeTimes);
                    if (!reSubscribe.getAndSet(true)) {
                        logger.info("restart subscribe");
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage());
                }
            }
        }
    }

    // Start the subscription and launch the re-subscribe thread
    @Override
    protected void doStart() throws FlumeException {
        logger.info("KafkaSource DO START");
        counter.start();
        startSubscribe();
        new Thread(new ReSubscribe()).start();
    }

    @Override
    protected void doStop() throws FlumeException {
        isStop = true;
        consumer.close();
        counter.stop();

        logger.info("KafkaSource Do Stop");
    }
    @Override
    protected synchronized Status doProcess() throws EventDeliveryException {
        logger.info("KafkaSource doProcess");

        long startTime = System.currentTimeMillis();
        // Poll records from Kafka
        ConsumerRecords<String, String> records = consumer.poll(1000);
        // Check whether the HDFS cluster has been shut down
        HDFSSystemState.getInstance().checkHDFSIsShutDown("source[" + flumeSourceId + "]", null);

        // Check whether the source/sink throughput is within normal limits
        boolean isGood = SystemMonitor.getInstance().checkSystemIsGood(sourceProcessCounter);

        while (!isGood) {
            try {
                logger.info("source[" + flumeSourceId + "] is to fast, wait 1 minutes...");
                TimeUnit.MINUTES.sleep(1);
            } catch (InterruptedException e) {
                logger.error(e.getMessage());
            }
            isGood = SystemMonitor.getInstance().checkSystemIsGood(sourceProcessCounter);
        }

        Status status;
        if (records.isEmpty()) {
            //counter.incrementKafkaEmptyCount();
            status = Status.BACKOFF;
        } else {
            Iterator<ConsumerRecord<String, String>> iter = records.iterator();
            boolean has = true;
            while (has) {
                for (int i = 0; (has = iter.hasNext()) && i < batchLimit; i++) {
                    ConsumerRecord<String, String> record = iter.next();

                    Map<String, String> headers = new HashMap<>();
                    headers.put(EventHeader.TIMESTAME.name(),
                        String.valueOf(System.currentTimeMillis()));
                    headers.put(EventHeader.TOPIC.name(), record.topic());
                    headers.put(EventHeader.FLUME_SOURCE_ID.name(), flumeSourceId);

                    String key = record.key();
                    if (key != null && key.length() != 0) {
                        headers.put(EventHeader.KEY.name(), key);
                    }
                    // The record payload is carried in the Event body
                    Event event = EventBuilder.withBody(record.value().getBytes(), headers);
                    eventList.add(event);
                    sourceProcessCounter++;

                }

                counter.addToKafkaEventGetTimer(System.currentTimeMillis() - startTime);
                counter.addToEventReceivedCount(eventList.size());

                logger.info("KafkaSource process record size : " + eventList.size());
                if (eventList.size() > 0) {
                    getChannelProcessor().processEventBatch(eventList);

                    counter.addToEventAcceptedCount(eventList.size());

                    // This is where a bug showed up: Kafka only records the consumer offset
                    // once it is committed. If the offset is not committed promptly, the next
                    // restart resumes from a stale offset and the same data is consumed again,
                    // hence the explicit synchronous commit here.
                    long commitStart = System.currentTimeMillis();
                    consumer.commitSync();
                    counter.addToKafkaCommitTimer(System.currentTimeMillis() - commitStart);

                    eventList.clear();
                }
            }

            status = Status.READY;
        }

        if (reSubscribe.get()) {
            startSubscribe();
            reSubscribe.set(false);
        }
        return status;
    }
}           
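Wiring the multi-topic source into an agent works the same way: point the source type at the class and pass the extra settings (metadata REST endpoint, refresh interval, batch limit, Kafka bootstrap servers) as context properties to be read in doConfigure(). Since doConfigure() is elided above, the property names below are hypothetical placeholders, not the actual keys we used:

# Hypothetical wiring for the multi-topic Kafka source; property names are placeholders
a1.sources = r1
a1.sources.r1.type = com.example.flume.MultiKafkaSource
# REST endpoint of the metadata system that lists the topics to consume
a1.sources.r1.metadataRestUrl = http://metadata-host:8080/api/messages
# How often (seconds) the ReSubscribe thread refreshes the topic list
a1.sources.r1.subscribeSeconds = 60
# Maximum number of events handed to the channel in one batch
a1.sources.r1.batchLimit = 1000
a1.sources.r1.kafkaBootstrapServers = kafka1:9092,kafka2:9092
a1.sources.r1.channels = c1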

Custom Sink

The following description of the Sink lifecycle is from the official Flume Developer Guide. The purpose of a Sink is to extract Events from the Channel and forward them to the next Flume Agent in the flow or store them in an external repository. A Sink is associated with exactly one Channel, as configured in the Flume properties file. There’s one SinkRunner instance associated with every configured Sink, and when the Flume framework calls SinkRunner.start(), a new thread is created to drive the Sink (using SinkRunner.PollingRunner as the thread’s Runnable). This thread manages the Sink’s lifecycle. The Sink needs to implement the start() and stop() methods that are part of the LifecycleAware interface. The Sink.start() method should initialize the Sink and bring it to a state where it can forward the Events to its next destination. The Sink.process() method should do the core processing of extracting the Event from the Channel and forwarding it. The Sink.stop() method should do the necessary cleanup (e.g. releasing resources). The Sink implementation also needs to implement the Configurable interface for processing its own configuration settings. For example:

Simple Example

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class MySink extends AbstractSink implements Configurable {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external repository (e.g. HDFS) that
    // this Sink will forward Events to ..
  }

  @Override
  public void stop () {
    // Disconnect from the external repository and do any
    // additional cleanup (e.g. releasing resources or nulling-out
    // field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      // This try clause includes whatever Channel operations you want to do

      Event event = ch.take();

      // Send the Event to the external repository.
      // storeSomeData(event);

      txn.commit();
      status = Status.READY;
    } catch (Throwable t) {
      txn.rollback();

      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    } finally {
      // Always close the transaction, whether it committed or rolled back
      txn.close();
    }
    return status;
  }
}           
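As with the source, a custom sink is enabled by giving its fully qualified class name as the sink type and binding it to exactly one channel. A minimal sketch, again with placeholder names:

# Register the custom sink by its fully qualified class name (package is a placeholder)
a1.sinks = k1
a1.sinks.k1.type = com.example.flume.MySink
# Read in configure() via context.getString("myProp", "defaultValue")
a1.sinks.k1.myProp = someValue
# A sink reads from exactly one channel
a1.sinks.k1.channel = c1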

References

  • https://flume.apache.org/FlumeDeveloperGuide.html
  • http://www.jianshu.com/p/befa9c06baad

2016-09-14 22:23:11

Please credit this post when reprinting: originally published on Zhao Dedong's blog.

Author: Zhao Dedong

All posts on this blog: http://zhaodedong.com/category/
