
Reading a CSV file into a DataFrame with pyspark


Method 1: Use pandas as a helper

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext()
sqlContext = SQLContext(sc)
df = pd.read_csv(r'game-clicks.csv')   # read the CSV with pandas
sdf = sqlContext.createDataFrame(df)   # convert the pandas DataFrame into a Spark DataFrame

This works as long as the file is not too large.
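A quick way to confirm the conversion worked is to inspect the resulting Spark DataFrame (a minimal usage sketch using the sdf variable from above):

sdf.printSchema()   # column types inferred from the pandas dtypes
sdf.show(5)         # preview the first five rows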

Method 2: Pure Spark

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlContext = SQLContext(sc)
# read the CSV through the spark-csv data source, treating the first row as a header
# and letting Spark infer the column types
sdf = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('game-clicks.csv')

This requires the com.databricks.spark.csv package to be available in the environment.
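One common way to make the package available is to pass it on the command line when starting pyspark or spark-submit (a hedged sketch; the 1.5.0 / Scala 2.11 coordinates below are an assumption and should match your Spark build):

pyspark --packages com.databricks:spark-csv_2.11:1.5.0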

Method 3: SparkSession from pyspark.sql (Spark 2.x)

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('dataFrameApply').getOrCreate()
filepath = "main.csv"
# header=True treats the first line as column names; inferSchema=True samples the data to guess column types
tttt = spark.read.csv(filepath, header=True, inferSchema=True, sep=',')
print(tttt)

Sometimes this fails because of data format or encoding problems.
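If the failure comes from the file's encoding or from malformed rows, read.csv accepts options for both; a minimal sketch (the 'gbk' encoding and the DROPMALFORMED mode below are assumptions for illustration, not part of the original post):

# assume the file is GBK-encoded and silently drop rows that do not match the schema
tttt = spark.read.csv(filepath, header=True, inferSchema=True, sep=',',
                      encoding='gbk', mode='DROPMALFORMED')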

Check whether the CSV file is well-formed:

filepath = "main_amount.csv"
sc = spark.sparkContext            # obtain the SparkContext from the SparkSession above
main_amount = sc.textFile(filepath)
print(main_amount.count())

# drop the header line from the data rows
header = main_amount.first()
main_amount_data = main_amount.filter(lambda lines: lines != header)

# split the header and each data row on ','
header_columns = header.split(',')
main_amount_data = main_amount_data.map(lambda line: line.split(','))
print(len(header_columns))         # prints 12 columns
# print(main_amount_data.take(2))

# count how many rows there are for each field count
fields_len = main_amount_data.map(lambda fields: len(fields)).countByValue()
print(fields_len)

Output: {12: 64200, 13: 11310, 15: 2928, 14: 218, 17: 6, 18: 4, 16: 3}

The field counts are inconsistent, and some data rows have more fields than header_columns.

1. Reading the file with pandas does not split on every ',' (pandas respects CSV quoting), so it can still load the file.

2. Both com.databricks.spark.csv and read.csv under SparkSession will raise errors on these rows; see the quoting sketch after this list.
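If the extra commas come from quoted text fields, one way to let Spark parse them the same way pandas does is to set the quote and escape options on read.csv; a minimal sketch (the double-quote characters and the main_amount_df variable below are assumptions about how the file is quoted, for illustration only):

# keep commas inside double-quoted fields from being treated as separators
main_amount_df = spark.read.csv(filepath, header=True, inferSchema=True,
                                quote='"', escape='"')
print(main_amount_df.columns)   # should list the 12 header columns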
