天天看点

apache pyspark简介 Apache Spark (Apache Spark) 火花转换 (Spark Transformations) Spark中的动作 (Actions in Spark) Spark Jobs (Spark Jobs) 使用Pyspark进行数据准备 (Data preparation using Pyspark) 摘要 (Summary)

Apache Spark (Apache Spark)

Apache Spark is an open-source analytics engine and cluster-computing framework that boosts your data processing performance. As they claim, Spark is a lightning-fast unified analytics engine. Spark is entirely written in Scala.

Apache Spark是一个开源分析引擎和集群计算框架,可提高您的数据处理性能。 正如他们所说,Spark是一个闪电般的统一分析引擎。 Spark完全用Scala编写。

Spark is effectively used in the field of Big-Data and Machine Learning for analytical purposes. Spark has been adopted by various companies like Amazon, eBay, and Yahoo.

Spark可有效地用于分析的大数据和机器学习领域。 Spark已被Amazon,eBay和Yahoo等多家公司采用。

Spark的功能 (Features of Spark)

  • Spark is polyglot which means you can utilize Spark using one or more programming languages. Spark provides you with high-level APIs in Java, Python, R, SQL, and Scala. Apache Spark package written in Python is called Pyspark.

    Spark是多语言的 ,这意味着您可以使用一种或多种编程语言来利用Spark。 Spark为您提供Java,Python,R,SQL和Scala的高级API。 用Python编写的Apache Spark软件包称为Pyspark 。

  • Spark supports multiple data formats such as Parquet, CSV (Comma Separated Values), JSON (JavaScript Object Notation), ORC (Optimized Row Columnar), Text files, and RDBMS tables.

    Spark支持多种数据格式,例如Parquet,CSV(逗号分隔值),JSON(JavaScript对象表示法),ORC(优化行列),文本文件和RDBMS表。

  • Spark has low latency because of its in-memory computation. Spark has been designed to deal with humongous data, so scalability is an inherent feature of Spark.

    由于其内存中计算, Spark具有低延迟。 Spark旨在处理庞大的数据,因此可伸缩性是Spark的固有功能。

  • Spark can be seamlessly integrated with Hadoop and has the capability of running on top of Hadoop clusters.

    Spark可以与Hadoop无缝集成,并具有在Hadoop群集之上运行的功能。

Spark如何工作 (How Spark works)

  • Spark uses Master-Slave architecture. The Master node assigns tasks to the slave nodes that reside across the cluster and the slave nodes would execute them.

    Spark使用主从结构 。 主节点将任务分配给跨集群的从节点,从节点将执行任务。

  • A Spark Session must be created to utilize all the functionalities provided by Spark. A Spark Session is created inside the Driver program. The Driver program resides inside the Master node.

    必须创建一个Spark会话以利用Spark提供的所有功能。 在驱动程序内部创建一个Spark会话。 驱动程序位于主节点内。

# Example of creating a Spark Session in Pysparkspark = SparkSession.\
builder.master("local").\
appName("AppName").getOrCreate()
           
  • When you read the DataFrame using Spark Session the DataFrame will be partitioned and stored across the cluster nodes so that it can be operated parallelly. The partitions of the DataFrame are collectively called RDD (Resilient Distributed Datasets). RDDs are fault-tolerant which means it is resilient to failures.

    当您使用Spark Session读取DataFrame时,该DataFrame将在群集节点上进行分区和存储,以便可以并行操作。 DataFrame的分区统称为RDD(弹性分布式数据集) 。 RDD是容错的 ,这意味着它可以容错。

  • When an Action is invoked through the Spark Session, the Spark creates DAG (Directed Acyclic Graph) of transformations (which would be applied to the partitions of data) and implements them by assigning tasks to the Executors in the slave nodes. A transformation is never implemented until an Action is invoked. The tendency of implementing a transformation only when an Action is invoked is called Lazy Evaluation.

    通过Spark会话调用Action时,Spark会创建转换的DAG(有向无环图) (将应用于数据分区),并通过将任务分配给从属节点中的执行器来实现这些转换。 在调用动作之前,永远不会实现转换。 仅在调用动作时才实现转换的趋势称为“ 惰性评估” 。

  • The Driver Program running in the Master node assigns Spark jobs to the slave nodes when an action is invoked. The spark jobs are split into stages and those stages are further split into tasks.

    当调用动作时,在主节点中运行的驱动程序将Spark作业分配给从节点。 火花作业分为多个阶段,而这些阶段又分为多个任务 。

  • The slave nodes contain many Executors that receive tasks and execute them parallelly on the partitions of the data. The executors are the ones which cache data for in-memory computation.

    从节点包含许多执行器 ,它们执行任务并在数据分区上并行执行任务。 执行程序是为数据进行缓存以进行内存计算的执行程序。

火花转换 (Spark Transformations)

The Spark Transformations perform some operations on RDDs and produce new RDD. Various Spark transformations include map, flatMap, filter, groupBy, reduceBy, and join.

Spark转换对RDD执行一些操作并产生新的RDD。 各种Spark转换包括map , flatMap,filter,groupBy,reduceBy和join。

Spark Transformations are further classified into two types,

Spark转换进一步分为两种类型,

  • Narrow transformations

    缩小转换

  • Wide transformations

    广泛的转变

缩小转换 (Narrow transformations)

Spark transformations are called narrow transformations when the operation does not require Shuffling. A Narrow transformation does not require partitions of data to be shuffled across nodes in the cluster.

当操作不需要改组时,火花转换称为窄转换。 窄转换不需要在群集中的节点之间对数据分区进行混洗。

Examples of Narrow transformations are map, flatMap, filter, sample, etc.

窄转换的示例是map,flatMap,filter,sample等。

广泛的转变 (Wide transformations)

Spark transformations are called wide transformations when the operation requires Shuffling. Shuffling is an operation that involves shuffling the partitions of the data across the nodes of the cluster to perform an operation.

当操作需要改组时,Spark转换称为广泛转换。 改组是一种操作,涉及跨集群节点对数据分区进行改组以执行操作。

Examples of Wide transformations are groupBy, reduceBy, join, etc.

宽转换的示例包括groupBy,reduceBy,join等。

  • The groupBy is a transformation in which the values of the column are grouped to form a unique set of values. To perform this operation is costly in distributed environments because all the values to be grouped must be collected from various partitions of data that reside in nodes of the cluster.

    groupBy是一种转换,其中将列的值分组以形成一组唯一的值。 在分布式环境中执行此操作的成本很高,因为必须从位于群集节点中的各种数据分区中收集所有要分组的值。

Spark中的动作 (Actions in Spark)

Actions are operations that trigger Spark Jobs. Spark does not immediately perform transformations. It requires an Action to trigger the implementation of the Spark transformations.

操作是触发Spark Jobs的操作。 Spark不会立即执行转换。 它需要一个Action来触发Spark转换的实现。

Examples of Spark actions are collect, count, take, first, saveAsTextFile, etc.

Spark操作的示例包括collect , count,take,first,saveAsTextFile等。

  • Collect is an action that collects all the partitions of data that resides across the nodes of the cluster and stores them in the Driver that resides in the Master node.

    收集是一项收集群集节点上所有数据分区并将其存储在主节点上的驱动程序中的操作。

Spark Jobs (Spark Jobs)

Spark jobs are triggered when an Action is invoked. Spark jobs are further divided into Stages and Tasks.

调用动作时将触发Spark作业。 Spark作业进一步分为阶段和任务。

阶段 (Stages)

Spark jobs involving wide transformations are grouped together as a stage and the jobs involving narrow transformations are grouped as another stage.

涉及广泛转换的Spark作业被分组为一个阶段,涉及狭窄转换的作业被分组为另一个阶段。

# A Spark Job
df.filter(col('A')).groupBy('A')
           

The entire code above is considered to be a Spark job, in this filter is a separate stage and groupBy is a separate stage because filter is a narrow transformation and groupBy is a wide transformation.

上面的整个代码被认为是Spark作业,在此过滤器中是一个单独的阶段,在groupBy中是一个单独的阶段,因为filter是一个狭窄的转换而groupBy是一个广泛的转换。

任务 (Tasks)

The stages of Spark Jobs are further divided into tasks. The tasks are the operations that are applied to each of the partitions across the nodes of the cluster.

Spark Jobs的阶段进一步分为任务。 任务是应用于集群节点上每个分区的操作。

使用Pyspark进行数据准备 (Data preparation using Pyspark)

安装Pyspark (Install Pyspark)

Pyspark can be installed by executing the following command

可以通过执行以下命令来安装Pyspark

pip install pyspark
           

导入所需的库 (Import the required libraries)

import math
import numpy as np 
import pandas as pd  
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, isnull, asc, desc, mean'''Create a spark session'''
spark = SparkSession.\
builder.\
master("local").appName("DataWrangling").getOrCreate()'''Set this configuration to get output similar to pandas'''
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
           
apache pyspark简介 Apache Spark (Apache Spark) 火花转换 (Spark Transformations) Spark中的动作 (Actions in Spark) Spark Jobs (Spark Jobs) 使用Pyspark进行数据准备 (Data preparation using Pyspark) 摘要 (Summary)

Output 输出量

'''Find the count of a dataframe'''
df.count()
"""OUTPUT:891"""
           

列中的值计数 (Count of values in a column)

df.groupBy('Sex').count()
           
apache pyspark简介 Apache Spark (Apache Spark) 火花转换 (Spark Transformations) Spark中的动作 (Actions in Spark) Spark Jobs (Spark Jobs) 使用Pyspark进行数据准备 (Data preparation using Pyspark) 摘要 (Summary)

Output 输出量

查找数据框中列的不同值 (Find distinct values of a column in a Dataframe)

df.select('Embarked').distinct()
           
apache pyspark简介 Apache Spark (Apache Spark) 火花转换 (Spark Transformations) Spark中的动作 (Actions in Spark) Spark Jobs (Spark Jobs) 使用Pyspark进行数据准备 (Data preparation using Pyspark) 摘要 (Summary)

Output 输出量

在数据框中选择一组特定的列 (Select a specific set of columns in a Dataframe)

df.select('Survived', 'Age', 'Ticket').limit(5)
           
apache pyspark简介 Apache Spark (Apache Spark) 火花转换 (Spark Transformations) Spark中的动作 (Actions in Spark) Spark Jobs (Spark Jobs) 使用Pyspark进行数据准备 (Data preparation using Pyspark) 摘要 (Summary)

Output 输出量

查找缺失值的计数 (Find the count of missing values)

df.select([count(when(isnull(column), column)).alias(column) \for column in df.columns])
           

过滤null和非null值 (Filtering null and not null values)

'''Find the null values of 'Age' '''
df.filter(col('Age').isNotNull()).limit(5)'''Another way to find not null values of 'Age' '''
df.filter("Age is not NULL").limit(5)
           
apache pyspark简介 Apache Spark (Apache Spark) 火花转换 (Spark Transformations) Spark中的动作 (Actions in Spark) Spark Jobs (Spark Jobs) 使用Pyspark进行数据准备 (Data preparation using Pyspark) 摘要 (Summary)

Output 输出量

'''Find the null values of 'Age' '''
df.filter(col('Age').isNull()).limit(5)'''Another way to find null values of 'Age' '''
df.filter("Age is NULL").limit(5)
           
apache pyspark简介 Apache Spark (Apache Spark) 火花转换 (Spark Transformations) Spark中的动作 (Actions in Spark) Spark Jobs (Spark Jobs) 使用Pyspark进行数据准备 (Data preparation using Pyspark) 摘要 (Summary)

Output 输出量

排序列 (Sorting columns)

'''Sort "Parch" column in ascending order and "Age" in descending order'''
df.sort(asc('Parch'),desc('Age')).limit(5)
           
apache pyspark简介 Apache Spark (Apache Spark) 火花转换 (Spark Transformations) Spark中的动作 (Actions in Spark) Spark Jobs (Spark Jobs) 使用Pyspark进行数据准备 (Data preparation using Pyspark) 摘要 (Summary)

Output 输出量

删除列 (Dropping columns)

'''Drop multiple columns'''
df.drop('Age', 'Parch','Ticket').limit(5)
           
apache pyspark简介 Apache Spark (Apache Spark) 火花转换 (Spark Transformations) Spark中的动作 (Actions in Spark) Spark Jobs (Spark Jobs) 使用Pyspark进行数据准备 (Data preparation using Pyspark) 摘要 (Summary)

Output 输出量

分组和汇总 (Groupby and aggregation)

'''Finding the mean age of male and female'''
df.groupBy('Sex').agg(mean('Age'))
           

摘要 (Summary)

  • Spark is a lightning-fast cluster computing framework used for analytical purposes.

    Spark是用于分析目的的闪电般的群集计算框架。

  • SparkSession is the entry point for all the functionalities in Spark and is responsible for creating and scheduling Spark Jobs.

    SparkSession是Spark中所有功能的入口点,并负责创建和调度Spark Jobs。

  • The Executors implement the transformations on all the partitions of data available in RDD.

    执行器对RDD中可用数据的所有分区执行转换。

Find this post in my Kaggle notebook: https://www.kaggle.com/srivignesh/an-introduction-to-pyspark-apache-spark-in-python

在我的Kaggle笔记本中找到该帖子: https ://www.kaggle.com/srivignesh/an-introduction-to-pyspark-apache-spark-in-python

Connect with me on LinkedIn, Twitter!

在 LinkedIn , Twitter上 与我联系 !

Happy Learning!

学习愉快!

谢谢! (Thank you!)

翻译自: https://towardsdatascience.com/an-introduction-to-apache-pyspark-4454cc03bdfa
下一篇: Erlang概述