天天看點

Apache Nifi入門篇03--Nifi 基礎案例

再試試其他的案例:

https://mp.weixin.qq.com/s/m2eyFaG_j0S8sgPeNUfdNA

https://mp.weixin.qq.com/s/EJMaFG-f2KadFw7kLHsvlQ

講真,Nifi用起來體驗感覺真不錯~~~  快速找到文檔連結................

Apache Nifi入門篇03--Nifi 基礎案例
Apache Nifi入門篇03--Nifi 基礎案例

對了,備注一下,Nifi的程序名稱,免得下次挂了我們還不知道:

Apache Nifi入門篇03--Nifi 基礎案例

Flink 程式讀取Nifi的資料:

依賴:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-nifi_2.11</artifactId>
  <version>1.7.2</version>
</dependency>
           

Flink的程式代碼:

package aliyun.product.customer_analysis_system

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.nifi.{NiFiDataPacket, NiFiSource}
import org.apache.nifi.remote.client.{SiteToSiteClient, SiteToSiteClientConfig}
import org.apache.flink.streaming.api.scala._
object Nifi2Flink {
  def main(args: Array[String]): Unit = {
    val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
      .url("http://172.10.4.95:9901/nifi")
      .portName("test")
//      .portName("flink2")
      .requestBatchCount(2)
      .buildConfig()

    val nifiSource: NiFiSource = new NiFiSource(clientConfig)
    val  NifiData: DataStream[NiFiDataPacket] = streamExecEnv.addSource(nifiSource)
    NifiData.print()
    NifiData.map(x=> {
      println(x)
      x.getContent.toString
    }).print()
    streamExecEnv.execute()

  }

}

      

輸出到Nifi:

val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment

val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("Data from Flink")
.requestBatchCount(5)
.buildConfig()

val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](clientConfig, new NiFiDataPacketBuilder<T>() {...})

streamExecEnv.addSink(nifiSink)      

下一篇文章講Nifi+Flink