org.apache.flume.sink.AvroSink是用來通過網絡來傳輸資料的,可以将event發送到RPC伺服器(比如AvroSource),使用AvroSink和AvroSource可以組成分層結構。它繼承自AbstractRpcSink
extends AbstractSink implements Configurable這跟其他的sink一樣都得extends
AbstractSink implements
Configurable,是以重點也在confgure、start、process、stop這四個方法,實作了initializeRpcClient(Properties
props)方法。
一、configure(Context
context)方法,先擷取配置檔案中的主機hostname和端口port;設定clientProps的屬性hosts=h1,hosts.h1=hostname:port;然後将配置資訊中的所有資訊放入clientProps中;擷取cxnResetInterval表示重複建立連接配接的時間間隔,預設是0就是不重複建立連接配接。
二、start()方法是調用createConnection()建立連接配接,如果出現異常就調用destroyConnection()掐斷連接配接,避免資源洩漏。createConnection()方法主要是初始化client
=
initializeRpcClient(clientProps)以及建立一個線程,并執行在給定延遲cxnResetInterval後執行一次銷毀連結destroyConnection(),由于預設cxnResetInterval=0,是以是不會執行這個線程的。這點不是很明白,為什麼要銷毀???initializeRpcClient(clientProps)方法會根據配置檔案中的資訊進行構造相應的RpcClient:首先會擷取"client.type"參數指定的類型可用的有四種(NettyAvroRpcClient(如果沒有"client.type"則使用這個作為預設Client)、FailoverRpcClient、LoadBalancingRpcClient、ThriftRpcClient),執行個體化之後需要對其在進行必要的配置執行client.configure(properties)進行配置:
(1)NettyAvroRpcClient.configure(Properties
properties)方法首先會擷取鎖,檢查connState連接配接狀态要保證是沒有配置過的;其次擷取"batch-size"設定batchSize,如果配置的小于1則使用預設值100;擷取“hosts”,如果配置了多個hosts則隻使用第一個;擷取"hosts."字首,如果有多個則使用第一個,再解析出hostname和port,建構一個InetSocketAddress的對象address;擷取連接配接逾時時間"connect-timeout",設定connectTimeout,如果配置的小于1000則使用預設值20000,機關是ms;擷取相應時間"request-timeout",設定requestTimeout,如果配置的小于1000,則使用預設值20000,機關ms;擷取壓縮類型"compression-type",如果有配置壓縮還需要擷取壓縮的等級compressionLevel;最後調用connect()連結RPC伺服器。
實際的連結在connect(long timeout, TimeUnit
tu)方法中,先構造一個線程池callTimeoutPool;然後根據是否有壓縮構造相應的工廠類CompressionChannelFactory(有壓縮配置)或者NioClientSocketChannelFactory(無壓縮配置);構造一個
NettyTransceiver(this.address,socketChannelFactory,tu.toMillis(timeout))收發器對象transceiver;根據transceiver傳回一個avroClient;最後設定連結狀态為READY。
(2)FailoverRpcClient.configure(Properties
properties)方法會調用configureHosts(Properties
properties)方法,這個方法會擷取配置檔案中的host清單hosts;擷取最大嘗試次數"max-attempts",設定maxTries,預設是hosts的大小;擷取批量大小
"batch-size",設定batchSize,如果配置的小于1則使用預設大小100;将此client置為活動的isActive=true。可以看出這個client可以使用多個host。
(3)LoadBalancingRpcClient.configure(Properties
properties)會擷取配置檔案中的host清單hosts,且不允許少于兩個,否則爆異常;擷取主機選擇器"host-selector",有兩種内置的選擇器:LoadBalancingRpcClient.RoundRobinHostSelector和LoadBalancingRpcClient.RandomOrderHostSelector,預設是ROUND_ROBIN(即RoundRobinHostSelector)輪詢的方式(也可以自定義,要實作LoadBalancingRpcClient.HostSelector接口);擷取"backoff",設定backoff(是否使用推遲算法,就是sink.process出問題後對這個sink設定懲罰時間,在此期間不再認為其可活動)的boolean值(預設false就是不啟用);擷取最大推遲時間"maxBackoff",設定maxBackoff;然後根據選擇器是ROUND_ROBIN還是RANDOM選擇對應的類并執行個體化selector,最後設定主機selector.setHosts(hosts)。
這兩個内置選擇器:RoundRobinHostSelector實際使用的是RoundRobinOrderSelector;RandomOrderHostSelector實際使用的是RandomOrderSelector,這兩個都在 這篇文章中有介紹,這裡不再說明。
(4)ThriftRpcClient.configure(Properties
properties)會擷取狀态鎖stateLock.lock();擷取配置檔案中的host清單中的第一個,隻需要一個;擷取批量大小"batch-size",設定batchSize,如果配置的小于1則使用預設大小100;擷取主機名hostname和端口port;擷取響應時間requestTimeout,如果小于1000設定為預設的20000ms;擷取連接配接池大小"maxConnections",設定connectionPoolSize,如果大小小于1則設定為預設的值5;建立連接配接池管理對象connectionManager=
new ConnectionPoolManager(connectionPoolSize);設定連接配接狀态為READY,connState =
State.READY;最後狀态鎖解鎖stateLock.unlock()。
這四個Client都是extends AbstractRpcClient implements RpcClient。
三、process()方法,代碼如下:
即使本批次event的數量達不到client.getBatchSize()(channel中沒資料了)也會立即發送到RPC伺服器。verifyConnection()方法是确儲存在連結且處于活動狀态,如果連結處于非活動狀态銷毀并重建連結。如果本批次沒有event,則不會想RPC發送任何資料。client.appendBatch(batch)方法是批量發送event。
(1)NettyAvroRpcClient.appendBatch(batch)方法會調用appendBatch(events,
requestTimeout,
TimeUnit.MILLISECONDS)方法,該方法會首先确認連結處于READY狀态,否則報錯;然後将每個event重新封裝成AvroFlumeEvent,放入avroEvents清單中;然後構造一個CallFuture和avroEvents一同封裝成一個Callable放入線程池 handshake
= callTimeoutPool.submit(callable)中去執行,其call方法内容是avroClient.appendBatch(avroEvents,
callFuture)就是在此批量送出到RPC伺服器;然後handshake.get(connectTimeout,
TimeUnit.MILLISECONDS)在規定時間等待執行的傳回結果以及等待append的完成waitForStatusOK(callFuture,
timeout, tu),詳細的可看這裡 ,有對于這兩個future更深入的分析。一個批次傳輸的event的數量是min(batchSize,events.size())
(2)FailoverRpcClient.appendBatch(batch)方法會做最多maxTries次嘗試直到擷取到可以正确發送events的Client,通過localClient=getClient()--》getNextClient()來擷取client,這個方法每次會擷取hosts中的下一個HostInfo,并使用NettyAvroRpcClient來作為RPC
Client,這就又回到了(1)中,這個方法還有一個要注意的就是會先從目前的lastCheckedhost+1位置向後找可以使用的Client,如果不行會再從開始到到lastCheckedhost再找,再找不到就報錯。使用localClient.appendBatch(events)來處理events,可參考(1)。
(3)LoadBalancingRpcClient.appendBatch(batch)方法,首先會擷取可以發送到的RPC伺服器的疊代器Iterator<HostInfo> it
= selector.createHostIterator();然後取一個HostInfo,RpcClient client =
getClient(host)這個Client和(2)一樣都是NettyAvroRpcClient,但是getClient方法會設定一個儲存名字和client映射的clientMap;client.appendBatch(events)執行之後就會跳出循環,下一次appendBatch會選擇下一個client執行。
(4)ThriftRpcClient.appendBatch(batch)方法,從connectionManager.checkout()擷取一個client,ConnectionPoolManager類主要維護倆對象availableClients用來存放可用的client(是一個ClientWrapper,維護一個ThriftSourceProtocol.Client
client
是用來批量處理event的)、checkedOutClients用來存儲從availableClients中拿出的Client表示正在使用的Client;ConnectionPoolManager.checkout()用于從availableClients中remove出client并放入checkedOutClients中,傳回這個client;ConnectionPoolManager.checkIn(ClientWrapper
client)方法用于将指定的Client從checkedOutClient中remove出并放入availableClients中;ConnectionPoolManager.destroy(ClientWrapper
client)用于将checkedOutClients中的指定Client
remove并close。appendBatch方法中獲得client後,會每次封裝min(batchSize,events.size())個event,把他們封裝成ThriftFlumeEvent加入thriftFlumeEvents清單,然後如果thriftFlumeEvents>0則執行doAppendBatch(client,
thriftFlumeEvents).get(requestTimeout,TimeUnit.MILLISECONDS)阻塞等待傳輸完畢。doAppendBatch方法會建構一個Callable其call方法執行client.client.appendBatch(e),将這個Callable放入線程池callTimeoutPool中執行并傳回執行結果Future。
以上四種RpcClient的append(Event event)方法也比較容易了解,不再講述。
四、stop()方法主要是銷毀連結,關閉cxnResetExecutor。
其實flume支援avro和thrift兩種(目前)傳輸,上面的(2)和(3)隻不過是對(1)的上層業務做了一次封裝而已,本質上還是一樣的都是avro(基于netty)。同時記住avrosink是支援壓縮的。
在此,由于部落客對avro、netty、thrift并未深入研究過,是以隻能從flume層面講解avrosink,對于某些人來說,可能講的并不深入,相關内容請自行學習!!