
[Spark][Streaming] An example of Spark reading network input

An example of Spark reading input from the network:

The experiment was carried out with reference to the following URLs:

https://stackoverflow.com/questions/46739081/how-to-get-record-in-string-format-from-sockettextstream

http://www.cnblogs.com/FG123/p/5324743.html

I found that if nc -lk 9999 is started first and the Spark program is started afterwards, then stopping nc makes the Spark program report an error similar to the following:


-------------------------------------------

Time: 2017-10-28 19:32:02

17/10/28 19:32:23 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:9999 - java.net.ConnectException: Connection refused

at java.net.PlainSocketImpl.socketConnect(Native Method)

at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)

at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)

at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)

at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)

at java.net.Socket.connect(Socket.java:589)

at java.net.Socket.connect(Socket.java:538)

at java.net.Socket.<init>(Socket.java:434)

at java.net.Socket.<init>(Socket.java:211)

at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73)

at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59)


Since the error only shows up after nc is stopped, the two sides had clearly established a connection while nc was running (the receiver then simply keeps retrying every 2000 ms). However, the expected word count output never appeared. My guess was that there were not enough local worker threads available for the computation, so I made the following change:

sc = SparkContext("local[2]", "streamwordcount")

was changed to:

sc = SparkContext("local[3]", "streamwordcount")
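The reason this matters: in local mode the socket receiver permanently occupies one worker thread, so the master must be local[n] with n of at least 2 so that at least one thread remains free for batch processing. A minimal sketch of the context setup under that assumption:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# local[3]: one thread is taken by the socket receiver,
# the other two remain available for processing the batches
sc = SparkContext("local[3]", "streamwordcount")
ssc = StreamingContext(sc, 2)  # 2-second batch interval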

The complete program is as follows:


[training@localhost ab]$ cat test.py

# showing remote messages
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":

    sc = SparkContext("local[3]", "streamwordcount")
    # Create the local SparkContext with 3 worker threads

    ssc = StreamingContext(sc, 2)
    # Create the StreamingContext with a batch interval of 2 seconds

    lines = ssc.socketTextStream("localhost", 9999)
    # Read text lines from the TCP socket on localhost:9999

    words = lines.flatMap(lambda line: line.split(" "))
    # Use flatMap and split to break the strings received within each 2-second batch into words

    pairs = words.map(lambda word: (word, 1))
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)

    wordCounts.pprint()

    ssc.start()
    # Start the Spark Streaming application

    ssc.awaitTermination()
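The streaming output on the console is interleaved with a large amount of INFO logging. As an optional tweak (a sketch, assuming a Spark version that provides SparkContext.setLogLevel), one extra line right after the SparkContext is created makes the pprint() output much easier to read:

sc.setLogLevel("ERROR")  # suppress INFO/WARN logs so only the batch output and errors remain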


Run the nc program again:

[training@localhost ~]$ nc -lk 9999
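If nc is not available, a small Python script can play the same role as nc -lk 9999. This is only an illustrative sketch (the file name substitute_nc.py is made up): it listens on port 9999 and writes newline-terminated text, which is the format socketTextStream expects.

# substitute_nc.py -- minimal stand-in for "nc -lk 9999" (illustrative sketch)
import socket
import time

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 9999))
server.listen(1)

conn, addr = server.accept()  # wait for the Spark receiver to connect
for line in ["aaa bbb ccc", "ddd aaa sss", "sss bbb bbb"]:
    conn.sendall((line + "\n").encode("utf-8"))  # one newline-terminated line at a time
    time.sleep(1)
conn.close()
server.close()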

Run the Spark program:

[training@localhost ~]$ spark-submit /home/training/ab/test.py

Type some data into the nc window:


aaa bbb ccc

ddd aaa sss

sss bbb bbb

kkk jjj mmm

ooo kkk jjj

mmm ccc ddd

eee fff sss

rrr nnn ooo

ppp sss zzz

mmm sss ttt

kkk sss ttt

rrr ooo ppp

kkk qqq kkk

lll nnn jjj

rrr ooo sss

kkk aaa ddd

aaa aaa fff

eee sss nnn

ooo ppp qqq

qqq sss eee

sss mmm nnn


After a short while, the Spark program's window shows output like the following:


------------------------------------------- 

Time: 2017-10-28 19:33:50

Time: 2017-10-28 19:33:52

Time: 2017-10-28 19:33:54

Time: 2017-10-28 19:33:56

Time: 2017-10-28 19:33:58

Time: 2017-10-28 19:34:00

(u'', 1)

(u'mmm', 2)

(u'bbb', 3)

(u'nnn', 1)

(u'ccc', 2)

(u'rrr', 1)

(u'sss', 3)

(u'fff', 1)

(u'aaa', 2)

(u'ooo', 2)

...

Time: 2017-10-28 19:34:02

Time: 2017-10-28 19:34:04

(u'ppp', 1)

(u'sss', 1)

(u'zzz', 1)

Time: 2017-10-28 19:34:06

Time: 2017-10-28 19:34:08

(u'mmm', 1)

(u'ttt', 1)

Time: 2017-10-28 19:34:10

Time: 2017-10-28 19:34:12

(u'kkk', 1)

Time: 2017-10-28 19:34:14

Time: 2017-10-28 19:34:16

(u'ooo', 1)

Time: 2017-10-28 19:34:18

(u'qqq', 1)

(u'kkk', 2)

Time: 2017-10-28 19:34:20

Time: 2017-10-28 19:34:22
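Note that pprint() shows the counts for one 2-second batch at a time; the totals are not accumulated across batches, which is why the same words keep reappearing with small counts. If counts over a longer sliding window are wanted instead, a hedged sketch (a 10-second window sliding every 2 seconds, both multiples of the batch interval) would be:

windowedCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, None, 10, 2)
windowedCounts.pprint()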



Reposted from the cnblogs blog 健哥的資料花園. Original link: http://www.cnblogs.com/gaojian/p/7749538.html. For reprinting, please contact the original author.