天天看點

storm-kafka源碼分析storm-kafka源碼分析一、概述二、org.apache.storm.kafka三、trident

storm-kafka源碼分析

@(KAFKA)[kafka, 大資料, storm]

  • storm-kafka源碼分析
  • 一概述
    • 一代碼結構
    • 二orgapachestormkafka
    • 三orgapachestormkafkatrident
      • 1spout
      • 2state
      • 3metric
    • 四其它說明
      • 1線程與分區
  • 二orgapachestormkafka
    • 一基礎類
      • 1Broker
      • 2BrokerHosts
      • 3Partition
      • 4tridentGlobalPartitionInformation
      • 5KafkaConfig
      • 6SpoutConfig
      • 7ZkState
      • 8DynamicBrokersReader
      • 9tridentZkBrokerReader
      • 10ZkCoordinator
      • 11PartitionManager
      • 12DynamicPartitionConnections
      • 13KafkaUtils
        • 1calculatePartitionsForTask
    • 二KafkaSpout
      • 1open
      • 2nextTuple
  • 三trident
    • 一tridentspout的主要流程
      • 1主要調用流程回顧
      • 2指定spout
    • 二Coordinator
      • 1Coordinator的執行個體化
      • 2close與isReady
      • 3getPartitionsForBatch
    • 三Emitter TridentKafkaEmitter結構
      • 1offset與nextOffset
      • 1事務型的spout
        • emitPartitionBatchNew
        • emitPartitionBatch
      • 2透明型的spout
        • emitPartitionBatch
      • 3公共方法
    • 四透明型spout
      • 1emitPartitionBatch
      • 2emitNewPartitionBatch
    • TODOImmutableMapof
        • TODO如果擷取失敗哪裡更新了新的分區資訊是fetch裡面作了處理嗎後面再看
      • 3failFastEmitNewPartitionBatch
      • 4doEmitNewPartitionBatch
        • 1确定offset
        • 2讀取消息
        • 3發送消息并更新offset
        • 4建構下一個meta并傳回
    • 五事務型spout
      • 1emitPartitionBatchNew
      • 2emitPartitionBatch
      • 3reEmitPartitionBatch
    • 六2種spout的公共方法
      • 1refreshPartitions
      • 2getOrderedPartitions
      • 3close
      • 4Partitions與Partition
    • 七fetch消息的邏輯
      • 1fetchMessages
      • 2KafkaUtilfetchMessages
    • 八KafkaOffsetMetric
        • TODO還有其它metric吧

一、概述

storm-kafka是storm用于讀取kafka消息的連接配接器,本文主要對trident的實作部分作了解讀。

(一)代碼結構

storm-kafka中多7個package中,其中的org.apache.storm.kafka與org.apache.storm.kafka.trident中最核心的2個,分别用于處理storm-core與trident,其它package隻是這2個的輔助。我們下面分别先簡單看一下這2個package的内容。

注:還有一個包org.apache.storm.kafka.bolt用于向kafka寫入資料,用得較少,暫不分析。

(二)org.apache.storm.kafka

org.apache.storm.kafka這個package包括了一些公共子產品,以及storm-core的spout處理。

(三)org.apache.storm.kafka.trident

trident這個package中的類按照其功能可大緻分為3類:spout, state和metric。除此之外,trident還調用了一些org.apache.storm.kafka中的類用于處理相同的事務,如metric, exception, DynamicBrokerReader等

1、spout

spout指定了如何從kafka中讀取消息,根據trident的構架,它涉及的主要類為:

* OpaqueTridentKafkaSpout, TransactionalTridentKafkaSpout: 2種類型的spout

* Coordinator, TridentKafkaEmitter: 即Coordinator與Emitter的具體實作。

* GlobalPartitionInformation, ZkBrokerReader:2個重要的輔助類,分别記錄了partition的資訊以及如何從zk中讀取kafka的狀态(還有一個靜态指定的,這裡不分析)。

2、state

3、metric

主要涉及一個類:MaxMetric,其實還有其它metric,但在org.apache.storm.kafka中定義了。

(四)其它說明

1、線程與分區

注意,storm-kafka中的spout隻是其中一個線程。

嚴格來說是每個partition隻能由一個task負責,當然,一個task可以處理多個partition。但task和partition之間是怎麼對應的呢?如何決定一個task處理哪些partition?

在trident拓撲中,多個batch會同時被處理(由MAX_SPOUT_PENDING決定),每個batch包含多個或者全部分區,每個batch讀取的消息大小由fetchSizeBytes決定。

二、org.apache.storm.kafka

(一)基礎類

這些基礎的功能類可以大緻分為以下幾類:

* Bean類:表示某一種實體,包括Broker,BrokerHost, Partition 和trident.GlobalPartitionInformation

* 配置類: 包括KafkaConfig 和 SpoutConfig。

* zk讀寫類:包括擷取state内容的ZkState,以及讀取broker資訊的DynamicBrokersReader和trident.ZkBrokerReader。

* 資料處理類:ZkCoordinator用于确定自已這個spout要處理哪些分區,以及某個分區對于的PartitionManager對象,而PartitionManager則真正的對某個分區進行處理了,DynamicPartitionConnections用于被PartitionManager調用以擷取分區對應的SimpleConsumer,

* KafkaUtils: 一些功能方法。

另外還有一些metric和錯誤處理的類等,暫不介紹。

1、Broker

Broker隻有2個變量:

public String host;
public int port;
           

表示一台kafka機器的位址與端口。

2、BrokerHosts

有2種實作:StaticHosts 與 ZkHost。

以ZkHost為例:

private static final String DEFAULT_ZK_PATH = "/brokers";
public String brokerZkStr = null;
public String brokerZkPath = null; // e.g., /kafka/brokers
public int refreshFreqSecs = 60;
           

可以看出,這是記錄了kafka在zk中的位置(ip與路徑),以及多久重新整理一下這個資訊。預設為/kafka/brokers,有2個子目錄:

topic   ids
           

分别記錄了topic資訊及broker資訊。

3、Partition

Partition記錄了一個分區的具體資訊,包括(所在的broker, 所屬的topic,partition号)。

Partition(Broker host, String topic, int partition)
           

4、trident.GlobalPartitionInformation

GlobalPartitionInformation記錄的是某個topic的所有分區資訊,其中分區資訊以一個TreeMap的形式來儲存。

public String topic;
private Map<Integer, Broker> partitionMap;
           

它有一個getOrderedPartitions()方法,傳回的就是這個topic的所有分區資訊:

public List<Partition> getOrderedPartitions() {
    List<Partition> partitions = new LinkedList<Partition>();
    for (Map.Entry<Integer, Broker> partition : partitionMap.entrySet()) {
        partitions.add(new Partition(partition.getValue(), this.topic, partition.getKey(), this.bUseTopicNameForPartitionPathId));
    }
    return partitions;
}
           

注意,因為使用了TreeMap的資料結構,是以傳回的結果就是有序的。

5、KafkaConfig

就是關于kafkaSpout的一些配置項,完整清單為:

public final BrokerHosts hosts;
public final String topic;
public final String clientId;

public int fetchSizeBytes = 1024 * 1024;
public int socketTimeoutMs = 10000;
public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
public boolean ignoreZkOffsets = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
public long maxOffsetBehind = Long.MAX_VALUE;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
public int metricsTimeBucketSizeInSecs = 60;
           

6、SpoutConfig

SpoutConfig extends KafkaConfig
           

加了幾個配置項:

public List<String> zkServers = null;
public Integer zkPort = null;
public String zkRoot = null;
public String id = null;

public String outputStreamId;

// setting for how often to save the current kafka offset to ZooKeeper
public long stateUpdateIntervalMs = 2000;

// Exponential back-off retry settings.  These are used when retrying messages after a bolt
// calls OutputCollector.fail().
public long retryInitialDelayMs = 0;
public double retryDelayMultiplier = 1.0;
public long retryDelayMaxMs = 60 * 1000;
           

7、ZkState

ZkState記錄了每個partition的處理情況,它是通過讀寫zk來實作的,zk中的内容如下:

{"topology":{"id":"2e3226e2-ef45-4c53-b03f-aacd94068bc9","name":"ljhtest"},"offset":8066973,"partition":0,"broker":{"host":"gdc-kafka08-log.i.nease.net","port":9092},"topic":"ma30"}
           

上面的資訊分别為topoId,拓撲名稱,這個分區處理到的offset,分區号,這個分區在哪台kafka機器,哪個端口,以及topic名稱。

ZkState隻要提供了對這個zk資訊的讀寫操作,如readJSON, writeJSON。

這些資訊在zk中的位置通過建構KafkaConfig對象時的第3、4個參數指定,如下面的配置,則資料被寫在/kafka2/ljhtest下面。是以第4個參數必須唯一,否則不同拓撲會有沖突。

SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "ma30", "/kafka2", "ljhtest");
           

而trident的預設位置為/transactional/${topo}

8、DynamicBrokersReader

讀取zk中關于kafka的資訊,如topic的分區等。

public List<GlobalPartitionInformation> getBrokerInfo() 
           

擷取所有topic的分區資訊。

private int getNumPartitions(String topic)
           

擷取某個topic的分區數量。

9、trident.ZkBrokerReader

trident.ZkBrokerReader大部分功能通過DynamicBrokersReader完成,關于與zk的連接配接,都是通過前者完成。同時增加了以下2個方法:

  • getBrokerForTopic():傳回某個topic的分區資訊,傳回的是GlobalPartitionInformation對象。這是由于可能同時讀取多個分區的情況。
  • getAllBrokers():讀取所有的分區,不指定topic。因為支援正則topic,是以有可能有多個topic。
  • refresh(): 這是一個private方法,每隔一段時間去refresh分區資訊,在上面2個方法中被調用。

    每次發送一個新的batch時,會通過DynamicPartitionConnections#register()方法調用上面的方法,當時間超過refreshFreqSecs時,即會重新整理分區資訊。

10、ZkCoordinator

ZkCoordinator implements PartitionCoordinator
           

與之對應的還有個StaticCoordinator。

主要功能是讀取zk中的分區資訊,然後計算自己這個task負責哪些分區。

PartitionCoordinator隻有3個方法:

(1)主要方法為getMyManagedPartitions(),即計算自己這個spout應該處理哪些分區。

還有refresh是去重新整理分區資訊的。

List<PartitionManager> getMyManagedPartitions();
           

(2)擷取PartitionManager對象:

PartitionManager getManager(Partition partition);
           

(3)定期重新整理分區資訊

void refresh();
           

11、PartitionManager

記錄了某個分區的連接配接資訊,如:

Long _committedTo;
LinkedList<MessageAndOffset> _waitingToEmit = new LinkedList<MessageAndOffset>();
Partition _partition;
SpoutConfig _spoutConfig;
String _topologyInstanceId;
SimpleConsumer _consumer;
DynamicPartitionConnections _connections;
ZkState _state;
           

即這個分區的分區号,consumer等資訊,還有用于發送消息的next()方法等,反正對某個分區的處理都在這個類中。

2個重點方法:

* fill()用于從kafka中擷取消息,寫到_waitingToEmit這個清單中。

* next()從上面準備的清單中讀取資料,通過emit()發送出去。

* 還有ack(),fail等方法。

PartitionManager持有一個DynamicPartitionConnections對象,通過這個對象的regist方法可以擷取到一個SimpleConsumer對象,進而對消息進行讀取。

12、DynamicPartitionConnections

DynamicPartitionConnections用于記錄broker—SimpleConsumber—-分區之間的關系。* 一個broker對應一個SimpleConsumber,但一個SimpleConsumer可以對應多個分區。尤其是spout的數量比分區數量少的時候*

主要用于建立SimpleConsumer,通過Partition資訊,傳回一個SimpleConsumer對象:

public SimpleConsumer register(Partition partition) {...}
           

以及unRegister()方法,取消關聯。

Map<Broker, ConnectionInfo> _connections = new HashMap();
           

這個變量記錄了一個broker的連接配接資訊,其中ConnectionInfo有2個成員變量:

static class ConnectionInfo {
    SimpleConsumer consumer;
    Set<String> partitions = new HashSet<String>();

    public ConnectionInfo(SimpleConsumer consumer) {
        this.consumer = consumer;
    }
}
           

是以一個broker對應一個ConnectionInfo對象,而ConnectionInfo對象内有一個SimpleConsumber對象和其對應的多個分區。

13、KafkaUtils

很多公用方法,以後一個一個解釋:

(1)calculatePartitionsForTask

public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons, int totalTasks, int taskIndex) {
           

計算某個task負責哪些分區。

注意,tridentSpout并未使用這個方法計算所負責的分區。TridentSpout的分區計算不在storm-kafka中實作,而是Trident機制自帶的。詳細的說是在OpaquePartitionedTridentSpoutExecutor的emitBatch()方法中計算。這就有個問題了,為什麼在trident中,會自己計算負責的分區,而一般的storm需要自己來實作。

(二)KafkaSpout

在使用者代碼中,使用者通過使用KafKaConfig對象建立一個KafkaSpout,這是整個拓撲的起點:

SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "ma30", "/test2", "ljhtest");
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
           

KafkaSpout繼承自BaseRichSpout,有open(), nextTuple(), ack(), fail()等方法。

下面我們詳細分析一下KafkaSpout這個類。

1、open()

KafkaSpout完成初始化的方法,當一個spout 被建立時,這個方法被調用。這個方法主要完成了以下幾個對象的初始化:

* _state : 擷取state目錄下的内容,詳見ZkState中的介紹。

* _connection:用于在每次發送消息(nextTuple方法法)時,擷取某個分區的SimpleConsumer對象。

* _coordinator:用于在每次必發送消息時擷取這個spout要處理哪些分區。

此外還有2個metric。

2、nextTuple()

//擷取這個task要處理哪些分區,然後對每個分區資料開始處理
    List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
    for (int i = 0; i < managers.size(); i++) {

        // in case the number of managers decreased
       _currPartitionIndex = _currPartitionIndex % managers.size();
        //發送消息,下面慢慢分析。
        mitState state = managers.get(_currPartitionIndex).next(_collector);
    }
           

隻要就2個步驟:

* 擷取到這個spout要處理哪些分區

* 然後周遊分區,對消息進行處理,處理的過程在ParitionManage中,稍後再詳細介紹。

三、trident

OpaqueTridentKafkaSpout implements IOpaquePartitionedTridentSpout

(一)tridentspout的主要流程

1、主要調用流程回顧

先說明一下,一個spout的組成分成三個部分,簡單的說就是消息是從MasterBatchCoordinator開始的,它是一個真正的spout,而TridentSpoutCoordinator與TridentSpoutExecutor都是bolt,MasterBatchCoordinator發起協調消息,最後的結果是TridentSpoutExecutor發送業務消息。而發送協調消息與業務消息的都是調用使用者Spout中BatchCoordinator與Emitter中定義的代碼。

MaterBatchCorodeinator —————> ITridentSpout.Coordinator#isReady

|

|

v

TridentSpoutCoordinator —————> ITridentSpout.Coordinator#[initialTransaction, success, close]

|

|

v

TridentSpoutExecutor —————> ITridentSpout.Emitter#(emitBatch, success(),close)

對于分區是OpaquePartitionedTridentSpoutExecutor等

如果需要詳細了解這個過程,可參考:

http://blog.csdn.net/lujinhong2/article/details/49785077

我們先簡單介紹一下所有的相關類及其位置,然後分别介紹Coordinator與Emitter的實作。尤其是着重分析一下Emitter部分,因為它是實際讀取kafka消息,并向下遊發送的過程。

2、指定spout

使用者在代碼中用以下語句指定使用哪個spout,如:

OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
           

然後storm根據這個spout的代碼,找到對應的Coordinator與Emitter。我們看一下OpaqueTridentKafkaSpout的代碼。

這代碼很簡單,主要完成了:

(1)初始化一個Spout時,會要求傳遞一個TridentKafkaConfig的參數,指定一些配置參數。

TridentKafkaConfig _config;

public OpaqueTridentKafkaSpout(TridentKafkaConfig config) {
    _config = config;
}
           

(2)然後就分别指定了Coordinator與Emitter:

@Override
public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> getEmitter(Map conf, TopologyContext context) {
    return new TridentKafkaEmitter(conf, context, _config, context
            .getStormId()).asOpaqueEmitter();
}

@Override
public IOpaquePartitionedTridentSpout.Coordinator getCoordinator(Map conf, TopologyContext tc) {
    return new org.apache.storm.kafka.trident.Coordinator(conf, _config);
}
           

(二)Coordinator

1、Coordinator的執行個體化

public Coordinator(Map conf, TridentKafkaConfig tridentKafkaConfig) {
    config = tridentKafkaConfig;
    reader = KafkaUtils.makeBrokerReader(conf, config);
}
           

2、close()與isReady()

Coordinator通過TridentKafkaConfig傳入一個DefaultCoordinator的對象,Coordinator的close()及isReady()均是通過調用DefaultCoordinator的實作來完成的。

@Override
public void close() {
    config.coordinator.close();
}

@Override
public boolean isReady(long txid) {
    return config.coordinator.isReady(txid);
}
           

我們接着看一下DefaultCoordinator的實作:

@Override
public boolean isReady(long txid) {
    return true;
}

@Override
public void close() {
}
           

很簡單,isReady()直接傳回true,close()則不做任何事情。

3、getPartitionsForBatch()

這個方法的功能是在初始化一個事務時,去zk讀取最新的分區資訊(當然是緩存逾時後才讀)。

@Override
public List<GlobalPartitionInformation> getPartitionsForBatch() {
    return reader.getAllBrokers();
}
           

注釋為:

Return the partitions currently in the source of data. The idea is is that if a new partition is added and a prior transaction is replayed, it doesn’t emit tuples for the new partition because it knows what partitions were in that transaction.

由下面可以看出,getPartitionsForBatch()都是在初始化一個事務時被調用的。

透明型:

@Override
    public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
        return _coordinator.getPartitionsForBatch();
    }
           

事務型:

@Override
    public Integer initializeTransaction(long txid, Integer prevMetadata, Integer currMetadata) {
        if(currMetadata!=null) {
            return currMetadata;
        } else {
            return _coordinator.getPartitionsForBatch();            
        }
    }
           

那我們繼續看看這個方法完成了什麼功能:

@Override
public List<GlobalPartitionInformation> getAllBrokers() {
    refresh();
    return cachedBrokers;
}
           

除了這個,還有一個使用靜态指定的,暫不管它。

private void refresh() {
    long currTime = System.currentTimeMillis();
    if (currTime > lastRefreshTimeMs + refreshMillis) {
        try {
            LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired");
            cachedBrokers = reader.getBrokerInfo();
            lastRefreshTimeMs = currTime;
        } catch (java.net.SocketTimeoutException e) {
            LOG.warn("Failed to update brokers", e);
        }
    }
}
           

其它就是在逾時的情況下去zk讀取broker的資訊,并傳回partitions的資訊。傳回的資訊為GlobalPartitionInformation清單,即topic與其具體分區資訊的map。

public List<GlobalPartitionInformation> getBrokerInfo() throws SocketTimeoutException {
  List<String> topics =  getTopics();
  List<GlobalPartitionInformation> partitions =  new ArrayList<GlobalPartitionInformation>();

  for (String topic : topics) {
      GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(topic, this._isWildcardTopic);
      try {
          int numPartitionsForTopic = getNumPartitions(topic);
          String brokerInfoPath = brokerPath();
          for (int partition = 0; partition < numPartitionsForTopic; partition++) {
              int leader = getLeaderFor(topic,partition);
              String path = brokerInfoPath + "/" + leader;
              try {
                  byte[] brokerData = _curator.getData().forPath(path);
                  Broker hp = getBrokerHost(brokerData);
                  globalPartitionInformation.addPartition(partition, hp);
              } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
                  LOG.error("Node {} does not exist ", path);
              }
          }
      } catch (SocketTimeoutException e) {
          throw e;
      } catch (Exception e) {
          throw new RuntimeException(e);
      }
      LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
      partitions.add(globalPartitionInformation);
  }
    return partitions;
}
           

以下内容均是對emitter的介紹

注意,在trident中,每個task負責哪些分區是在storm-core中計算好的,是以在emitter中隻負責處理這個分區的消息就行了,具體來說是在OpaquePartitionedTridentSpoutExecutor.emitBatch()中計算分區的

(三)Emitter : TridentKafkaEmitter結構

TridentKafkaEmitter中有2個内部類,分别對應事務型與透明型的spout。事務型的spout重發batch時必須與上一批次相同,而透明型是沒這個需要的,可以從其它可能的分區中取一批新的資料。

1、offset與nextOffset

消息處理的metaData中儲存了offset與nextOffset2個資料,其中後者一般通過MessageAndOffset#nextOffset()來擷取到。offset表示目前正在處理的消息的offset,nextOffset表示目前消息的下一個offset。舉個例子:

(offset)*這是一批消息**(nextOffset)

是以正常情況下,應該offset

1、事務型的spout

有5個方法,我們這裡先讨論其中2個核心方法。storm根據某個batch是否第一次發送來決定調用哪個方法。

emitPartitionBatchNew()

當某個batch是第一次發送時,調用此方法,這個方法的調用順序為:

emitPartitionBatchNew() —-> failFastEmitNewPartitionBatch() —–> doEmitNewPartitionBatch()

emitPartitionBatch()

當某個batch是重發時,調用此方法,這個方法的調用順序為:

emitPartitionBatch() —–> reEmitPartitionBatch()

2、透明型的spout

透明型的spout不需要保證重發的batch與上一批次是相同的,是以,對于每一次發送都是相同的邏輯即可,不需要管是否第一次發送,它隻有一個發送方法。

emitPartitionBatch()

emitPartitionBatch() —–> emitNewPartitionBatch() —-> failFastEmitNewPartitionBatch() —–> doEmitNewPartitionBatch()

2種類型發送資料時隻終均是調用doEmitNewPartitionBatch(),而透明型的spout在調用之前會先使用emitNewPartitionBatch()來捕獲FailedFetchException,重新擷取一份新的中繼資料,以準備讀取新的消息

storm-kafka源碼分析storm-kafka源碼分析一、概述二、org.apache.storm.kafka三、trident

3、公共方法

除了以上的發送資料方法以外,它們均還有以下3個方法,下面再詳細分析。

@Override
        public void refreshPartitions(List<Partition> partitions) {
            refresh(partitions);
        }

        @Override
        public List<Partition> getOrderedPartitions(GlobalPartitionInformation partitionInformation) {
            return orderPartitions(partitionInformation);
        }

        @Override
        public void close() {
            clear();
        }
           

(四)透明型spout

1、emitPartitionBatch()

/**
         * Emit a batch of tuples for a partition/transaction.
         *
         * Return the metadata describing this batch that will be used as lastPartitionMeta
         * for defining the parameters of the next batch.
         */
        @Override
        public Map emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
            return emitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
        }
           

當需要發送一個新的batch時,storm會調用emitPartitionBatch方法,此方法直接調用emitNewPartitionBatch。

參數說明:

* transactionAttempt,隻有2個成員變量,即long _txId和int _attemptId,即記錄了目前的事務id及已經嘗試的次數。

* tridentCollector,就是用于發送消息的collector。

* partition,表示一個分區,可以了解為kafka的一個分區,有2個成員變量,分别為Broker host和int partition,即kafka的機器與分區id。

* map,用于記錄這個事務的中繼資料,詳細見後面分析。

2、emitNewPartitionBatch()

private Map emitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
    try {
        return failFastEmitNewPartitionBatch(attempt, collector, partition, lastMeta);
    } catch (FailedFetchException e) {
        LOG.warn("Failed to fetch from partition " + partition);
        if (lastMeta == null) {
            return null;
        } else {
            Map ret = new HashMap();
            ret.put("offset", lastMeta.get("nextOffset"));
            ret.put("nextOffset", lastMeta.get("nextOffset"));
            ret.put("partition", partition.partition);
            ret.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
            ret.put("topic", _config.topic);
            ret.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
            return ret;
        }
    }
}
           

很明顯,也隻是簡單調用failFastEmitNewPartitionBatch,但如果擷取消息失敗的話,則會建立一個新中繼資料。

如果lastMeta為null的話,則會直接傳回null,則會從其它地方(如zk)進行初始化(郵見下面的分析);如果不為空,則根據lastMeta的值,根據一個新的中繼資料。中繼資料包括以下幾個字段:

* offset:下一個需要處理的offset

* nextOffset:由于未開始處理batch,是以offset與nextOffset都是同一個值。注意,如果正在處理一個batch,則offset是正在處理的batch的offset,而nextOffset則是下一個需要處理的offset。

* partition:就是哪個分區了

* broker:哪台kafka機器以及端口

* topic:哪個kafka topic

* topology:拓撲的名稱與id。

TODO:ImmutableMap.of()

ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId)
           

TODO:如果擷取失敗,哪裡更新了新的分區資訊,是fetch裡面作了處理嗎?後面再看。

3、failFastEmitNewPartitionBatch()

private Map failFastEmitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
    SimpleConsumer consumer = _connections.register(partition);
    Map ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta);
    _kafkaOffsetMetric.setLatestEmittedOffset(partition, (Long) ret.get("offset"));
    return ret;
}
           

先根據partition資訊注冊一個consumer,注意這裡的分區資訊包括了機器、端口還有分區id等。然後就調用doEmitNewPartitionBatch執行實際事務,最後的是metric的使用。

4、doEmitNewPartitionBatch()

(1)确定offset

簡單的說,就是

* 如果lastMeta為空,則從其它地方(如zk)擷取offset;

* 否則,如果目前topoid與之前的不同(表示拓撲重新開機過)而且ignoreZkOffsets為true,則從指定的offset開始;

* 如果目前topoid與之前的相同(表示在持續處理消息中),或者ignoreZkOffsets為false,則從之前的位置繼續處理

long offset;
    //1、如果lastMeta不為空,則:
    if (lastMeta != null) {
        String lastInstanceId = null;
        Map lastTopoMeta = (Map) lastMeta.get("topology");
        if (lastTopoMeta != null) {
            lastInstanceId = (String) lastTopoMeta.get("id");
        }
        //1.1:如果ignoreZkOffsets為true,而且目前拓撲id與之前的id不同時,則從指定的時間點開始擷取消息。
        if (_config.ignoreZkOffsets && !_topologyInstanceId.equals(lastInstanceId)) {
            offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config.startOffsetTime);
        } else {
            //1.2:如果ignoreZkOffsets為false,或者目前拓撲id與之前的id相同(表示拓撲沒有重新開機過,一直在處理消息中),則繼續之前的處理。
            offset = (Long) lastMeta.get("nextOffset");
        }
    } else {
        //2、如果lastMeta為空,則從其它地方(如zk)擷取之前的offset
        offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
    }
           

(2)讀取消息

ByteBufferMessageSet msgs = null;
    try {
        msgs = fetchMessages(consumer, partition, offset);
    } catch (TopicOffsetOutOfRangeException e) {
        long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
        LOG.warn("OffsetOutOfRange: Updating offset from offset = " + offset + " to offset = " + newOffset);
        offset = newOffset;
        msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
    }
           

如果TopicOffsetOutOfRangeException,則從最舊的消息開始讀。

(3)發送消息并更新offset

long endoffset = offset;
    for (MessageAndOffset msg : msgs) {
        emit(collector, msg.message());
        endoffset = msg.nextOffset();
    }
           

每發送一條消息則将endoffset往後移一位,直到發送完時,endoffset就是下一個需要處理的offset。

(4)建構下一個meta并傳回

Map newMeta = new HashMap();
    newMeta.put("offset", offset);
    newMeta.put("nextOffset", endoffset);
    newMeta.put("instanceId", _topologyInstanceId);
    newMeta.put("partition", partition.partition);
    newMeta.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
    newMeta.put("topic", _config.topic);
    newMeta.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
    return newMeta;
           

關于metric的設定以及讀取kafka消息的實作,下面單獨介紹

(五)事務型spout

1、emitPartitionBatchNew()

當某個batch第一次發送時調用此方法,傳回是這個batch相關的中繼資料,可用于重構這個batch。如果這個batch出錯需要重發,則調用emitPartitionBatch(),下面再介紹。

/**
         * Emit a batch of tuples for a partition/transaction that's never been emitted before.
         * Return the metadata that can be used to reconstruct this partition/batch in the future.
         */
        @Override
        public Map emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
            return failFastEmitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
        }
           

與透明型不同的是,它沒有捕獲FailedFetchException這個異常,是以出現擷取消息失敗時,會一直等待某個分區恢複。其它處理邏輯與透明型相同,參考上面的介紹即可。

2、emitPartitionBatch()

/**
         * Emit a batch of tuples for a partition/transaction that has been emitted before, using
         * the metadata created when it was first emitted.
         */
        @Override
        public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
            reEmitPartitionBatch(transactionAttempt, tridentCollector, partition, map);
        }
           

當一個batch之前已經發送過,但失敗了,則調用此方法重試。

3、reEmitPartitionBatch()

重試發送消息的主要實作,邏輯也相對簡單。

直接去fetch消息。如果消息不為空的話,則判斷offset:

* 如果offset與nextoffset相等,則表示消息已經處理完了

* 如果offset>nextOffset,則出錯了,抛出以下運作時異常:

throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
           

最後發送消息,并更新nextOffset。

完整代碼如下:

private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) {
    LOG.info("re-emitting batch, attempt " + attempt);
    String instanceId = (String) meta.get("instanceId");
    if (!_config.ignoreZkOffsets || instanceId.equals(_topologyInstanceId)) {
        SimpleConsumer consumer = _connections.register(partition);
        long offset = (Long) meta.get("offset");
        long nextOffset = (Long) meta.get("nextOffset");
        ByteBufferMessageSet msgs = null;
        msgs = fetchMessages(consumer, partition, offset);

        if(msgs != null) {
            for (MessageAndOffset msg : msgs) {
                if (offset == nextOffset) {
                    break;
                }
                if (offset > nextOffset) {
                    throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
                }
                emit(collector, msg.message());
                offset = msg.nextOffset();
            }
        }
    }
}
           

(六)2種spout的公共方法

1、refreshPartitions()

根據注釋可知,當處理一些新的分區時,管理到這些分區的連接配接資訊。

/**
         * This method is called when this task is responsible for a new set of partitions. Should be used
         * to manage things like connections to brokers.
         */
        @Override
        public void refreshPartitions(List<Partition> partitions) {
            refresh(partitions);
        }
           

2、getOrderedPartitions()

getOrderedPartitions()方法會在分區中繼資料發生變化(即Partitions發生變化)時被調用。該方法與refreshPartitions()方法調用時機相同,用來應對分區的變化。例如,建立并維護與新增加Partitions的連接配接時就可以使用這個方法。

3、close()

看下面的實作,其實refreshPartitions()和close()都隻是簡單的清空了連接配接,而getOrderedPartitions是擷取分區資訊。

private void clear() {
    _connections.clear();
}

private List<Partition> orderPartitions(GlobalPartitionInformation partitions) {
    return partitions.getOrderedPartitions();
}

private void refresh(List<Partition> list) {
    _connections.clear();
    _kafkaOffsetMetric.refreshPartitions(new HashSet<Partition>(list));
}
           

4、Partitions與Partition

Partitions含義為分區的中繼資料,如一共存在多少個分區,分區所在的broker等,具體資訊由使用者定義,不過這些資訊一般是比較穩定的。在kafka中,是通過以下代碼指定的:

new ZkHosts(brokerHosts)
           

看如何将zk中的資訊導入Partitions的:

Partition則是某個具體的分區了。

在coordinator的getPartitionsForBatch()中指定。

(七)fetch消息的邏輯

_connection包括了一些連接配接資訊,如broker,端口,分區id等,通過它可以擷取到一個simpleConsumer,下面重點分析這個擷取消息的過程。

msgs = fetchMessages(consumer, partition, offset);
           

1、fetchMessages()

private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
    long start = System.nanoTime();
    ByteBufferMessageSet msgs = null;
    msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
    long end = System.nanoTime();
    long millis = (end - start) / 1000000;
    _kafkaMeanFetchLatencyMetric.update(millis);
    _kafkaMaxFetchLatencyMetric.update(millis);
    return msgs;
}
           

主要調用 KafkaUtils.fetchMessages(_config, consumer, partition, offset);其餘代碼用于更新metric,統計擷取消息的平均時長以及最大時長。

2、KafkaUtil.fetchMessages()

邏輯很簡單,建構一個FetchRequest,然後得到FetchResponse。此外就是一些處理異常的代碼了

public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset)
        throws TopicOffsetOutOfRangeException, FailedFetchException,RuntimeException {
    ByteBufferMessageSet msgs = null;
    String topic = config.topic;
    int partitionId = partition.partition;
    FetchRequestBuilder builder = new FetchRequestBuilder();
    FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
            clientId(config.clientId).maxWait(config.fetchMaxWait).build();
    FetchResponse fetchResponse;
    try {
        fetchResponse = consumer.fetch(fetchRequest);
    } catch (Exception e) {
        if (e instanceof ConnectException ||
                e instanceof SocketTimeoutException ||
                e instanceof IOException ||
                e instanceof UnresolvedAddressException
                ) {
            LOG.warn("Network error when fetching messages:", e);
            throw new FailedFetchException(e);
        } else {
            throw new RuntimeException(e);
        }
    }
    if (fetchResponse.hasError()) {
        KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
        if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
            String msg = "Got fetch request with offset out of range: [" + offset + "]";
            LOG.warn(msg);
            throw new TopicOffsetOutOfRangeException(msg);
        } else {
            String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
            LOG.error(message);
            throw new FailedFetchException(message);
        }
    } else {
        msgs = fetchResponse.messageSet(topic, partitionId);
    }
    return msgs;
}
           

(八)KafkaOffsetMetric

TODO:還有其它metric吧

storm-kafka中定義了一個metric用來計算目前正在處理的offset與最新的offset之間有多少差距,即落後了多少條資料。

這個類定義在KafkaUtil中,主要有2個核心變量:

_partitionToOffset是一個hashMap,内容為(分區,正在處理的offset)

_partitions就是_partitionToOffset的key組成的一個集合。

public static class KafkaOffsetMetric implements IMetric {
    Map<Partition, Long> _partitionToOffset = new HashMap<Partition, Long>();
    Set<Partition> _partitions;
    String _topic;
    DynamicPartitionConnections _connections;

    public KafkaOffsetMetric(String topic, DynamicPartitionConnections connections) {
        _topic = topic;
        _connections = connections;
    }

    public void setLatestEmittedOffset(Partition partition, long offset) {
        _partitionToOffset.put(partition, offset);
    }

    @Override
    public Object getValueAndReset() {
        try {
            long totalSpoutLag = 0;
            long totalEarliestTimeOffset = 0;
            long totalLatestTimeOffset = 0;
            long totalLatestEmittedOffset = 0;
            HashMap ret = new HashMap();
            if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
                for (Map.Entry<Partition, Long> e : _partitionToOffset.entrySet()) {
                    Partition partition = e.getKey();
                    SimpleConsumer consumer = _connections.getConnection(partition);
                    if (consumer == null) {
                        LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
                        return null;
                    }
                    long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
                    long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
                    if (latestTimeOffset == KafkaUtils.NO_OFFSET) {
                        LOG.warn("No data found in Kafka Partition " + partition.getId());
                        return null;
                    }
                    long latestEmittedOffset = e.getValue();
                    long spoutLag = latestTimeOffset - latestEmittedOffset;
                    ret.put(_topic + "/" + partition.getId() + "/" + "spoutLag", spoutLag);
                    ret.put(_topic + "/" + partition.getId() + "/" + "earliestTimeOffset", earliestTimeOffset);
                    ret.put(_topic + "/" + partition.getId() + "/" + "latestTimeOffset", latestTimeOffset);
                    ret.put(_topic + "/" + partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
                    totalSpoutLag += spoutLag;
                    totalEarliestTimeOffset += earliestTimeOffset;
                    totalLatestTimeOffset += latestTimeOffset;
                    totalLatestEmittedOffset += latestEmittedOffset;
                }
                ret.put(_topic + "/" + "totalSpoutLag", totalSpoutLag);
                ret.put(_topic + "/" + "totalEarliestTimeOffset", totalEarliestTimeOffset);
                ret.put(_topic + "/" + "totalLatestTimeOffset", totalLatestTimeOffset);
                ret.put(_topic + "/" + "totalLatestEmittedOffset", totalLatestEmittedOffset);
                return ret;
            } else {
                LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
            }
        } catch (Throwable t) {
            LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
        }
        return null;
    }

    public void refreshPartitions(Set<Partition> partitions) {
        _partitions = partitions;
        Iterator<Partition> it = _partitionToOffset.keySet().iterator();
        while (it.hasNext()) {
            if (!partitions.contains(it.next())) {
                it.remove();
            }
        }
    }
}
           

這個metric隻在2個地方被調用:

(1)第一次讀取一個分區時

_kafkaOffsetMetric.setLatestEmittedOffset(partition, (Long) ret.get("offset"));
           

(2)refresh時

_kafkaOffsetMetric.refreshPartitions(new HashSet<Partition>(list));
           

refreshPartitions()時會調用refresh方法。This method is called when this task is responsible for a new set of partitions. Should be used to manage things like connections to brokers.