天天看點

python 寫入es_如何使用python将Spark資料寫入ElasticSearch

這裡以将Apache的日志寫入到ElasticSearch為例,來示範一下如何使用Python将Spark資料導入到ES中。

實際工作中,由于資料與使用架構或技術的複雜性,資料的寫入變得比較複雜,在這裡我們簡單示範一下。

如果使用Scala或Java的話,Spark提供自帶了支援寫入ES的支援庫,但Python不支援。是以首先你需要去這裡下載下傳依賴的ES官方開發的依賴包包。

下載下傳完成後,放在本地目錄,以下面指令方式啟動pyspark:pyspark --jars elasticsearch-hadoop-6.4.1.jar

如果你想pyspark使用Python3,請設定環境變量:export PYSPARK_PYTHON=/usr/bin/python3

了解如何寫入ES的關鍵是要明白,ES是一個JSON格式的資料庫,它有一個必須的要求。資料格式必須采用以下格式 { "id: { the rest of your json}}

往下會展示如何轉換成這種格式。

解析Apache日志檔案

我們将Apache的日志檔案讀入,建構Spark RDD。然後我們寫一個parse()函數用正規表達式處理每條日志,提取我們需要的字rdd = sc.textFile("/home/ubuntu/walker/apache_logs")

regex='^(\S ) (\S ) (\S ) \[([\w:/] \s[ \-]\d{4})\] "(\S )\s?(\S )?\s?(\S )?" (\d{3}|-) (\d |-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'p=re.compile(regex)

def parse(str):

s=p.match(str)

d = {}

d['ip']=s.group(1)

d['date']=s.group(4)

d['operation']=s.group(5)

d['uri']=s.group(6)

return d

換句話說,我們剛開始從日志檔案讀入RDD的資料類似如下:['83.149.9.216 - - [17/May/2015:10:05:03 0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"']

然後我們使用map函數轉換每條記錄:rdd2 = rdd.map(parse)

rdd2.take(1)

[{'date': '17/May/2015:10:05:03 0000', 'ip': '83.149.9.216', 'operation': 'GET', 'uri': '/presentations/logstash-monitorama-2013/images/kibana-search.png'}]

現在看起來像JSON,但并不是JSON字元串,我們需要使用json.dumps将dict對象轉換。

我們同時增加一個doc_id字段作為整個JSON的ID。在配置ES中我們增加如下配置“es.mapping.id”: “doc_id”告訴ES我們将這個字段作為ID。

這裡我們使用SHA算法,将這個JSON字元串作為參數,得到一個唯一ID。

計算結果類似如下,可以看到ID是一個很長的SHA數值。rdd3.take(1)

[('a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c', '{"date": "17/May/2015:10:05:03 0000", "ip": "83.149.9.216", "operation": "GET", "doc_id": "a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c", "uri": "/presentations/logstash-monitorama-2013/images/kibana-search.png"}')]

現在我們需要制定ES配置,比較重要的兩項是:“es.resource” : ‘walker/apache’: "walker"是索引,apache是類型,兩者一般合稱索引

“es.mapping.id”: “doc_id”: 告訴ES那個字段作為整個文檔的ID,也就是查詢結果中的_id

其他的配置自己去探索。

然後我們使用saveAsNewAPIHadoopFile()将RDD寫入到ES。這部分代碼對于所有的ES都是一樣的,比較固定,不需要了解每一個細節es_write_conf = {

"es.nodes" : "localhost",

"es.port" : "9200",

"es.resource" : 'walker/apache',

"es.input.json": "yes",

"es.mapping.id": "doc_id"

}

rdd3.saveAsNewAPIHadoopFile(

path='-',

outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", keyClass="org.apache.hadoop.io.NullWritable",

valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",

conf=es_write_conf)

rdd3 = rdd2.map(addID)

def addId(data):

j=json.dumps(data).encode('ascii', 'ignore')

data['doc_id'] = hashlib.sha224(j).hexdigest()

return (data['doc_id'], json.dumps(data))

最後我們可以使用curl進行查詢curl http://localhost:9200s/walker/apache/_search?pretty=true&?q=*

{

"_index" : "walker",

"_type" : "apache",

"_id" : "227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2",

"_score" : 1.0,

"_source" : {

"date" : "17/May/2015:10:05:32 0000",

"ip" : "91.177.205.119",

"operation" : "GET",

"doc_id" : "227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2",

"uri" : "/favicon.ico"

}

如下是所有代碼:import json

import hashlib

import re

def addId(data):

j=json.dumps(data).encode('ascii', 'ignore')

data['doc_id'] = hashlib.sha224(j).hexdigest()

return (data['doc_id'], json.dumps(data))

def parse(str):

s=p.match(str)

d = {}

d['ip']=s.group(1)

d['date']=s.group(4)

d['operation']=s.group(5)

d['uri']=s.group(6)

return d

regex='^(\S ) (\S ) (\S ) \[([\w:/] \s[ \-]\d{4})\] "(\S )\s?(\S )?\s?(\S )?" (\d{3}|-) (\d |-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'

p=re.compile(regex)

rdd = sc.textFile("/home/ubuntu/walker/apache_logs")

rdd2 = rdd.map(parse)

rdd3 = rdd2.map(addID)

es_write_conf = {

"es.nodes" : "localhost",

"es.port" : "9200",

"es.resource" : 'walker/apache',

"es.input.json": "yes",

"es.mapping.id": "doc_id"

}

rdd3.saveAsNewAPIHadoopFile(

path='-',

outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", keyClass="org.apache.hadoop.io.NullWritable",

valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",

conf=es_write_conf)https://www.icode9.com/content-1-677701.html