天天看點

使用Flink實作索引資料到Elasticsearch

5萬人關注的大資料成神之路,不來了解一下嗎? 5萬人關注的大資料成神之路,真的不來了解一下嗎? 5萬人關注的大資料成神之路,确定真的不來了解一下嗎?
歡迎您關注 《大資料成神之路》

使用Flink處理資料時,可以基于Flink提供的批式處理(Batch Processing)和流式處理(Streaming Processing)API來實作,分别能夠滿足不同場景下應用資料的處理。這兩種模式下,輸入處理都被抽象為Source Operator,包含對應輸入資料的處理邏輯;輸出處理都被抽象為Sink Operator,包含了對應輸出資料的處理邏輯。這裡,我們隻關注輸出的Sink Operator實作。

Flink批式處理模式,運作Flink Batch Job時作用在有界的輸入資料集上,是以Job運作的時間是有時限的,一旦Job運作完成,對應的整個資料處理應用就已經結束,比如,輸入是一個資料檔案,或者一個Hive SQL查詢對應的結果集,等等。在批式處理模式下處理資料的輸出時,主要需要實作一個自定義的OutputFormat,然後基于該OutputFormat來建構一個Sink,下面看下OutputFormat接口的定義,如下所示:

@Public
public interface OutputFormat<IT> extends Serializable {
    void configure(Configuration parameters);
    void open(int taskNumber, int numTasks) throws IOException;
    void writeRecord(IT record) throws IOException;
    void close() throws IOException;
}           

上面,configure()方法用來配置一個OutputFormat的一些輸出參數;open()方法用來實作與外部存儲系統建立連接配接;writeRecord()方法用來實作對Flink Batch Job處理後,将資料記錄輸出到外部存儲系統。開發Batch Job時,通過調用DataSet的output()方法,參數值使用一個OutputFormat的具體實作即可。後面,我們會基于Elasticsearch來實作上面接口中的各個方法。

Flink流式處理模式,運作Flink Streaming Job時一般輸入的資料集為流資料集,也就是說輸入資料元素會持續不斷地進入到Streaming Job的處理過程中,但你仍然可以使用一個HDFS資料檔案作為Streaming Job的輸入,即使這樣,一個Flink Streaming Job啟動運作後便會永遠運作下去,除非有意外故障或有計劃地操作使其終止。在流式處理模式下處理資料的輸出時,我們需要是實作一個SinkFunction,它指定了如下将流資料處理後的結果,輸出到指定的外部存儲系統中,下面看下SinkFunction的接口定義,如下所示:

@Public
public interface SinkFunction<IN> extends Function, Serializable {
    @Deprecated
    default void invoke(IN value) throws Exception {}
    default void invoke(IN value, Context context) throws Exception {
        invoke(value);
    }
 
    @Public
    interface Context<T> {
        long currentProcessingTime();
        long currentWatermark();
        Long timestamp();
    }
}           

通過上面接口可以看到,需要實作一個invoke()方法,實作該方法來将一個輸入的IN value輸出到外部存儲系統中。一般情況下,對一些主流的外部存儲系統,Flink實作了一下内置(社群貢獻)的SinkFunction,我們隻需要配置一下就可以直接使用。而且,對于Streaming Job來說,實作的SinkFunction比較豐富一些,可以減少自己開發的工作量。開發Streaming Job時,通過調用DataStream的addSink()方法,參數是一個SinkFlink的具體實作。

下面,我們分别基于批式處理模式和批式處理模式,分别使用或實作對應元件将Streaming Job和Batch Job的處理結果輸出到Elasticsearch中:

基于Flink DataSteam API實作

在開發基于Flink的應用程式過程中,發現Flink Streaming API對Elasticsearch的支援還是比較好的,比如,如果想要從Kafka消費事件記錄,經過處理最終将資料記錄索引到Elasticsearch 5.x,可以直接在Maven的POM檔案中添加如下依賴即可:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
   <version>1.5.3</version>
 </dependency>           

我們使用Flink Streaming API來實作将流式資料處理後,寫入到Elasticsearch中。其中,輸入資料源是Kafka中的某個Topic;輸出處理結果到lasticsearch中,我們使用使用Transport API的方式來連接配接Elasticsearch,需要指定Transport位址和端口。具體實作,對應的Scala代碼,如下所示:

def main(args: Array[String]): Unit = {
  // parse input arguments
  val params = ParameterTool.fromArgs(args)
 
  if (params.getNumberOfParameters < 9) {
    val cmd = getClass.getName
    println("Missing parameters!\n"
      + "Usage: " + cmd
      + " --input-topic <topic> "
      + "--es-cluster-name <es cluster name> "
      + "--es-transport-addresses <es address> "
      + "--es-port <es port> "
      + "--es-index <es index> "
      + "--es-type <es type> "
      + "--bootstrap.servers <kafka brokers> "
      + "--zookeeper.connect <zk quorum> "
      + "--group.id <some id> [--prefix <prefix>]")
    return
  }
 
  val env = StreamExecutionEnvironment.getExecutionEnvironment
 
  val kafkaConsumer = new FlinkKafkaConsumer010[String](
    params.getRequired("input-topic"),
    new SimpleStringSchema(),
    params.getProperties
  )
 
  val dataStream = env
    .addSource(kafkaConsumer)
    .filter(!_.isEmpty)
 
  val esClusterName = params.getRequired("es-cluster-name")
  val esAddresses = params.getRequired("es-transport-addresses")
  val esPort = params.getInt("es-port", 9300)
  val transportAddresses = new java.util.ArrayList[InetSocketAddress]
 
  val config = new java.util.HashMap[String, String]
  config.put("cluster.name", esClusterName)
  // This instructs the sink to emit after every element, otherwise they would be buffered
  config.put("bulk.flush.max.actions", "100")
 
  esAddresses.split(",").foreach(address => {
    transportAddresses.add(new InetSocketAddress(InetAddress.getByName(address), esPort))
  })
  val esIndex = params.getRequired("es-index")
  val esType = params.getRequired("es-type")
  val sink = new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
 
    def createIndexRequest(element: String): IndexRequest = {
      return Requests.indexRequest()
        .index(esIndex)
        .`type`(esType)
        .source(element)
    }
 
    override def process(t: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
      requestIndexer.add(createIndexRequest(t))
    }
  })
  dataStream.addSink(sink)
 
  val jobName = getClass.getSimpleName
  env.execute(jobName)
}           

上面有關資料索引到Elasticsearch的進行中, 最核心的就是建立一個ElasticsearchSink,然後通過DataStream的API調用addSink()添加一個Sink,實際是一個SinkFunction的實作,可以參考Flink對應DataStream類的addSink()方法代碼,如下所示:

def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T] =
  stream.addSink(sinkFunction)           

基于Flink DataSet API實作

目前,Flink還沒有在Batch處理模式下實作對應Elasticsearch對應的Connector,需要自己根據需要實作,是以我們基于Flink已經存在的Streaming處理模式下已經實作的Elasticsearch Connector對應的代碼,經過部分修改,可以直接拿來在Batch處理模式下,将資料記錄批量索引到Elasticsearch中

我們基于Flink 1.6.1版本,以及Elasticsearch 6.3.2版本,并且使用Elasticsearch推薦的High Level REST API來實作(為了複用Flink 1.6.1中對應的Streaming處理模式下的Elasticsearch 6 Connector實作代碼,我們選擇使用該REST Client),需要在Maven的POM檔案中添加如下依賴:

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch</artifactId>
  <version>6.3.2</version>
</dependency>
<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>elasticsearch-rest-high-level-client</artifactId>
  <version>6.3.2</version>
</dependency>           

我們實作的各個類的類圖及其關系,如下圖所示:

使用Flink實作索引資料到Elasticsearch

如果熟悉Flink Streaming處理模式下Elasticsearch對應的Connector實作,可以看到上面的很多類都在org.apache.flink.streaming.connectors.elasticsearch包裡面存在,其中包括批量向Elasticsearch中索引資料(内部實作了使用BulkProcessor)。上圖中引入的ElasticsearchApiCallBridge,目的是能夠實作對Elasticsearch不同版本的支援,隻需要根據Elasticsearch不同版本中不同Client實作,進行一些适配,上層抽象保持不變。

如果需要在Batch處理模式下批量索引資料到Elasticsearch,可以直接使用ElasticsearchOutputFormat即可實作。但是建立ElasticsearchOutputFormat,需要幾個參數:

private ElasticsearchOutputFormat(
    Map<String, String> bulkRequestsConfig,
    List<HttpHost> httpHosts,
    ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
    DocWriteRequestFailureHandler failureHandler,
    RestClientFactory restClientFactory) {
  super(new Elasticsearch6ApiCallBridge(httpHosts, restClientFactory),  bulkRequestsConfig, elasticsearchSinkFunction, failureHandler);
}           

當然,我們可以通過代碼中提供的Builder來非常友善的建立一個ElasticsearchOutputFormat。下面,我們看下我們Flink Batch Job實作邏輯。

實作ElasticsearchSinkFunction

我們需要實作ElasticsearchSinkFunction接口,實作一個能夠索引資料到Elasticsearch中的功能,代碼如下所示:

final ElasticsearchSinkFunction<String> elasticsearchSinkFunction = new ElasticsearchSinkFunction<String>() {
 
   @Override
   public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
     indexer.add(createIndexRequest(element, parameterTool));
   }
 
   private IndexRequest createIndexRequest(String element, ParameterTool parameterTool) {
     LOG.info("Create index req: " + element);
     JSONObject o = JSONObject.parseObject(element);
     return Requests.indexRequest()
             .index(parameterTool.getRequired("es-index"))
             .type(parameterTool.getRequired("es-type"))
             .source(o);
   }           

上面代碼,主要是把一個将要輸出的資料記錄,通過RequestIndexer來實作索引到Elasticsearch中。

讀取Elasticsearch配置參數

配置連接配接Elasticsearch的參數。從程式輸入的ParameterTool中讀取Elasticsearch相關的配置:

使用Flink實作索引資料到Elasticsearch

建立ElasticsearchOutputFormat

建立一個我們實作的ElasticsearchOutputFormat,代碼片段如下所示:

使用Flink實作索引資料到Elasticsearch

上面很多配置項指定了向Elasticsearch中進行批量寫入的行為,在ElasticsearchOutputFormat内部會進行設定并建立Elasticsearch6BulkProcessorIndexer,優化索引資料處理的性能。

實作Batch Job主要制流程

最後我們就可以建構我們的Flink Batch應用程式了,代碼如下所示:

使用Flink實作索引資料到Elasticsearch

我們輸入的HDFS檔案中,是一些已經加工好的JSON格式記錄行,這裡為了簡單,直接将原始JSON字元串索引到Elasticsearch中,而沒有進行更多其他的處理操作。

參考連結:

https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/elasticsearch.html https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/#data-sinks