
Flume: high availability, high reliability, agent, source

--------Not yet organized; will be tidied up later

Vocabulary

consumes — uses up, consumes

delivered — delivered, conveyed

passive — passive, inactive

encapsulate (enˈkaps(y)əˌlāt) — to wrap up, enclose

retrieval (riˈtrēvəl) — the act of getting data back

ensures — guarantees

temporarily — for a limited time

causing — bringing about

exponentially — growing by repeated multiplication (e.g. doubling)

An Event is a unit of data that flows through a Flume agent. The Event flows from Source to Channel to Sink,

 and is represented by an implementation of the Event interface. An Event carries a payload (byte array)

 that is accompanied by an optional set of headers (string attributes).

 A Flume agent is a process (JVM) that hosts the components that allow Events to flow from an external source to an external destination.

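The Event structure described above (a byte-array payload plus an optional set of string headers) can be sketched as a minimal class. This is a hypothetical illustration only, not Flume's actual org.apache.flume.Event implementation:

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, minimal model of a Flume Event: a byte-array payload
// accompanied by an optional set of string headers.
public class EventSketch {
  private final byte[] body;                  // the payload
  private final Map<String, String> headers;  // optional string attributes

  EventSketch(byte[] body, Map<String, String> headers) {
    this.body = body;
    this.headers = headers;
  }

  byte[] getBody() { return body; }
  Map<String, String> getHeaders() { return headers; }

  public static void main(String[] args) {
    Map<String, String> headers = new HashMap<>();
    headers.put("host", "web01");
    EventSketch e = new EventSketch(
        "hello".getBytes(StandardCharsets.UTF_8), headers);
    System.out.println(new String(e.getBody(), StandardCharsets.UTF_8));
  }
}
```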

 ------------------------------------------------------------------------------------------------------------------------

A Source consumes Events having a specific format, and those Events are delivered to the Source by an external

source like a web server. For example, an AvroSource can be used to receive Avro Events from clients or from other

 Flume agents in the flow. When a Source receives an Event, it stores it into one or more Channels.

  The Channel is a passive store that holds the Event until that Event is consumed by a Sink. One type of Channel

  available in Flume is the FileChannel which uses the local filesystem as its backing store. A Sink is responsible

  for removing an Event from the Channel and putting it into an external repository like HDFS (in the case of an HDFSEventSink)

   or forwarding it to the Source at the next hop of the flow. The Source and Sink within the given agent run asynchronously with the Events staged in the Channel.

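The Source → Channel → Sink hand-off described above can be modeled with a plain FIFO queue: the Source puts events in, the passive Channel holds them, and the Sink later takes them out. This is a conceptual sketch only, not Flume's Channel API (and nothing like the durable FileChannel):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Conceptual model of the flow described above: a Source puts events into
// a passive Channel (here a plain in-memory FIFO queue); a Sink later
// takes them out. NOT Flume's actual Channel API.
public class ChannelSketch {
  private final Queue<String> buffer = new ArrayDeque<>();

  void put(String event) { buffer.add(event); }     // called by the Source
  String take()          { return buffer.poll(); }  // called by the Sink

  public static void main(String[] args) {
    ChannelSketch channel = new ChannelSketch();
    channel.put("event-1");              // Source side
    channel.put("event-2");
    System.out.println(channel.take());  // Sink side, FIFO order
  }
}
```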

  -----------------------------------------------------------------------------------------------------------------------------

  Reliability

An Event is staged in a Flume agent’s Channel. Then it’s the Sink‘s responsibility to deliver the Event to the next agent or

 terminal repository (like HDFS) in the flow. The Sink removes an Event from the Channel only after the Event is stored into the Channel

 of the next agent or stored in the terminal repository. This is how the single-hop message delivery semantics in

  Flume provide end-to-end reliability of the flow. Flume uses a transactional approach to guarantee the reliable

   delivery of the Events. The Sources and Sinks encapsulate the storage/retrieval of the Events in a Transaction

    provided by the Channel. This ensures that the set of Events are reliably passed from point to point in the flow.

     In the case of a multi-hop flow, the Sink from the previous hop and the Source of the next hop both have their

     Transactions open to ensure that the Event data is safely stored in the Channel of the next hop.


 In plain terms: transactional guarantees within an agent are provided by the Channel. An Event is deleted from a Channel only after it has been handed off to the next hop's Channel (or to the terminal store); and when one agent's Sink feeds the next agent's Source, that hand-off is likewise wrapped in Channel transactions on both sides. This is what gives Flume its end-to-end reliability.
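The commit/rollback behavior described above can be sketched in a simplified model: the Sink takes an event inside a transaction, commits (dropping the event) only after the downstream store succeeds, and rolls back on failure so the event stays in the Channel for retry. This is a hypothetical illustration, not Flume's actual Channel/Transaction API:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, hypothetical model of transactional delivery as described
// above. NOT Flume's actual Transaction interface.
public class TxChannelSketch {
  private final Deque<String> channel = new ArrayDeque<>();
  private String inFlight;  // event taken but not yet committed

  void put(String event) { channel.addLast(event); }

  /** Sink begins a transaction by taking an event from the channel. */
  String take() {
    inFlight = channel.pollFirst();
    return inFlight;
  }

  /** Downstream store succeeded: the event may now be dropped for good. */
  void commit() { inFlight = null; }

  /** Delivery failed: return the event to the channel for a retry. */
  void rollback() {
    if (inFlight != null) channel.addFirst(inFlight);
    inFlight = null;
  }

  int size() { return channel.size(); }

  public static void main(String[] args) {
    TxChannelSketch ch = new TxChannelSketch();
    ch.put("e1");
    ch.take();
    ch.rollback();                 // delivery failed, so e1 is retained
    System.out.println(ch.size()); // 1
  }
}
```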

 ---------------------------------------------------------------------

 Getting the source

Check-out the code using Git. Click here for the git repository root.


The Flume 1.x development happens under the branch “trunk” so this command line can be used:

git clone https://git-wip-us.apache.org/repos/asf/flume.git


 ---------------------------------------------------------------------

Compile/test Flume

 The Flume build is mavenized. You can compile Flume using the standard Maven commands:

Compile only: mvn clean compile

Compile and run unit tests: mvn clean test

Run individual test(s): mvn clean test -Dtest=<Test1>,<Test2>,... -DfailIfNoTests=false

Create tarball package: mvn clean install

Create tarball package (skip unit tests): mvn clean install -DskipTests

Please note that Flume builds require that the Google Protocol Buffers compiler be in the path. You can download and install it by following the instructions here.

-----------------------------------------------------------------------------

Developing custom components

Client

The client operates at the point of origin of events and delivers them to a Flume agent.

Clients typically operate in the process space of the application they are consuming data from.

Flume currently supports Avro, log4j, syslog, and Http POST (with a JSON body) as ways to transfer data from an external source.

 Additionally, there’s an ExecSource that can consume the output of a local process as input to Flume.

It’s quite possible to have a use case where these existing options are not sufficient.

 In this case you can build a custom mechanism to send data to Flume. There are two ways of achieving this.

  The first option is to create a custom client that communicates with one of Flume’s existing Sources like AvroSource or SyslogTcpSource.

  Here the client should convert its data into messages understood by these Flume Sources.

  The other option is to write a custom Flume Source that directly talks with your existing client application using some IPC or RPC protocol,

   and then converts the client data into Flume Events to be sent downstream.

    Note that all events stored within the Channel of a Flume agent must exist as Flume Events.


----------------------------------------------------------------------------------------------------

Client SDK

Though Flume contains a number of built-in mechanisms (i.e. Sources) to ingest data,

often one wants the ability to communicate with Flume directly from a custom application.

The Flume Client SDK is a library that enables applications to connect to Flume and send data into Flume’s data flow over RPC.


RPC client interface

An implementation of Flume’s RpcClient interface encapsulates the RPC mechanism supported by Flume.

 The user’s application can simply call the Flume Client SDK’s append(Event) or appendBatch(List<Event>) to send data

  and not worry about the underlying message exchange details. The user can provide the required Event arg by either directly

   implementing the Event interface, by using a convenience implementation such as the SimpleEvent class, or

   by using EventBuilder‘s overloaded withBody() static helper methods.


 ----------------------------------------------------------------------------------------------------------------------

 RPC clients - Avro and Thrift

As of Flume 1.4.0, Avro is the default RPC protocol.

The NettyAvroRpcClient and ThriftRpcClient implement the RpcClient interface.

 The client needs to create this object with the host and port of the target Flume agent,

 and can then use the RpcClient to send data into the agent.

 The following example shows how to use the Flume Client SDK API within a user’s data-generating application:


   ----------------------------------------------------------------------------------------------------------

   (The code is not translated; it is self-explanatory.)

   --------------------------------------------------------------------------------------------------------


   import org.apache.flume.Event;

import org.apache.flume.EventDeliveryException;

import org.apache.flume.api.RpcClient;

import org.apache.flume.api.RpcClientFactory;

import org.apache.flume.event.EventBuilder;

import java.nio.charset.Charset;

public class MyApp {

  public static void main(String[] args) {

    MyRpcClientFacade client = new MyRpcClientFacade();

    // Initialize client with the remote Flume agent's host and port

    client.init("host.example.org", 41414);

    // Send 10 events to the remote Flume agent. That agent should be

    // configured to listen with an AvroSource.

    String sampleData = "Hello Flume!";

    for (int i = 0; i < 10; i++) {

      client.sendDataToFlume(sampleData);

    }

    client.cleanUp();

  }

}

class MyRpcClientFacade {

  private RpcClient client;

  private String hostname;

  private int port;

  public void init(String hostname, int port) {

    // Setup the RPC connection


    this.hostname = hostname;

    this.port = port;

    this.client = RpcClientFactory.getDefaultInstance(hostname, port);

    // Use the following method to create a thrift client (instead of the above line):


    // this.client = RpcClientFactory.getThriftInstance(hostname, port);

  }

  public void sendDataToFlume(String data) {

    // Create a Flume Event object that encapsulates the sample data


    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event

    try {

      client.append(event);

    } catch (EventDeliveryException e) {

      // clean up and recreate the client


      client.close();

      client = null;

      client = RpcClientFactory.getDefaultInstance(hostname, port);

      // Use the following method to create a thrift client (instead of the above line):

      // this.client = RpcClientFactory.getThriftInstance(hostname, port);

    }

  }

  public void cleanUp() {

    // Close the RPC connection

    client.close();

  }

}

The remote Flume agent needs to have an AvroSource (or a ThriftSource if you are using a Thrift client) listening on some port.

 Below is an example Flume agent configuration that’s waiting for a connection from MyApp:


a1.channels = c1

a1.sources = r1

a1.sinks = k1

a1.channels.c1.type = memory

a1.sources.r1.channels = c1

a1.sources.r1.type = avro

# For using a thrift source set the following instead of the above line.

# a1.source.r1.type = thrift

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 41414

a1.sinks.k1.channel = c1

a1.sinks.k1.type = logger

For more flexibility, the default Flume client implementations (NettyAvroRpcClient and ThriftRpcClient) can be configured with these properties:


client.type = default (for avro) or thrift (for thrift)

hosts = h1                           # default client accepts only 1 host

                                     # (additional hosts will be ignored)

hosts.h1 = host1.example.org:41414   # host and port must both be specified

                                     # (neither has a default)

batch-size = 100                     # Must be >=1 (default: 100)

connect-timeout = 20000              # Must be >=1000 (default: 20000)

request-timeout = 20000              # Must be >=1000 (default: 20000)

-----------------------------------------------------------------

Failover Client

This class wraps the default Avro RPC client to provide failover handling capability to clients. This takes a whitespace-separated list of <host>:<port> representing the Flume agents that make up a failover group. The Failover RPC Client currently does not support thrift. If there's a communication error with the currently selected host (i.e. agent), then the failover client automatically fails over to the next host in the list. For example:

// Setup properties for the failover

Properties props = new Properties();

props.put("client.type", "default_failover");

// List of hosts (space-separated list of user-chosen host aliases)

props.put("hosts", "h1 h2 h3");

// host/port pair for each host alias

String host1 = "host1.example.org:41414";

String host2 = "host2.example.org:41414";

String host3 = "host3.example.org:41414";

props.put("hosts.h1", host1);

props.put("hosts.h2", host2);

props.put("hosts.h3", host3);

// create the client with failover properties

RpcClient client = RpcClientFactory.getInstance(props);

For more flexibility, the failover Flume client implementation (FailoverRpcClient) can be configured with these properties:

client.type = default_failover

hosts = h1 h2 h3                     # at least one is required, but 2 or

                                     # more makes better sense

hosts.h1 = host1.example.org:41414

hosts.h2 = host2.example.org:41414

hosts.h3 = host3.example.org:41414

max-attempts = 3                     # Must be >=0 (default: number of hosts

                                     # specified, 3 in this case). A '0'

                                     # value doesn't make much sense because

                                     # it will just cause an append call to

                                     # immediately fail. A '1' value means

                                     # that the failover client will try only

                                     # once to send the Event, and if it

                                     # fails then there will be no failover

                                     # to a second client, so this value

                                     # causes the failover client to

                                     # degenerate into just a default client.

                                     # It makes sense to set this value to at

                                     # least the number of hosts that you

                                     # specified.

batch-size = 100                     # Must be >=1 (default: 100)

connect-timeout = 20000              # Must be >=1000 (default: 20000)

request-timeout = 20000              # Must be >=1000 (default: 20000)


-----------------------------------------------------------------------------------------------------------------

LoadBalancing RPC client

The Flume Client SDK also supports an RpcClient which load-balances among multiple hosts.

 This type of client takes a whitespace-separated list of <host>:<port> representing the Flume agents that make-up a load-balancing group.

 This client can be configured with a load balancing strategy that either randomly selects

  one of the configured hosts, or selects a host in a round-robin fashion.

  You can also specify your own custom class that implements the LoadBalancingRpcClient$HostSelector interface so that a custom selection order is used.

  In that case, the FQCN of the custom class needs to be specified as the value of the host-selector property.

   The LoadBalancing RPC Client currently does not support thrift.
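The round-robin strategy mentioned above can be sketched as follows. This is a hypothetical illustration of the selection order only, not Flume's LoadBalancingRpcClient$HostSelector implementation:

```java
import java.util.List;

// Hypothetical sketch of round-robin host selection as described above:
// hosts are returned in rotation, wrapping back to the first.
public class RoundRobinSketch {
  private final List<String> hosts;
  private int next = 0;

  RoundRobinSketch(List<String> hosts) { this.hosts = hosts; }

  /** Returns hosts in rotation: h1, h2, h3, h1, ... */
  String select() {
    String host = hosts.get(next);
    next = (next + 1) % hosts.size();
    return host;
  }

  public static void main(String[] args) {
    RoundRobinSketch selector = new RoundRobinSketch(List.of("h1", "h2", "h3"));
    for (int i = 0; i < 4; i++) {
      System.out.println(selector.select());
    }
  }
}
```

A "random" selector would instead pick an index with java.util.Random, and a custom strategy would plug in via the host-selector property described below.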

If backoff is enabled then the client will temporarily blacklist hosts that fail,

 causing them to be excluded from being selected as a failover host until a given timeout.

  When the timeout elapses, if the host is still unresponsive then this is considered a sequential failure,

  and the timeout is increased exponentially to avoid potentially getting stuck in long waits on unresponsive hosts.


The maximum backoff time can be configured by setting maxBackoff (in milliseconds).


The maxBackoff default is 30 seconds (specified in the OrderSelector class that’s the superclass of both load balancing strategies).


The backoff timeout will increase exponentially with each sequential failure up to the maximum possible backoff timeout.

The maximum possible backoff is limited to 65536 seconds (about 18.2 hours). For example:


// Setup properties for the load balancing

Properties props = new Properties();

props.put("client.type", "default_loadbalance");

// List of hosts (space-separated list of user-chosen host aliases)

props.put("hosts", "h1 h2 h3");

// host/port pair for each host alias

String host1 = "host1.example.org:41414";

String host2 = "host2.example.org:41414";

String host3 = "host3.example.org:41414";

props.put("hosts.h1", host1);

props.put("hosts.h2", host2);

props.put("hosts.h3", host3);

props.put("host-selector", "random"); // For random host selection

// props.put("host-selector", "round_robin"); // For round-robin host

//                                            // selection

props.put("backoff", "true"); // Disabled by default.

props.put("maxBackoff", "10000"); // Defaults 0, which effectively

                                  // becomes 30000 ms

// Create the client with load balancing properties

RpcClient client = RpcClientFactory.getInstance(props);

For more flexibility, the load-balancing Flume client implementation (LoadBalancingRpcClient) can be configured with these properties:

client.type = default_loadbalance

hosts = h1 h2 h3                     # At least 2 hosts are required

hosts.h1 = host1.example.org:41414

hosts.h2 = host2.example.org:41414

hosts.h3 = host3.example.org:41414

backoff = false                      # Specifies whether the client should

                                     # back-off from (i.e. temporarily

                                     # blacklist) a failed host

                                     # (default: false).

maxBackoff = 0                       # Max timeout in millis that a will

                                     # remain inactive due to a previous

                                     # failure with that host (default: 0,

                                     # which effectively becomes 30000)

host-selector = round_robin          # The host selection strategy used

                                     # when load-balancing among hosts

                                     # (default: round_robin).

                                     # Other values include "random"

                                     # or the FQCN of a custom class

                                     # that implements

                                     # LoadBalancingRpcClient$HostSelector

batch-size = 100                     # Must be >=1 (default: 100)

connect-timeout = 20000              # Must be >=1000 (default: 20000)

request-timeout = 20000              # Must be >=1000 (default: 20000)
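The exponential backoff behavior described earlier (timeout doubling with each sequential failure, capped at 65536 seconds) can be sketched numerically. This is an illustrative calculation only, not Flume's actual OrderSelector code:

```java
// Hypothetical sketch of the capped exponential backoff described above:
// the timeout doubles with each sequential failure, up to a hard limit
// of 65536 seconds. NOT Flume's actual OrderSelector implementation.
public class BackoffSketch {
  static final long CAP_MS = 65536L * 1000;  // hard upper limit (~18.2 hours)

  /** Backoff for the n-th consecutive failure: base * 2^(n-1), capped. */
  static long backoffMs(long baseMs, int failures) {
    long timeout = baseMs;
    for (int i = 1; i < failures; i++) {
      timeout *= 2;
      if (timeout >= CAP_MS) {
        return CAP_MS;
      }
    }
    return Math.min(timeout, CAP_MS);
  }

  public static void main(String[] args) {
    for (int n = 1; n <= 5; n++) {
      System.out.println("failure " + n + " -> " + backoffMs(1000, n) + " ms");
    }
  }
}
```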

--------------------------------------------------------------------------------