天天看点

Spark_rdd_and_map_reduce基本操作

from __future__ import print_function, division
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local") \
   .appName("test") \
   .enableHiveSupport() \
   .getOrCreate()

sc = spark.sparkContext
           

Part1. RDD 的基本操作

RDD 由于采分散式架构,在计算以及操作是使用 Map 与 Reduce 的方式,与一般单执行绪程式逻辑不同.

本节将介绍基本的 Map 与 Reduce 等基本指令,让各位同学能够熟悉如何操作 RDD.

wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, )
print(type(wordsRDD))
           
<class 'pyspark.rdd.RDD'>
           

map (Transform)

map 能將 fuction 套用在 rdd 中的每個元素上

def makePlural(word):
    """Adds an 's' to `word`.

    Note:
        This is a simple function that only adds an 's'.  No attempt is made to follow proper
        pluralization rules.

    Args:
        word (str): A string.

    Returns:
        str: A string with 's' added to it.
    """
    return word + 's'

print(makePlural('cat'))
           
cats
           
appliedRDD = wordsRDD.map(makePlural)
           
print(type(appliedRDD))
           
<class 'pyspark.rdd.PipelinedRDD'>
           

Collect (Action)

將 RDD 元素送回 Master 並回傳為 List

print(appliedRDD.collect())
print(type(appliedRDD.collect()))
           
['cats', 'elephants', 'rats', 'rats', 'cats']
<type 'list'>
           

使用 Lambda function

Lambda function 亦为匿名函数,并不另外定义函数,直接透过 lambda 来宣告函数逻辑.

lambdaRDD = wordsRDD.map(lambda word: word + 's')
print(lambdaRDD.collect())
           
['cats', 'elephants', 'rats', 'rats', 'cats']
           

小练习: 计算每个单字长度

lenofRDD = (wordsRDD
            .map(lambda word: len(word))
            .collect())
print(lenofRDD)
           
[3, 8, 3, 3, 3]
           

Part2. 使用 Pair RDD 来做计算

Pair RDD

Pair RDD 是一种以 (key, value) 方式储存的RDD

pairRDD = wordsRDD.map(lambda word: (word, ))
           
print(pairRDD.take())
print(pairRDD)
           
[('cat', 1)]
PythonRDD[6] at RDD at PythonRDD.scala:48
           

groupByKey(Transform)

将资料依照 Key 值重新排序

wordsGrouped = pairRDD.groupByKey()
for key, value in wordsGrouped.collect():
    print('{0}: {1}'.format(key, list(value)))
           
rat: [1, 1]
elephant: [1]
cat: [1, 1]
           

依照 key 值加总

wordCountsGrouped = wordsGrouped.map(lambda (k,v): (k, sum(v)))
           
wordCountsGrouped.collect()
           
[('rat', 2), ('elephant', 1), ('cat', 2)]
           

更好的解决方案!

reduceBykey (Transform)

reduceByKey 能够直接将资料根据 key 值聚合,减少多余的交换(shuffle)动作

wordCounts = pairRDD.reduceByKey(lambda a,b: a+b)
print(wordCounts.collect())
           
[('rat', 2), ('elephant', 1), ('cat', 2)]
           

小练习: 现在你也会写 wordcount

wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordRDD = sc.parallelize(wordsList)
           
wordCountRDD = (wordRDD
                 .map(lambda x: (x, ))
                 .reduceByKey(lambda x ,y : x + y))
           
wordCountRDD.collect()
           
[('rat', 2), ('cat', 2), ('elephant', 1)]
           

Part3. 寻找不重复值

RDD 提供了多样的现成 API 可供使用,建议先查找官方文件了解现有的 API,避免重复造轮.

disticnt (Transform)

uniqueWords = wordsRDD.map(lambda word: (word, )).distinct()
print(uniqueWords.collect())
           
[('elephant', 1), ('cat', 1), ('rat', 1)]
           
countUniqueWords = wordsRDD.map(lambda word: (word, )).distinct().count()
print(countUniqueWords)
           
3
           

小作业: 计算每个字平均出现几次

wordsCount = [('cat',),('elephant',),('rat',)]
           
wordCountRDD = sc.parallelize(wordsCount)
           
totalCount = (wordCountRDD
                .map(lambda (x,y): y)
                .reduce(lambda x, y: x + y))
           
average = totalCount / (wordCountRDD.distinct().count())
           
print(average)
           
1.66666666667
           

Part4. Rdd 也可以当作函数的参数

def wordCount(wordListRDD):
    """Creates a pair RDD with word counts from an RDD of words.

    Args:
        wordListRDD (RDD of str): An RDD consisting of words.

    Returns:
        RDD of (str, int): An RDD consisting of (word, count) tuples.
    """
    return (wordListRDD
           .map(lambda a : (a,))
           .reduceByKey(lambda a,b: a+b))
           
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, )
print(wordCount(wordsRDD).collect())
           
[('rat', 2), ('elephant', 1), ('cat', 2)]
           

Part5. 与 python library 互动

使用 pyspark 最方便的地方就是在于可以直接引用 python 的库,并用在 map 或 reduce 中

from operator import add

totalCount = (wordCounts
              .map(lambda (a,b): b)
              .reduce(add))

print(totalCount)
           
5
           

Part6. Text Mining

结合目前的课程,最后带大家做个 Text Mining 的例子.

需要读取一个外部 text 文字档,清理文档内容,计算使用频次最高的几个词.

6.1 去除符号

import re
def removePunctuation(text):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Only spaces, letters, and numbers should be retained.  Other characters should should be
        eliminated (e.g. it's becomes its).  Leading and trailing spaces should be removed after
        punctuation is removed.

    Args:
        text (str): A string.

    Returns:
        str: The cleaned up string.
    """
    return re.sub("[^a-zA-Z0-9 ]", "", text.strip(" ").lower())
print(removePunctuation('Hi, you!'))
print(removePunctuation(' No under_score!'))
           
hi you
no underscore
           

6.2 从 HDFS 读取档案

!hadoop fs -tail /tmp/shakespear.txt
           
the tillage of thy husbandry?
  Or who is he so fond will be the tomb,
  Of his self-love to stop posterity?
  Thou art thy mother's glass and she in thee
  Calls back the lovely April of her prime,
  So thou through windows of thine age shalt see,
  Despite of wrinkles this thy golden time.
    But if thou live remembered not to be,
    Die single and thine image dies with thee.


                     4
  Unthrifty loveliness why dost thou spend,
  Upon thy self thy beauty's legacy?
  Nature's bequest gives nothing but doth lend,
  And being frank she lends to those are free:
  Then beauteous niggard why dost thou abuse,
  The bounteous largess given thee to give?
  Profitless usurer why dost thou use
  So great a sum of sums yet canst not live?
  For having traffic with thy self alone,
  Thou of thy self thy sweet self dost deceive,
  Then how when nature calls thee to be gone,
  What acceptable audit canst thou leave?
    Thy unused beauty must be tombed with thee,
    Which used lives th' executor to be.
           
filepath = "hdfs:///tmp/shakespear.txt"
shakespeareRDD = (sc
                  .textFile(filepath)
                  .map(removePunctuation))
           
shakespeareRDD.collect()
           
[u'1609',
 u'',
 u'the sonnets',
 u'',
 u'by william shakespeare',
 u'',
 u'',
 u'',
 u'1',
 u'from fairest creatures we desire increase',
 u'that thereby beautys rose might never die',
 u'but as the riper should by time decease',
 u'his tender heir might bear his memory',
 u'but thou contracted to thine own bright eyes',
 u'feedst thy lights flame with selfsubstantial fuel',
 u'making a famine where abundance lies',
 u'thy self thy foe to thy sweet self too cruel',
 u'thou that art now the worlds fresh ornament',
 u'and only herald to the gaudy spring',
 u'within thine own bud buriest thy content',
 u'and tender churl makst waste in niggarding',
 u'pity the world or else this glutton be',
 u'to eat the worlds due by the grave and thee',
 u'',
 u'',
 u'2',
 u'when forty winters shall besiege thy brow',
 u'and dig deep trenches in thy beautys field',
 u'thy youths proud livery so gazed on now',
 u'will be a tattered weed of small worth held',
 u'then being asked where all thy beauty lies',
 u'where all the treasure of thy lusty days',
 u'to say within thine own deep sunken eyes',
 u'were an alleating shame and thriftless praise',
 u'how much more praise deserved thy beautys use',
 u'if thou couldst answer this fair child of mine',
 u'shall sum my count and make my old excuse',
 u'proving his beauty by succession thine',
 u'this were to be new made when thou art old',
 u'and see thy blood warm when thou feelst it cold',
 u'',
 u'',
 u'3',
 u'look in thy glass and tell the face thou viewest',
 u'now is the time that face should form another',
 u'whose fresh repair if now thou not renewest',
 u'thou dost beguile the world unbless some mother',
 u'for where is she so fair whose uneared womb',
 u'disdains the tillage of thy husbandry',
 u'or who is he so fond will be the tomb',
 u'of his selflove to stop posterity',
 u'thou art thy mothers glass and she in thee',
 u'calls back the lovely april of her prime',
 u'so thou through windows of thine age shalt see',
 u'despite of wrinkles this thy golden time',
 u'but if thou live remembered not to be',
 u'die single and thine image dies with thee',
 u'',
 u'',
 u'4',
 u'unthrifty loveliness why dost thou spend',
 u'upon thy self thy beautys legacy',
 u'natures bequest gives nothing but doth lend',
 u'and being frank she lends to those are free',
 u'then beauteous niggard why dost thou abuse',
 u'the bounteous largess given thee to give',
 u'profitless usurer why dost thou use',
 u'so great a sum of sums yet canst not live',
 u'for having traffic with thy self alone',
 u'thou of thy self thy sweet self dost deceive',
 u'then how when nature calls thee to be gone',
 u'what acceptable audit canst thou leave',
 u'thy unused beauty must be tombed with thee',
 u'which used lives th executor to be']
           
print('\n'.join(shakespeareRDD
                .zipWithIndex()  # to (line, lineNum)
                .map(lambda (l, num): '{0}: {1}'.format(num, l))  # to 'lineNum: line'
                .take()))
           
0: 1609
1: 
2: the sonnets
3: 
4: by william shakespeare
5: 
6: 
7: 
8: 1
9: from fairest creatures we desire increase
10: that thereby beautys rose might never die
11: but as the riper should by time decease
12: his tender heir might bear his memory
13: but thou contracted to thine own bright eyes
14: feedst thy lights flame with selfsubstantial fuel
           

6.3 切词

shakespeareWordsRDD = shakespeareRDD.flatMap(lambda a: a.split(" "))
shakespeareWordCount = shakespeareWordsRDD.count()
print(shakespeareWordsRDD.top())
print(shakespeareWordCount)
           
[u'youths', u'yet', u'wrinkles', u'worth', u'worlds']
461
           

6.4 移除空白值

shakeWordsRDD = shakespeareWordsRDD.filter(lambda word: len(word) > )
shakeWordCount = shakeWordsRDD.count()
print(shakeWordCount)
           
450
           

6.5 计算字词数并排序

top15WordsAndCounts = wordCount(shakeWordsRDD).takeOrdered(, lambda (a,b): -b)
print('\n'.join(map(lambda (w, c): '{0}: {1}'.format(w, c), top15WordsAndCounts)))
           
thy: 22
thou: 16
the: 15
to: 12
and: 11
of: 10
be: 8
thee: 6
thine: 6
self: 6
dost: 5
so: 5
his: 4
now: 4
where: 4
           

继续阅读