
[Spark][Streaming] An Example of Spark Reading Network Input

An example of Spark Streaming reading input over the network:

The experiment follows the examples in these two references:

https://stackoverflow.com/questions/46739081/how-to-get-record-in-string-format-from-sockettextstream

http://www.cnblogs.com/FG123/p/5324743.html

I found that after starting nc -lk 9999 first and then running the Spark program, stopping nc causes the Spark program to report an error similar to the following:


-------------------------------------------
Time: 2017-10-28 19:32:02
-------------------------------------------

17/10/28 19:32:23 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:9999 - java.net.ConnectException: Connection refused
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at java.net.Socket.connect(Socket.java:538)
	at java.net.Socket.<init>(Socket.java:434)
	at java.net.Socket.<init>(Socket.java:211)
	at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73)
	at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59)
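The Connection refused above is the operating system rejecting the receiver's reconnect attempt once nothing is listening on port 9999 anymore; Spark's socket receiver then just keeps retrying with the 2000ms delay. The refusal itself can be reproduced with a plain socket, no Spark involved. A minimal sketch (the port is picked dynamically instead of hard-coding 9999, so it works even if something happens to be listening there):

```python
import socket

# Find a port with no listener: bind to an ephemeral port,
# note its number, then close the listener again.
probe = socket.socket()
probe.bind(("localhost", 0))
free_port = probe.getsockname()[1]
probe.close()

# Connecting to a port nobody listens on fails the same way
# the Spark socket receiver does after nc is stopped.
try:
    socket.create_connection(("localhost", free_port), timeout=2)
    refused = False
except OSError:
    refused = True

print("connection refused:", refused)
```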


This error shows that the two sides had indeed established communication, but the expected word-count output never appeared. My guess was that not enough threads were available for the computation (in local mode the socket receiver permanently occupies one thread, so the master must be at least local[2]), so I made the following change:

sc = SparkContext("local[2]", "streamwordcount")

changed to:

sc = SparkContext("local[3]", "streamwordcount")

The complete program:


[training@localhost ab]$ cat test.py
#showing remote messages
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":

    # Create a local SparkContext with 3 execution threads
    sc = SparkContext("local[3]", "streamwordcount")

    # Create a StreamingContext with a batch interval of 2 seconds
    ssc = StreamingContext(sc, 2)

    lines = ssc.socketTextStream("localhost", 9999)

    # Use flatMap and split to break the strings received
    # within each 2-second batch into words
    words = lines.flatMap(lambda line: line.split(" "))

    pairs = words.map(lambda word: (word, 1))
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)
    wordCounts.pprint()

    # Start the Spark Streaming application
    ssc.start()
    ssc.awaitTermination()
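The flatMap / map / reduceByKey chain in test.py is ordinary word counting applied to each 2-second batch. Its logic can be checked without a cluster by mimicking the three steps on a plain Python list (a sketch only; the sample lines here are made up, not taken from the streaming input):

```python
# One "batch" of input lines, as socketTextStream would deliver them
batch = ["aaa bbb ccc", "ddd aaa sss"]

# flatMap: split every line and flatten the results into one word list
words = [w for line in batch for w in line.split(" ")]

# map: pair every word with the count 1
pairs = [(word, 1) for word in words]

# reduceByKey: sum the counts that share the same word
counts = {}
for word, n in pairs:
    counts[word] = counts.get(word, 0) + n

print(counts)  # {'aaa': 2, 'bbb': 1, 'ccc': 1, 'ddd': 1, 'sss': 1}
```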


Run the nc program again:

[training@localhost ~]$ nc -lk 9999

Run the Spark program:

[training@localhost ~]$ spark-submit /home/training/ab/test.py

Type some data into the nc window:


aaa bbb ccc
ddd aaa sss
sss bbb bbb
kkk jjj mmm
ooo kkk jjj
mmm ccc ddd
eee fff sss
rrr nnn ooo
ppp sss zzz
mmm sss ttt
kkk sss ttt
rrr ooo ppp
kkk qqq kkk
lll nnn jjj
rrr ooo sss
kkk aaa ddd
aaa aaa fff
eee sss nnn
ooo ppp qqq
qqq sss eee
sss mmm nnn


After a short while, the Spark program's window shows output like this (batches with no input print only the Time header):


-------------------------------------------
Time: 2017-10-28 19:33:50
Time: 2017-10-28 19:33:52
Time: 2017-10-28 19:33:54
Time: 2017-10-28 19:33:56
Time: 2017-10-28 19:33:58
Time: 2017-10-28 19:34:00
(u'', 1)
(u'mmm', 2)
(u'bbb', 3)
(u'nnn', 1)
(u'ccc', 2)
(u'rrr', 1)
(u'sss', 3)
(u'fff', 1)
(u'aaa', 2)
(u'ooo', 2)
...
Time: 2017-10-28 19:34:02
Time: 2017-10-28 19:34:04
(u'ppp', 1)
(u'sss', 1)
(u'zzz', 1)
Time: 2017-10-28 19:34:06
Time: 2017-10-28 19:34:08
(u'mmm', 1)
(u'ttt', 1)
Time: 2017-10-28 19:34:10
Time: 2017-10-28 19:34:12
(u'kkk', 1)
Time: 2017-10-28 19:34:14
Time: 2017-10-28 19:34:16
(u'ooo', 1)
Time: 2017-10-28 19:34:18
(u'qqq', 1)
(u'kkk', 2)
Time: 2017-10-28 19:34:20
Time: 2017-10-28 19:34:22


This article was reposted from the cnblogs blog 健哥的数据花园 (Jiange's Data Garden); original link: http://www.cnblogs.com/gaojian/p/7749538.html. Please contact the original author before reprinting.