再試試其他的案例:
https://mp.weixin.qq.com/s/m2eyFaG_j0S8sgPeNUfdNA
https://mp.weixin.qq.com/s/EJMaFG-f2KadFw7kLHsvlQ
講真,Nifi用起來體驗感覺真不錯~~~ 快速找到文檔連結................

對了,備注一下,Nifi的程序名稱,免得下次挂了我們還不知道:
Flink 程式讀取Nifi的資料:
依賴:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-nifi_2.11</artifactId>
<version>1.7.2</version>
</dependency>
Flink的程式代碼:
package aliyun.product.customer_analysis_system
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.nifi.{NiFiDataPacket, NiFiSource}
import org.apache.nifi.remote.client.{SiteToSiteClient, SiteToSiteClientConfig}
import org.apache.flink.streaming.api.scala._
object Nifi2Flink {
def main(args: Array[String]): Unit = {
val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
.url("http://172.10.4.95:9901/nifi")
.portName("test")
// .portName("flink2")
.requestBatchCount(2)
.buildConfig()
val nifiSource: NiFiSource = new NiFiSource(clientConfig)
val NifiData: DataStream[NiFiDataPacket] = streamExecEnv.addSource(nifiSource)
NifiData.print()
NifiData.map(x=> {
println(x)
x.getContent.toString
}).print()
streamExecEnv.execute()
}
}
輸出到Nifi:
val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
.portName("Data from Flink")
.requestBatchCount(5)
.buildConfig()
val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](clientConfig, new NiFiDataPacketBuilder<T>() {...})
streamExecEnv.addSink(nifiSink)
下一篇文章講Nifi+Flink