An example of Spark reading network input:
Refer to the following URLs for this experiment:
https://stackoverflow.com/questions/46739081/how-to-get-record-in-string-format-from-sockettextstream
http://www.cnblogs.com/FG123/p/5324743.html
I found that after first running nc -lk 9999 and then starting the Spark program,
stopping nc makes the Spark program report an error
similar to:

-------------------------------------------
Time: 2017-10-28 19:32:02
17/10/28 19:32:23 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:9999 - java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
at java.net.Socket.<init>(Socket.java:434)
at java.net.Socket.<init>(Socket.java:211)
at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59)
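
The error above just means the receiver can no longer connect once nc exits; Spark keeps restarting the receiver with a 2000 ms delay until the socket becomes available again. As an aside, socketTextStream simply reads newline-delimited UTF-8 text over a plain TCP connection, so if nc is unavailable, a stand-in server can be sketched in a few lines of Python (fake_nc.py, the sample lines, and the 1-second pacing are all arbitrary choices for illustration):

[training@localhost ab]$ cat fake_nc.py
# A minimal stand-in for "nc -lk 9999": accept one connection and
# send newline-terminated UTF-8 lines, which socketTextStream expects
import socket
import time

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 9999))
server.listen(1)
conn, addr = server.accept()   # wait for the Spark receiver to connect
for line in ["aaa bbb ccc", "ddd aaa sss"]:
    conn.sendall((line + "\n").encode("utf-8"))
    time.sleep(1)              # spread the lines across batches
conn.close()
server.close()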

The stack trace above shows that the two sides had already established communication. However, the expected word count output did not appear. My guess was that the number of worker threads available for the computation was insufficient (the socket receiver permanently occupies one of the local[N] threads, so at least one more must remain free to process the received data), so I made the following change:
sc = SparkContext("local[2]", "streamwordcount")
changed to:
sc = SparkContext("local[3]", "streamwordcount")
The complete program is as follows:

[training@localhost ab]$ cat test.py
# showing remote messages
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    # Create the local SparkContext, with 3 worker threads
    sc = SparkContext("local[3]", "streamwordcount")
    # Create the StreamingContext with a 2-second batch interval
    ssc = StreamingContext(sc, 2)
    # Read lines of text from the socket on localhost:9999
    lines = ssc.socketTextStream("localhost", 9999)
    # Use flatMap and split to break the strings received within
    # each 2-second batch into words
    words = lines.flatMap(lambda line: line.split(" "))
    # Map each word to a (word, 1) pair and sum the counts per key
    pairs = words.map(lambda word: (word, 1))
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)
    wordCounts.pprint()
    # Start the Spark Streaming application and block until it terminates
    ssc.start()
    ssc.awaitTermination()
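
As a side note, awaitTermination() blocks until the job is killed from outside. For unattended test runs, a bounded wait can be used instead; this is a minimal sketch using the StreamingContext API, with the 60-second timeout chosen arbitrarily:

ssc.start()
# Wait up to 60 seconds; if the stream is still running, stop it cleanly
if not ssc.awaitTerminationOrTimeout(60):
    ssc.stop(stopSparkContext=True, stopGraceFully=True)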

Run the nc program again:
[training@localhost ~]$ nc -lk 9999
Run the Spark program:
[training@localhost ~]$ spark-submit /home/training/ab/test.py
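
By default, spark-submit interleaves Spark's own INFO/ERROR log lines with the pprint() output. If the logs drown out the counts, one optional tweak (not part of the original program) is to lower the log level right after creating the SparkContext:

sc.setLogLevel("WARN")  # show only warnings and errors from Spark itself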
Enter some data in the nc window:

aaa bbb ccc
ddd aaa sss
sss bbb bbb
kkk jjj mmm
ooo kkk jjj
mmm ccc ddd
eee fff sss
rrr nnn ooo
ppp sss zzz
mmm sss ttt
kkk sss ttt
rrr ooo ppp
kkk qqq kkk
lll nnn jjj
rrr ooo sss
kkk aaa ddd
aaa aaa fff
eee sss nnn
ooo ppp qqq
qqq sss eee
sss mmm nnn

After a short while, output like the following appears in the Spark program's window (batches in which no data arrived print only a timestamp):

-------------------------------------------
Time: 2017-10-28 19:33:50
Time: 2017-10-28 19:33:52
Time: 2017-10-28 19:33:54
Time: 2017-10-28 19:33:56
Time: 2017-10-28 19:33:58
Time: 2017-10-28 19:34:00
(u'', 1)
(u'mmm', 2)
(u'bbb', 3)
(u'nnn', 1)
(u'ccc', 2)
(u'rrr', 1)
(u'sss', 3)
(u'fff', 1)
(u'aaa', 2)
(u'ooo', 2)
...
Time: 2017-10-28 19:34:02
Time: 2017-10-28 19:34:04
(u'ppp', 1)
(u'sss', 1)
(u'zzz', 1)
Time: 2017-10-28 19:34:06
Time: 2017-10-28 19:34:08
(u'mmm', 1)
(u'ttt', 1)
Time: 2017-10-28 19:34:10
Time: 2017-10-28 19:34:12
(u'kkk', 1)
Time: 2017-10-28 19:34:14
Time: 2017-10-28 19:34:16
(u'ooo', 1)
Time: 2017-10-28 19:34:18
(u'qqq', 1)
(u'kkk', 2)
Time: 2017-10-28 19:34:20
Time: 2017-10-28 19:34:22
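
Note that each 2-second batch counts only the words received during that batch, which is why sss keeps reappearing with small counts instead of accumulating. To maintain a running total across batches, Spark Streaming provides updateStateByKey; the following is a minimal sketch that would replace the reduceByKey/pprint lines in test.py (the checkpoint path /tmp/wordcount-checkpoint is an arbitrary example):

# Stateful operations such as updateStateByKey require checkpointing
ssc.checkpoint("/tmp/wordcount-checkpoint")

def update_count(new_values, running_total):
    # new_values: this key's counts in the current batch;
    # running_total: the accumulated count so far (None at first)
    return sum(new_values) + (running_total or 0)

runningCounts = pairs.updateStateByKey(update_count)
runningCounts.pprint()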
