聚焦在 Twitter 上關于Apache Spark的資料, 這些是準備用于機器學習和流式處理應用的資料。 重點是如何通過分布式網絡交換代碼和資料,獲得 串行化, 持久化 , 排程和緩存的實戰經驗 。 認真使用 Spark SQL, 互動性探索結構化和半結構化資料. Spark SQL 的基礎資料結構是 Spark dataframe, Spark dataframe 受到了 Python Pandas dataframe 和R dataframe 的啟發. 這是一個強大的資料結構, 有R 或Python 背景的資料科學家非常容易了解并喜歡上它.
主要關注以下幾點:
• 連接配接 Twitter, 收集有關資料, 然後存儲到各種格式中如 JSON ,CSV 以及 MongoDB這樣的資料存儲
• 使用Blaze and Odo分析資料, 一個Blaze的副産品庫, 能夠在各種源和目标之間建立連接配接并傳輸資料
• 引入 Spark dataframes 作為各個 Spark 子產品交換資料的基礎,同時使用 Spark SQL互動性探索資料
回顧資料密集型應用的架構
首先審視資料密集型應用架構,集中關注內建層以及擷取、提煉和資料持久化疊代循環的基本運作. 這一循環命名為 5C. 5C 代表了connect, collect,correct, compose和consume. 這是內建層運作的基本過程以便于保證從Twitter 擷取資料的品質和數量. 我們也将深入持久化層,建立如 MongoDB這樣的資料存儲友善後面資料的處理.
通過Blaze探索資料, 這是資料操控的一個Python 庫, 通過Spark SQL使用 Spark dataframe, 完成互動性資料發現,感受一下三種 dataframe flavors 的細微差别。
下圖指出了本章的重點, 集中在內建層和持久化層:
資料的序列化和反序列化
由于在通過API擷取資料是的限制,我們需要資料存儲. 資料在分布式叢集上處理,我們需要一緻的方式來儲存狀态以便于将來的提取使用。現在定義序列化, 持久化, 排程和緩存.
序列化一個Python對象是将它轉換一個位元組流. 該Python 對象在程式挂掉的時候能夠通過反序列化提取.序列化後的 Python 對象在網絡上傳輸或者存在持久化存儲中. 反序列化是其逆運算将位元組流轉化為初始的 Python 對象是以程式能夠從儲存的狀态中提取。 Python中最流行的序列化庫是Pickle. 事實上,PySpark指令将pickled 的資料傳輸到多個工作節點.
持久化 将程式的狀态資料儲存到硬碟或記憶體,因而在離開或重新開機時繼續使用。把一個Python 對象從記憶體儲存到檔案或資料庫,在以後加載的時候擁有相同的狀态。
排程 是在多核或者分布式系統中在網絡TCP連接配接上發送 Python 代碼和資料.
緩存 是将一個Python 對象轉化為記憶體中的字元串可以作為字典中的關鍵字. Spark 支援将資料放入叢集範圍的記憶體緩沖區. 這在資料重複通路時非常有用,例如查詢一個小引用的資料集或者象 Google PageRank那樣的疊代算法.
緩存是Spark中非常關鍵的一個概念,允許我們将RDDs 存入記憶體或者溢出到硬碟 . 緩存政策的選擇依賴于資料的線性程度或者RDD轉換的DAG ,這樣可以最小化 shuffle 或跨網絡資料交換.Spark 為了獲得更好的性能,需要注意資料shuffling. 一個好的分區政策和 RDD 緩存, 避免不必要的操作, 可以導緻Spark更好的性能.
擷取和存儲資料
在深入如MongoDB這樣的資料庫存儲之前,先看一下廣泛使用的檔案存儲 : CSV和JSON檔案存儲. 這兩種格式被廣泛使用的主要原因: 可讀性好, 簡單, 輕度關聯, 并容易使用.
在CSV中持久化資料
CSV 是輕量級可讀易用的格式. 擁有已分隔的文本列和内在表格制式。Python提供了強健的csv庫能将 cvs檔案序列化為一個Python的字典. 為了我們的程式友善, 寫了一個 python類來管理CSV格式中 資料的存儲,和從CSV中讀取資料. 看一下 IO_csv 類的代碼. init 部分 執行個體化了檔案路徑,檔案名和檔案字尾(本例中, .csv):
class IO_csv(object):
def __init__(self, filepath, filename, filesuffix='csv'):
self.filepath = filepath # /path/to/file without the /'
at the end
self.filename = filename # FILE_NAME
self.filesuffix = filesuffix
該類的存儲方法使用了tuple 和 csv 檔案的頭字段作為scheme來持久化資料。如果csv檔案存在,則追加資料,否則建立:
def save(self, data, NTname, fields):
# NTname = Name of the NamedTuple
# fields = header of CSV - list of the fields name
NTuple = namedtuple(NTname, fields)
if os.path.isfile('{0}/{1}.{2}'.format(self.filepath, self.
filename, self.filesuffix)):
# Append existing file
with open('{0}/{1}.{2}'.format(self.filepath, self.
filename, self.filesuffix), 'ab') as f:
writer = csv.writer(f)
# writer.writerow(fields) # fields = header of CSV
writer.writerows([row for row in map(NTuple._make,
data)])
# list comprehension using map on the NamedTuple._
make() iterable and the data file to be saved
# Notice writer.writerows and not writer.writerow
(i.e. list of multiple rows sent to csv file
else:
# Create new file
with open('{0}/{1}.{2}'.format(self.filepath, self.
filename, self.filesuffix), 'wb') as f:
writer = csv.writer(f)
writer.writerow(fields) # fields = header of CSV -
list of the fields name
writer.writerows([row for row in map(NTuple._make,
data)])
# list comprehension using map on the NamedTuple._make() iterable and the data file to be saved
# Notice writer.writerows and not writer.writerow
(i.e. list of multiple rows sent to csv file
該類的加載方法使用了tuple 和 csv 檔案的頭字段使用一緻的schema來提取資料。 加載方法使用生成器來提高記憶體的有效性,使用yield 傳回:
def load(self, NTname, fields):
# NTname = Name of the NamedTuple
# fields = header of CSV - list of the fields name
NTuple = namedtuple(NTname, fields)
with open('{0}/{1}.{2}'.format(self.filepath,self.filename,self.filesuffix),'rU') as f:
reader = csv.reader(f)
for row in map(NTuple._make, reader):
# Using map on the NamedTuple._make() iterable and the reader file to be loaded
yield row
我們使用tuple解析tweet儲存到csv或者從csv中提取資料:
fields01 = ['id', 'created_at', 'user_id', 'user_name', 'tweet_text','url']
Tweet01 = namedtuple('Tweet01',fields01)
def parse_tweet(data):
"""
Parse a ``tweet`` from the given response data.
"""
return Tweet01(
id=data.get('id', None),
created_at=data.get('created_at', None),
user_id=data.get('user_id', None),
user_name=data.get('user_name', None),
tweet_text=data.get('tweet_text', None),
url=data.get('url')
)
在 JSON中持久化
JSON 是網際網路應用中使用最廣泛的資料格式之一. 所有我們使用的API,Twitter, GitHub, 和Meetup, 都通過JSON格式發送資料. JSON 格式比 XML格式要輕,可讀性好,在JSON 中内嵌模式. 對于CSV 格式, 所有記錄遵從相同的表結構,而JSON 的結構能夠變化,是半結構化的,一條JSON 記錄能夠映射成Python中的字典。 看一下 IO_json類的代碼. init 部分例化了檔案路徑,檔案名和檔案字尾(本例中,.json):
class IO_json(object):
def __init__(self, filepath, filename, filesuffix='json'):
self.filepath = filepath # /path/to/file without the /'
at the end
self.filename = filename # FILE_NAME
self.filesuffix = filesuffix
# self.file_io = os.path.join(dir_name, .'.join((base_
filename, filename_suffix)))
該類的save方法使用utf-8編碼來保證資料讀寫的相容性。 如果JSON存在, 則追加資料否則建立:
def save(self, data):
if os.path.isfile('{0}/{1}.{2}'.format(self.filepath, self.
filename, self.filesuffix)):
# Append existing file
with io.open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'a', encoding='utf-8') as f:
f.write(unicode(json.dumps(data, ensure_ascii=
False))) # In python 3, there is no "unicode" function
# f.write(json.dumps(data, ensure_ascii= False)) #
create a \" escape char for " in the saved file
else:
# Create new file
with io.open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'w', encoding='utf-8') as f:
f.write(unicode(json.dumps(data, ensure_ascii=
False)))
# f.write(json.dumps(data, ensure_ascii= False))
這個類的load 方法傳回了讀取的檔案 , 擷取json資料需要調用 json.loads函數:
def load(self):
with io.open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), encoding='utf-8') as f:
return f.read()
搭建MongoDB
鑒于存儲所收集資訊的重要性,搭建MongoDB 作為我們的文檔存儲資料庫 . 所有采集的資訊是 JSON 格式, MongoDB 以 BSON (short for Binary JSON)格式資訊, 是以是一個自然的選擇.
現在完成下列步驟:
• 安裝MongoDB 伺服器和用戶端
• 運作MongoDB server
• 運作 Mongo client
• 安裝PyMongo driver
• 建立 Python Mongo client
安裝MongoDB伺服器和用戶端
執行如下步驟安裝 MongoDB 包:
1. 使用包管理工具導入公鑰(in our case, Ubuntu’s apt),指令如下:
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv
7F0CEB10
- 建立 MongoDB 的檔案清單,指令如下. :
echo "deb http://repo.mongodb.org/apt/ubuntu "$("lsb_release
-sc)"/ mongodb-org/3.0 multiverse" | sudo tee /etc/apt/sources.
list.d/mongodb-org-3.0.list
3.更新本地包的資料庫:
sudo apt-get update
4.安裝MongoDB 的最新穩定版:
sudo apt-get install -y mongodb-org
運作MongoDB伺服器
啟動MongoDB server:
1. 啟動MongoDB server, 指令如下:
sudo service mongodb start
- 檢查mongod 是否正常啟動:
an@an-VB:/usr/bin$ ps -ef | grep mongo
mongodb
967 1 4 07:03 ? 00:02:02 /usr/bin/mongod
--config /etc/mongod.conf
an
3143 3085 0 07:45 pts/3 00:00:00 grep --color=auto
mongo
In
在本例中,mongodb 運作在967程序.
3. The mongod server 監聽預設端口27017 可以在配置檔案中修改.
4. 檢查/var/log/mongod/mongod.log 日志檔案的内容:
an@an-VB:/var/lib/mongodb$ ls -lru
total 81936
drwxr-xr-x 2 mongodb nogroup 4096 Apr 25 11:19 _tmp
-rw-r--r-- 1 mongodb nogroup 69 Apr 25 11:19 storage.bson
-rwxr-xr-x 1 mongodb nogroup 5 Apr 25 11:19 mongod.lock
-rw------- 1 mongodb nogroup 16777216 Apr 25 11:19 local.ns
-rw------- 1 mongodb nogroup 67108864 Apr 25 11:19 local.0
drwxr-xr-x 2 mongodb nogroup 4096 Apr 25 11:19 journal
5. 停止mongodb 的伺服器, 指令如下:
sudo service mongodb stop
運作Mongo用戶端
在控制台運作Mongo client 很簡單,指令如下:
an@an-VB:/usr/bin$ mongo
MongoDB shell version: 3.0.2
connecting to: test
Server has startup warnings:
2015-05-30T07:03:49.387+0200 I CONTROL [initandlisten]
2015-05-30T07:03:49.388+0200 I CONTROL [initandlisten]
在mongo client console 提示下, 檢視資料庫的指令如下:
> show dbs
local 0.078GB
test 0.078GB
選擇 test資料庫:
> use test
switched to db test
在資料庫中顯示 collections:
> show collections
restaurants
system.indexes
我們檢視 restaurant collection 中的紀錄:
> db.restaurants.find()
{ "_id" : ObjectId("553b70055e82e7b824ae0e6f"), "address : { "building
: "1007", "coord" : [ -73.856077, 40.848447 ], "street : "Morris Park
Ave", "zipcode : "10462 }, "borough : "Bronx", "cuisine : "Bakery",
"grades : [ { "grade : "A", "score" : 2, "date" : ISODate("2014-
03-03T00:00:00Z") }, { "date" : ISODate("2013-09-11T00:00:00Z"),
"grade : "A", "score" : 6 }, { "score" : 10, "date" : ISODate("2013-
01-24T00:00:00Z"), "grade : "A }, { "date" : ISODate("2011-11-
23T00:00:00Z"), "grade : "A", "score" : 9 }, { "date" : ISODate("2011-
03-10T00:00:00Z"), "grade : "B", "score" : 14 } ], "name : "Morris
Park Bake Shop", "restaurant_id : "30075445" }
安裝PyMongo driver
在anaconda 中安裝mongodb的Python驅動也很簡單:
conda install pymongo
建立 MongoDB的Python client
我們建立一個 IO_mongo 類用來收集資料 存儲采集的資料 提取儲存的資料. 為了建立mongo client, 需要import pymongo. 連接配接本地端口 27017指令如下:
from pymongo import MongoClient as MCli
class IO_mongo(object):
conn={'host':'localhost', 'ip':'27017'}
我們的類初始化了用戶端連接配接, 資料庫 (本例中, twtr_db),和被通路連接配接的collection (本例中, twtr_coll):
def __init__(self, db='twtr_db', coll='twtr_coll', **conn ):
# Connects to the MongoDB server
self.client = MCli(**conn)
self.db = self.client[db]
self.coll = self.db[coll]
save方法插入新的紀錄:
def save(self, data):
#Insert to collection in db
return self.coll.insert(data)
load 方法根據規則提取資料. 在資料量大的情況下 傳回遊标:
def load(self, return_cursor=False, criteria=None, projection=None):
if criteria is None:
criteria = {}
if projection is None:
cursor = self.coll.find(criteria)
else:
cursor = self.coll.find(criteria, projection)
# Return a cursor for large amounts of data
if return_cursor:
return cursor
else:
return [ item for item in cursor ]
從Twitter汲取資料
每個社交網絡都有自己的限制和挑戰, 一個主要的障礙就是強加的流量限制. 在長連接配接或重複執行時要有暫停, 必須要避免重複資料.我們重新設計了連接配接程式來關注流量限制。
TwitterAPI 類根據查詢條件來搜尋和采集,我們已經添加了如下操作:
•日志能力,使用 Python logging 庫在程式失敗時紀錄錯誤和警告
• 使用MongoDB 的持久化能力,象使用 IO_json 操作JSON 檔案那樣操作 IO_mongo 類
• API 流量限制和錯誤管理能力 , 保證我們彈性調用 Twitter 而不會被認為是惡意攻擊
步驟如下:
1. 通過證書初始化Twitter API 的執行個體:
class TwitterAPI(object):
"""
TwitterAPI class allows the Connection to Twitter via OAuth
once you have registered with Twitter and receive the
necessary credentials
"""
def __init__(self):
consumer_key = 'get_your_credentials'
consumer_secret = get your_credentials'
access_token = 'get_your_credentials'
access_secret = 'get your_credentials'
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
self.access_token = access_token
self.access_secret = access_secret
self.retries = 3
self.auth = twitter.oauth.OAuth(access_token, access_
secret, consumer_key, consumer_secret)
self.api = twitter.Twitter(auth=self.auth)
2 設定日志等級,初始化 logger:
° logger.debug(debug message)
° logger.info(info message)
° logger.warn(warn message)
° logger.error(error message)
° logger.critical(critical message)
3設定日志路徑和内容格式:
# logger initialisation
appName = 'twt150530'
self.logger = logging.getLogger(appName)
#self.logger.setLevel(logging.DEBUG)
# create console handler and set level to debug
logPath = '/home/an/spark/spark-1.3.0-bin-hadoop2.4/examples/AN_Spark/data'
fileName = appName
fileHandler = logging.FileHandler("{0}/{1}.log".
format(logPath, fileName))
formatter = logging.Formatter('%(asctime)s - %(name)s -
%(levelname)s - %(message)s')
fileHandler.setFormatter(formatter)
self.logger.addHandler(fileHandler)
self.logger.setLevel(logging.DEBUG)
4.初始化JSON檔案的持久化指令:
# Save to JSON file initialisation
jsonFpath = '/home/an/spark/spark-1.3.0-bin-hadoop2.4/
examples/AN_Spark/data'
jsonFname = 'twtr15053001'
self.jsonSaver = IO_json(jsonFpath, jsonFname)
5.初始化 MongoDB database 和 collection :
# Save to MongoDB Intitialisation
self.mongoSaver = IO_mongo(db='twtr01_db', coll='twtr01_
coll')
6.searchTwitter 方法 根據指定的查詢條件搜尋:
def searchTwitter(self, q, max_res=10,**kwargs):
search_results = self.api.search.tweets(q=q, count=10,
**kwargs)
statuses = search_results['statuses']
max_results = min(1000, max_res)
for _ in range(10):
try:
next_results = search_results['search_metadata']
['next_results']
# self.logger.info('info' in searchTwitter - next_
results:%s'% next_results[1:])
except KeyError as e:
self.logger.error('error' in searchTwitter: %s',%(e))
break
# next_results = urlparse.parse_qsl(next_results[1:])
# python 2.7
next_results = urllib.parse.parse_qsl(next_results[1:])
# self.logger.info('info' in searchTwitter - next_
results[max_id]:', next_results[0:])
kwargs = dict(next_results)
# self.logger.info('info' in searchTwitter - next_
results[max_id]:%s'% kwargs['max_id'])
search_results = self.api.search.tweets(**kwargs)
statuses += search_results['statuses']
self.saveTweets(search_results['statuses'])
if len(statuses) > max_results:
self.logger.info('info' in searchTwitter - got %i tweets - max: %i' %(len(statuses), max_results))
break
return statuses
7.saveTweets 方法将所選的tweets 儲存為JSON 存入MongoDB:
def saveTweets(self, statuses):
# Saving to JSON File
self.jsonSaver.save(statuses)
# Saving to MongoDB
for s in statuses:
self.mongoSaver.save(s)
8.parseTweets 方法從Twitter API 提供的大量資訊中提取關鍵的 tweet 資訊:
def parseTweets(self, statuses):
return [ (status['id'],
status['created_at'],
status['user']['id'],
status['user']['name']
status['text''text'],
url['expanded_url'])
for status in statuses
for url in status['entities']['urls']
]
9.getTweets 方法調用searchTwitter,保證API 調用的穩定性并重點關注速率限制。代碼如下:
def getTweets(self, q, max_res=10):
"""
Make a Twitter API call whilst managing rate limit and errors.
"""
def handleError(e, wait_period=2, sleep_when_rate_
limited=True):
if wait_period > 3600: # Seconds
self.logger.error('Too many retries in getTweets:
%s', %(e))
raise e
if e.e.code == 401:
self.logger.error('error 401 * Not Authorised * in
getTweets: %s', %(e))
return None
elif e.e.code == 404:
self.logger.error('error 404 * Not Found * in
getTweets: %s', %(e))
return None
elif e.e.code == 429:
self.logger.error('error 429 * API Rate Limit
Exceeded * in getTweets: %s', %(e))
if sleep_when_rate_limited:
self.logger.error('error 429 * Retrying in 15
minutes * in getTweets: %s', %(e))
sys.stderr.flush()
time.sleep(60*15 + 5)
self.logger.info('error 429 * Retrying now *
in getTweets: %s', %(e))
return 2
else:
raise e # Caller must handle the rate limiting issue
elif e.e.code in (500, 502, 503, 504):
self.logger.info('Encountered %i Error. Retrying
in %i seconds' % (e.e.code, wait_period))
time.sleep(wait_period)
wait_period *= 1.5
return wait_period
else:
self.logger.error('Exit - aborting - %s', %(e))
raise e
10.根據指定的參數查詢調用searchTwitter API . 如果遇到了任何錯誤, 由handleError 方法處理:
while True:
try:
self.searchTwitter( q, max_res=10)
except twitter.api.TwitterHTTPError as e:
error_count = 0
wait_period = handleError(e, wait_period)
if wait_period is None:
return
使用Blaze探索資料
Blaze是個由Continuum.io,開發的 Python庫 ,利用了 Python Numpy arrays 和 Pandas dataframe. Blaze 擴充到多核計算, 而Pandas 和 Numpy 是單核的.
Blaze 為各種後端提供了統一适配的一緻性使用者接口. Blaze 精心安排了:
• Data: 不同資料存儲的無縫交換如 CSV, JSON, HDF5, HDFS, 和 Bcolz 檔案
• Computation: 對不同的後端采用同樣的查詢方式如 Spark, MongoDB, Pandas, or SQL Alchemy.
• Symbolic expressions: 在一定範圍内使用了與Pandas類似的文法來抽象表達 join, group-by, filter, selection, 和注入,參考R語言實作了 split-apply-combine 方法. Blaze 表達式 和Spark RDD 資料轉換一緻,采用延遲計算.
深入 Blaze首先要引入所需的庫: numpy, pandas, blaze 和 odo. Odo 是 Blaze的一個派生品保證了各種資料後端的資料移植,指令如下:
import numpy as np
import pandas as pd
from blaze import Data, by, join, merge
from odo import odo
BokehJS successfully loaded.
讀取存儲在CSV檔案中解析過的tweets 生成Pandas Dataframe:
twts_csv:
twts_pd_df = pd.DataFrame(twts_csv_read, columns=Tweet01._fields)
twts_pd_df.head()
Out[65]:
id created_at user_id user_name tweet_text url
1 598831111406510082 2015-05-14 12:43:57 14755521
raulsaeztapia RT @pacoid: Great recap of @StrataConf EU in L...
http://www.mango-solutions.com/wp/2015/05/the-...
2 598831111406510082 2015-05-14 12:43:57 14755521
raulsaeztapia RT @pacoid: Great recap of @StrataConf EU in L...
http://www.mango-solutions.com/wp/2015/05/the-...
3 98808944719593472 2015-05-14 11:15:52 14755521
raulsaeztapia RT @alvaroagea: Simply @ApacheSpark http://t.c...
http://www.webex.com/ciscospark/
4 598808944719593472 2015-05-14 11:15:52 14755521
raulsaeztapia RT @alvaroagea: Simply @ApacheSpark http://t.c...
http://sparkjava.com/
運作Tweets Panda Dataframe 的 describe() 函數 獲得資料集中的信心:
twts_pd_df.describe()
Out[66]:
id created_at user_id user_name tweet_text url
count 19 19 19 19 19 19
unique 7 7 6 6 6 7
top 598808944719593472 2015-05-14 11:15:52 14755521
raulsaeztapia RT @alvaroagea: Simply @ApacheSpark http://t.c...
http://bit.ly/1Hfd0Xm
freq 6 6 9 9 6 6
簡單的調用Data() 函數将Pandas dataframe 轉化為一個 Blaze dataframe:
#
# Blaze dataframe
#
twts_bz_df = Data(twts_pd_df)
通過傳遞schema 函數提取一個 Blaze dataframe 的schema 表達:
twts_bz_df.schema
Out[73]:
dshape("""{
id: ?string,
created_at: ?string,
user_id: ?string,
user_name: ?string,
tweet_text: ?string,
url: ?string
}""")
.dshape 函數給出一條記錄和schema:
twts_bz_df.dshape
Out[74]:
dshape("""19 * {
id: ?string,
created_at: ?string,
user_id: ?string,
user_name: ?string,
tweet_text: ?string,
url: ?string
}""")
列印Blaze dataframe 的内容:
twts_bz_df.data
Out[75]:
id created_at user_id user_name tweet_text url
1 598831111406510082 2015-05-14 12:43:57 14755521
raulsaeztapia RT @pacoid: Great recap of @StrataConf EU in L...
http://www.mango-solutions.com/wp/2015/05/the-...
2 598831111406510082 2015-05-14 12:43:57 14755521
raulsaeztapia RT @pacoid: Great recap of @StrataConf EU in L...
http://www.mango-solutions.com/wp/2015/05/the-...
...
18 598782970082807808 2015-05-14 09:32:39 1377652806
embeddedcomputer.nl RT @BigDataTechCon: Moving Rating
Prediction w... http://buff.ly/1QBpk8J
19 598777933730160640 2015-05-14 09:12:38 294862170 Ellen
Friedman I'm still on Euro time. If you are too check o...
http://bit.ly/1Hfd0Xm
提取 tweet_text 字段,獲得唯一的值:
twts_bz_df.tweet_text.distinct()
Out[76]:
tweet_text
0 RT @pacoid: Great recap of @StrataConf EU in L...
1 RT @alvaroagea: Simply @ApacheSpark http://t.c...
2 RT @PrabhaGana: What exactly is @ApacheSpark a...
3 RT @Ellen_Friedman: I'm still on Euro time. If...
4 RT @BigDataTechCon: Moving Rating Prediction w...
5 I'm still on Euro time. If you are too check o...
從dataframe 中提取了多個字段 [‘id’, ‘user_name’,’tweet_text’] 并計算唯一的記錄:
twts_bz_df[['id', 'user_name','tweet_text']].distinct()
Out[78]:
id user_name tweet_text
0 598831111406510082 raulsaeztapia RT @pacoid: Great recap of @
StrataConf EU in L...
1 598808944719593472 raulsaeztapia RT @alvaroagea: Simply @
ApacheSpark http://t.c...
2 598796205091500032 John Humphreys RT @PrabhaGana: What exactly
is @ApacheSpark a...
3 598788561127735296 Leonardo D'Ambrosi RT @Ellen_Friedman: I'm
still on Euro time. If...
4 598785545557438464 Alexey Kosenkov RT @Ellen_Friedman: I'm
still on Euro time. If...
5 598782970082807808 embeddedcomputer.nl RT @BigDataTechCon:
Moving Rating Prediction w...
6 598777933730160640 Ellen Friedman I'm still on Euro time. If
you are too check o...
使用 Odo傳輸資料
Odo 是Blaze的一個衍生項目. 用于資料交換,保證了各種不同格式資料間的移植 (CSV, JSON, HDFS, and more) 并且跨越不同的資料庫 (SQL 資料庫, MongoDB, 等等) ,用法簡單,Odo(source, target) 為了 傳輸到一個資料庫,需要指定URL位址. 例如, MongoDB , 用法如下:
mongodb://username:password@hostname:port/database_name::collection_name
使用Odo 運作一些例子,這裡通過讀取CSV檔案并建立一個 Blaze dataframe來展示Odo的用法:
filepath = csvFpath
filename = csvFname
filesuffix = csvSuffix
twts_odo_df = Data('{0}/{1}.{2}'.format(filepath, filename,
filesuffix))
計算 dataframe中的記錄個數:
twts_odo_df.count()
Out[81]:
19
顯示dataframe中最初的5條記錄:
twts_odo_df.head(5)
Out[82]:
id created_at user_id user_name tweet_text url
0 598831111406510082 2015-05-14 12:43:57 14755521
raulsaeztapia RT @pacoid: Great recap of @StrataConf EU in L...
http://www.mango-solutions.com/wp/2015/05/the-...
2 598808944719593472 2015-05-14 11:15:52 14755521
raulsaeztapia RT @alvaroagea: Simply @ApacheSpark http://t.c... http://www.webex.com/ciscospark/
3 598808944719593472 2015-05-14 11:15:52 14755521
raulsaeztapia RT @alvaroagea: Simply @ApacheSpark http://t.c...
http://sparkjava.com/
4 598808944719593472 2015-05-14 11:15:52 14755521
raulsaeztapia RT @alvaroagea: Simply @ApacheSpark http://t.c...
https://www.sparkfun.com/
從dataframe 中獲得 dshape 的資訊 , 這裡得到記錄的個數和 schema:
twts_odo_df.dshape
Out[83]:
dshape("var * {
id: int64,
created_at: ?datetime,
user_id: int64,
user_name: ?string,
tweet_text: ?string,
url: ?string
}""")
将處理過的 Blaze dataframe 存入 JSON:
odo(twts_odo_distinct_df, '{0}/{1}.{2}'.format(jsonFpath, jsonFname,
jsonSuffix))
Out[92]:
<odo.backends.json.JSONLines at 0x7f77f0abfc50>
轉換JSON 檔案為 CSV 檔案:
odo('{0}/{1}.{2}'.format(jsonFpath, jsonFname, jsonSuffix), '{0}/{1}.{2}'.format(csvFpath, csvFname, csvSuffix))
Out[94]:
<odo.backends.csv.CSV at 0x7f77f0abfe10>
使用Spark SQL探索資料
Spark SQL 是建立在Spark 核心之上的關系型查詢引擎. Spark SQL 使用的查詢優化叫 Catalyst.
關系型查詢使用 SQL 或HiveQL 表達,在 JSON, CSV, 和各種資料庫中查詢. Spark SQL 為 RDD 函數式程式設計之上的Spark dataframes 提供了完整的聲明式表達.
了解 Spark dataframe
從 @bigdata 而來的一個tweet 意味着 Spark SQL和 dataframes都可以使用了,參見圖中下方的各種資料源. 在頂部, R作為一個新的語言在Scala, Java和Python之後将逐漸被支援. 最終, Data Frame 機理遍布在 R, Python, 和 Spark 中.
Spark dataframes 從 SchemaRDDs 中産生. 它結合了 RDD 和可以被Spark 推導 schema, 注冊過的dataframe 才能被請求,允許通過直白的SQL 完成複雜嵌套的JSON 資料查詢,同時支援 延遲計算, lineage,分區,和持久化.
通過 Spark SQL 查詢資料, 首先要導入 SparkContext 和SQLContex:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
In [95]:
sc
Out[95]:
<pyspark.context.SparkContext at 0x7f7829581890>
In [96]:
sc.master
Out[96]:
u'local[*]'
''In [98]:
# Instantiate Spark SQL context
sqlc = SQLContext(sc)
讀取存儲在 Odo中的JSON檔案:
twts_sql_df_01 = sqlc.jsonFile ("/home/an/spark/spark-1.3.0-bin-
hadoop2.4/examples/AN_Spark/data/twtr15051401_distinct.json")
In [101]:
twts_sql_df_01.show()
created_at id tweet_text user_id
user_name
2015-05-14T12:43:57Z 598831111406510082 RT @pacoid: Great... 14755521
raulsaeztapia
2015-05-14T11:15:52Z 598808944719593472 RT @alvaroagea: S... 14755521
raulsaeztapia
2015-05-14T10:25:15Z 598796205091500032 RT @PrabhaGana: W... 48695135
John Humphreys
2015-05-14T09:54:52Z 598788561127735296 RT @Ellen_Friedma...
2385931712 Leonardo D'Ambrosi
2015-05-14T09:42:53Z 598785545557438464 RT @Ellen_Friedma... 461020977
Alexey Kosenkov
2015-05-14T09:32:39Z 598782970082807808 RT @BigDataTechCo...
1377652806 embeddedcomputer.nl
2015-05-14T09:12:38Z 598777933730160640 I'm still on Euro... 294862170
Ellen Friedman
列印 Spark dataframe 的schema:
twts_sql_df_01.printSchema()
root
|-- created_at: string (nullable = true)
|-- id: long (nullable = true)
|-- tweet_text: string (nullable = true)
|-- user_id: long (nullable = true)
|-- user_name: string (nullable = true)
從dataframe中選擇 user_name 字段:
twts_sql_df_01.select('user_name').show()
user_name
raulsaeztapia
raulsaeztapia
John Humphreys
Leonardo D'Ambrosi
Alexey Kosenkov
embeddedcomputer.nl
Ellen Friedman
将 dataframe 注冊成一個表 ,在上執行一個 SQL 查詢:
twts_sql_df_01.registerAsTable('tweets_01')
可以處理更複雜的 JSON; 讀取原始的 Twitter JSON 檔案:
twts_sql_df_01_selection = sqlc.sql("SELECT * FROM tweets_01 WHERE
user_name = 'raulsaeztapia'")
In [109]:
twts_sql_df_01_selection.show()
created_at id tweet_text user_id
user_name
2015-05-14T12:43:57Z 598831111406510082 RT @pacoid: Great... 14755521
raulsaeztapia
2015-05-14T11:15:52Z 598808944719593472 RT @alvaroagea: S... 14755521
raulsaeztapia
Let's process some more complex JSON; we read the original Twitter JSON file:
tweets_sqlc_inf = sqlc.jsonFile(infile)
Spark SQL is able to infer the schema of a complex nested JSON file:
tweets_sqlc_inf.printSchema()
root
|-- contributors: string (nullable = true)
|-- coordinates: string (nullable = true)
|-- created_at: string (nullable = true)
|-- entities: struct (nullable = true)
| |-- hashtags: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- indices: array (nullable = true)
| | | | |-- element: long (containsNull = true)
| | | |-- text: string (nullable = true)
| |-- media: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- display_url: string (nullable = true)
| | | |-- expanded_url: string (nullable = true)
| | | |-- id: long (nullable = true)
| | | |-- id_str: string (nullable = true)
| | | |-- indices: array (nullable = true)
... (snip) ...
| |-- statuses_count: long (nullable = true)
| |-- time_zone: string (nullable = true)
| |-- url: string (nullable = true)
| |-- utc_offset: long (nullable = true)
| |-- verified: boolean (nullable = true)
從dataframe 標明列中讀取感興趣的關鍵資訊 (本例中, [‘created_at’, ‘id’, ‘text’, ‘user.id’, ‘user.name’, ‘entities.urls.expanded_url’]):
tweets_extract_sqlc = tweets_sqlc_inf[['created_at', 'id', 'text',
'user.id', 'user.name', 'entities.urls.expanded_url']].distinct()
In [145]:
tweets_extract_sqlc.show()
created_at id text id
name expanded_url
Thu May 14 09:32:... 598782970082807808 RT @BigDataTechCo...
1377652806 embeddedcomputer.nl ArrayBuffer(http:...
Thu May 14 12:43:... 598831111406510082 RT @pacoid: Great... 14755521
raulsaeztapia ArrayBuffer(http:...
Thu May 14 12:18:... 598824733086523393 @rabbitonweb spea...
...
Thu May 14 12:28:... 598827171168264192 RT @baandrzejczak... 20909005
Paweł Szulc ArrayBuffer()
了解Spark SQL query optimizer
在 dataframe 中執行SQL 語句:
tweets_extract_sqlc_sel = sqlc.sql("SELECT * from Tweets_xtr_001 WHERE
name='raulsaeztapia'")
看一下Spark SQL 執行查詢計劃的細節:
• 解析
• 分析
• 優化
• 實體查詢
查詢計劃使用了 Spark SQL’s Catalyst 優化器. 為了沖查詢部分生成編譯過的位元組碼, Catalyst 優化器在實體計劃評估後根據成本執行解析和優化.
在tweet中的解釋:
回顧一下代碼, 在執行Spark SQL 查詢時調用 .explain 函數, 給出了 Catalyst optimizer之行時的全部細節:
tweets_extract_sqlc_sel.explain(extended = True)
== Parsed Logical Plan ==
'Project [*]
'Filter ('name = raulsaeztapia)'name' 'UnresolvedRelation' [Tweets_
xtr_001], None
== Analyzed Logical Plan ==
Project [created_at#7,id#12L,text#27,id#80L,name#81,expanded_url#82]
Filter (name#81 = raulsaeztapia)
Distinct
Project [created_at#7,id#12L,text#27,user#29.id AS id#80L,user#29.
name AS name#81,entities#8.urls.expanded_url AS expanded_url#82]
Relation[contributors#5,coordinates#6,created_
at#7,entities#8,favorite_count#9L,favorited#10,geo#11,id#12L,id_
str#13,in_reply_to_screen_name#14,in_reply_to_status_id#15,in_reply_
to_status_id_str#16,in_reply_to_user_id#17L,in_reply_to_user_id_str#
18,lang#19,metadata#20,place#21,possibly_sensitive#22,retweet_count#2
3L,retweeted#24,retweeted_status#25,source#26,text#27,truncated#28,us
er#29] JSONRelation(/home/an/spark/spark-1.3.0-bin-hadoop2.4/examples/
AN_Spark/data/twtr15051401.json,1.0,None)
== Optimized Logical Plan ==
Filter (name#81 = raulsaeztapia)
Distinct
Project [created_at#7,id#12L,text#27,user#29.id AS id#80L,user#29.
name AS name#81,entities#8.urls.expanded_url AS expanded_url#82]
Relation[contributors#5,coordinates#6,created_
at#7,entities#8,favorite_count#9L,favorited#10,geo#11,id#12L,id_
str#13,in_reply_to_screen_name#14,in_reply_to_status_id#15,in_reply_
to_status_id_str#16,in_reply_to_user_id#17L,in_reply_to_user_id_str#
18,lang#19,metadata#20,place#21,possibly_sensitive#22,retweet_count#2
3L,retweeted#24,retweeted_status#25,source#26,text#27,truncated#28,us
er#29] JSONRelation(/home/an/spark/spark-1.3.0-bin-hadoop2.4/examples/
AN_Spark/data/twtr15051401.json,1.0,None)
== Physical Plan ==
Filter (name#81 = raulsaeztapia)
Distinct false
Exchange (HashPartitioning [created_at#7,id#12L,text#27,id#80L,name#
81,expanded_url#82], 200)
Distinct true
Project [created_at#7,id#12L,text#27,user#29.id AS id#80L,user#29.
name AS name#81,entities#8.urls.expanded_url AS expanded_url#82]
PhysicalRDD [contributors#5,coordinates#6,created_
at#7,entities#8,favorite_count#9L,favorited#10,geo#11,id#12L,id_str#13,in_reply_to_screen_name#14,in_reply_to_status_id#15,in_reply_to_status_id_str#16,in_reply_to_user_id#17L,in_reply_to_user_id_str#
18,lang#19,metadata#20,place#21,possibly_sensitive#22,retweet_count#2
3L,retweeted#24,retweeted_status#25,source#26,text#27,truncated#28,us
er#29], MapPartitionsRDD[165] at map at JsonRDD.scala:41
Code Generation: false
== RDD ==
最後, 這裡是查詢的結果:
tweets_extract_sqlc_sel.show()
created_at id text id
name expanded_url
Thu May 14 12:43:... 598831111406510082 RT @pacoid: Great... 14755521
raulsaeztapia ArrayBuffer(http:...
Thu May 14 11:15:... 598808944719593472 RT @alvaroagea: S... 14755521
raulsaeztapia ArrayBuffer(http:...
In [148]:
用Spark SQL 加載和處理 CSV files with Spark
使用 Spark 的 spark-csv_2.11:1.2.0 包. 在IPython Notebook 啟動PySpark 需要準确地通過 –packages 指定 spark-csv 的包名:
$ IPYTHON_OPTS='notebook' /home/an/spark/spark-1.5.0-bin-hadoop2.6/bin/
pyspark --packages com.databricks:spark-csv_2.11:1.2.0
這觸發了下面的輸出; 可以看到 spark-csv 包使用的所有依賴:
an@an-VB:~/spark/spark-1.5.0-bin-hadoop2.6/examples/AN_Spark$ IPYTHON_
OPTS='notebook' /home/an/spark/spark-1.5.0-bin-hadoop2.6/bin/pyspark
--packages com.databricks:spark-csv_2.11:1.2.0
... (snip) ...
Ivy Default Cache set to: /home/an/.ivy2/cache
The jars for the packages stored in: /home/an/.ivy2/jars
:: loading settings :: url = jar:file:/home/an/spark/spark-1.5.0-bin-
hadoop2.6/lib/spark-assembly-1.5.0-hadoop2.6.0.jar!/org/apache/ivy/
core/settings/ivysettings.xml
com.databricks#spark-csv_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found com.databricks#spark-csv_2.11;1.2.0 in central
found org.apache.commons#commons-csv;1.1 in central
found com.univocity#univocity-parsers;1.5.1 in central
:: resolution report :: resolve 835ms :: artifacts dl 48ms
:: modules in use:
com.databricks#spark-csv_2.11;1.2.0 from central in [default]
com.univocity#univocity-parsers;1.5.1 from central in [default]
org.apache.commons#commons-csv;1.1 from central in [default]
----------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
----------------------------------------------------------------
| default | 3 | 0 | 0 | 0 || 3 | 0
----------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
0 artifacts copied, 3 already retrieved (0kB/45ms)
We are now ready to load our csv file and process it. Let’s first import the
SQLContext:
#
# Read csv in a Spark DF
#
sqlContext = SQLContext(sc)
spdf_in = sqlContext.read.format('com.databricks.spark.csv')\
.options(delimiter=";").
options(header="true")\
.options(header='true').load(csv_
in)
通路從加載的CSV中建立的 dataframe 的schema:
In [10]:
spdf_in.printSchema()
root
|-- : string (nullable = true)
|-- id: string (nullable = true)
|-- created_at: string (nullable = true)
|-- user_id: string (nullable = true)
|-- user_name: string (nullable = true)
|-- tweet_text: string (nullable = true)
檢查 dataframe的列:
In [12]:
spdf_in.columns
Out[12]:
['', 'id', 'created_at', 'user_id', 'user_name', 'tweet_text']
審視一下 dataframe 的内容:
In [13]:
spdf_in.show()
+---+------------------+--------------------+----------+--------------
----+--------------------+
| | id| created_at| user_id| user_
name| tweet_text|
+---+------------------+--------------------+----------+--------------
----+--------------------+
| 0|638830426971181057|Tue Sep 01 21:46:...|3276255125| True
Equality|ernestsgantt: Bey...|
| 1|638830426727911424|Tue Sep 01 21:46:...|3276255125| True
Equality|ernestsgantt: Bey...|
| 2|638830425402556417|Tue Sep 01 21:46:...|3276255125| True
Equality|ernestsgantt: Bey...|
... (snip) ...
| 41|638830280988426250|Tue Sep 01 21:46:...| 951081582| Jack
Baldwin|RT @cloudaus: We ...|
| 42|638830276626399232|Tue Sep 01 21:46:...| 6525302|Masayoshi
Nakamura|PynamoDB使使使使使使使 |
+---+------------------+--------------------+----------+--------------
----+--------------------+
only showing top 20 rows
通過Spark SQL查詢MangoDB
有兩個方法完成MongoDB 和Spark的互動: 首先是通過 Hadoop MongoDB connector, 第二種的直接通路. 第一種方法需要搭建一個 Hadoop 環境才能從 Hadoop MongoDB connector中完成查詢. onnector 托管在GitHub 上
https://github.com/mongodb/mongo-hadoop/
wiki/Spark-Usage.
MongoDB 發表了一系列的官方部落格描述了真實的使用場景:
• Using MongoDB with Hadoop & Spark: Part 1 - Introduction & Setup (https:// www.mongodb.com/blog/post/using-mongodb-hadoop-spark-part-1- introduction-setup)
• Using MongoDB with Hadoop and Spark: Part 2 - Hive Example (
https://www. mongodb.com/blog/post/using-mongodb-hadoop-spark-part-2-hive- example)
• Using MongoDB with Hadoop & Spark: Part 3 - Spark Example & Key Takeaways (
https://www.mongodb.com/blog/post/using-mongodb-hadoop-spark-part-3-spark-example-key-takeaways)
搭建一個完整的 Hadoop 環境是個力氣活. 使用第二種方法,利用Stratio開發并維護的spark-mongodb ,這是托管在Spark.packages.org.上的 Stratio spark-mongodb 包. 該包的版本和相關資訊可以從 spark.packages.org :
Releases
Version: 0.10.1 ( 8263c8 | zip | jar ) / Date: 2015-11-18 / License:
Apache-2.0 / Scala version: 2.10
(
http://spark-packages.org/package/Stratio/spark-mongodb )
在IPython Notebook 中啟動PySpark, 同時準确指定spark-mongodb 的包名:
$ IPYTHON_OPTS='notebook' /home/an/spark/spark-1.5.0-bin-hadoop2.6/bin/
pyspark --packages com.stratio.datasource:spark-mongodb_2.10:0.10.1
這将觸發下面的輸出; 可以看到 spark-mongodb 包的所有依賴:
an@an-VB:~/spark/spark-1.5.0-bin-hadoop2.6/examples/AN_Spark$ IPYTHON_
OPTS='notebook' /home/an/spark/spark-1.5.0-bin-hadoop2.6/bin/pyspark
--packages com.stratio.datasource:spark-mongodb_2.10:0.10.1
... (snip) ...
Ivy Default Cache set to: /home/an/.ivy2/cache
The jars for the packages stored in: /home/an/.ivy2/jars
:: loading settings :: url = jar:file:/home/an/spark/spark-1.5.0-bin-
hadoop2.6/lib/spark-assembly-1.5.0-hadoop2.6.0.jar!/org/apache/ivy/
core/settings/ivysettings.xml
com.stratio.datasource#spark-mongodb_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found com.stratio.datasource#spark-mongodb_2.10;0.10.1 in central
[W 22:10:50.910 NotebookApp] Timeout waiting for kernel_info reply
from 764081d3-baf9-4978-ad89-7735e6323cb6
found org.mongodb#casbah-commons_2.10;2.8.0 in central
found com.github.nscala-time#nscala-time_2.10;1.0.0 in central
found joda-time#joda-time;2.3 in central
found org.joda#joda-convert;1.2 in central
found org.slf4j#slf4j-api;1.6.0 in central
found org.mongodb#mongo-java-driver;2.13.0 in central
found org.mongodb#casbah-query_2.10;2.8.0 in central
found org.mongodb#casbah-core_2.10;2.8.0 in central
downloading https://repo1.maven.org/maven2/com/stratio/datasource/
park-mongodb_2.10/0.10.1/spark-mongodb_2.10-0.10.1.jar...
[SUCCESSFUL ] com.stratio.datasource#spark-
mongodb_2.10;0.10.1!spark-mongodb_2.10.jar (3130ms)
downloading https://repo1.maven.org/maven2/org/mongodb/casbah-
ommons_2.10/2.8.0/casbah-commons_2.10-2.8.0.jar...
[SUCCESSFUL ] org.mongodb#casbah-commons_2.10;2.8.0!casbah-
commons_2.10.jar (2812ms)
downloading https://repo1.maven.org/maven2/org/mongodb/casbah-
uery_2.10/2.8.0/casbah-query_2.10-2.8.0.jar...
[SUCCESSFUL ] org.mongodb#casbah-query_2.10;2.8.0!casbah-query_2.10.
jar (1432ms)
downloading https://repo1.maven.org/maven2/org/mongodb/casbah-
ore_2.10/2.8.0/casbah-core_2.10-2.8.0.jar...
[SUCCESSFUL ] org.mongodb#casbah-core_2.10;2.8.0!casbah-core_2.10.
jar (2785ms)
downloading https://repo1.maven.org/maven2/com/github/nscala-time/
scala-time_2.10/1.0.0/nscala-time_2.10-1.0.0.jar...
[SUCCESSFUL ] com.github.nscala-time#nscala-time_2.10;1.0.0!nscala-
time_2.10.jar (2725ms)
downloading https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.6.0/
slf4j-api-1.6.0.jar ...
[SUCCESSFUL ] org.slf4j#slf4j-api;1.6.0!slf4j-api.jar (371ms)
downloading https://repo1.maven.org/maven2/org/mongodb/mongo-java-
driver/2.13.0/mongo-java-driver-2.13.0.jar ...
[SUCCESSFUL ] org.mongodb#mongo-java-driver;2.13.0!mongo-java-
driver.jar (5259ms)
downloading https://repo1.maven.org/maven2/joda-time/joda-time/2.3/
joda-time-2.3.jar ...
[SUCCESSFUL ] joda-time#joda-time;2.3!joda-time.jar (6949ms)
downloading https://repo1.maven.org/maven2/org/joda/joda-convert/1.2/
joda-convert-1.2.jar ...
[SUCCESSFUL ] org.joda#joda-convert;1.2!joda-convert.jar (548ms)
:: resolution report :: resolve 11850ms :: artifacts dl 26075ms
:: modules in use:
com.github.nscala-time#nscala-time_2.10;1.0.0 from central in
[default]
com.stratio.datasource#spark-mongodb_2.10;0.10.1 from central in
[default]
joda-time#joda-time;2.3 from central in [default]
org.joda#joda-convert;1.2 from central in [default]
org.mongodb#casbah-commons_2.10;2.8.0 from central in [default]
org.mongodb#casbah-core_2.10;2.8.0 from central in [default]
org.mongodb#casbah-query_2.10;2.8.0 from central in [default]
org.mongodb#mongo-java-driver;2.13.0 from central in [default]
org.slf4j#slf4j-api;1.6.0 from central in [default]
-----------------------------------------------------------------
| | modules || artifacts
|
| conf | number| search|dwnlded|evicted||
number|dwnlded|
-------------------------------------------------------------------
--
| default | 9 | 9 | 9 | 0 || 9 | 9
|
-------------------------------------------------------------------
--
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
9 artifacts copied, 0 already retrieved (2335kB/51ms)
... (snip) ...
查詢MongDB的27017端口,從twtr01_db中的collection twtr01_coll讀取資料。
首先import SQLContext:
In [5]:
from pyspark.sql import SQLContext
sqlContext.sql("CREATE TEMPORARY TABLE tweet_table USING com.stratio.datasource.mongodb OPTIONS (host 'localhost:27017', database 'twtr01_
db', collection 'twtr01_coll')")
sqlContext.sql("SELECT * FROM tweet_table where id=598830778269769728
").collect()
這裡是查詢輸出:
Out[5]:
[Row(text=u'@spark_io is now @particle - awesome news - now I can enjoy my Particle Cores/Photons + @sparkfun sensors + @ApacheSpark analytics :-)', _id=u'55aa640fd770871cba74cb88', contributors=None,
retweeted=False, user=Row(contributors_enabled=False, created_at=u'Mon Aug 25 14:01:26 +0000 2008', default_profile=True, default_profile_image=False, description=u'Building open source tools for and teaching
enterprise software developers', entities=Row(description=Row(urls=[]), url=Row(urls=[Row(url=u'http://t.co/TSHp13EWeu', indices=[0,22],
... (snip) ...
9], name=u'Spark is Particle', screen_name=u'spark_io'),
Row(id=487010011, id_str=u'487010011', indices=[17, 26],
name=u'Particle', screen_name=u'particle'), Row(id=17877351,
id_str=u'17877351', indices=[88, 97], name=u'SparkFun
Electronics', screen_name=u'sparkfun'), Row(id=1551361069, id_
str=u'1551361069', indices=[108, 120], name=u'Apache Spark', screen_name=u'ApacheSpark')]), is_quote_status=None, lang=u'en', quoted_status_id_str=None, quoted_status_id=None, created_at=u'Thu May 14 12:42:37 +0000 2015', retweeted_status=None, truncated=False,
place=None, id=598830778269769728, in_reply_to_user_id=3187046084,
retweet_count=0, in_reply_to_status_id=None, in_reply_to_screen_name=u'spark_io', in_reply_to_user_id_str=u'3187046084', source=u'<a href="http://twitter.com" rel="nofollow">Twitter Web Client</a>',
id_str=u'598830778269769728', coordinates=None, metadata=Row(iso_language_code=u'en', result_type=u'recent'), quoted_status=None)]
#