天天看點

pyspark常用文法_讀取各類資料源

想了解更多,歡迎移步"文淵小站"

裡面有更多知識分享,以及一些有意思的小項目~

環境

spark 2.4.0

1 讀寫 csv檔案
# 讀csv
file_path = r"xx/xx/xx/"   # 可以是檔案路徑,也可以是檔案名
# df = spark.read.csv(file_path, header=True, inferSchema=True)
df = spark.read.format('csv').option("header", "true").option("inferSchema", "true").csv(file_path)
# 寫csv
# df.write.csv(file_path)
df.write.save(path=file_path, format='csv', mode='overwrite', sep=',', header=True)
# df.repartition(1).write.save(path=file_path, format='csv', mode='overwrite', sep=',', header=True)    # 寫成1個csv檔案
           
2 讀寫 parquet檔案
# 讀parquet
file_path = r"xx/xx/xx/"
df_buzz_country = spark.read.parquet(file_path)
# 寫parquet
df.write.parquet(file_path, mode="overwrite")
# df.repartition(1).write.parquet(file_path, mode="overwrite")   # 寫成1個parquet檔案
           
3 讀寫 json檔案
# 讀csv
file_path = r"xx/xx/xx.json"
peopleDF = spark.read.json(file_path)
# 寫csv
df.write.json(file_path)
           
4 讀 hive
spark = SparkSession.builder.appName("task_read_hive").enableHiveSupport().getOrCreate()
df = spark.sql("select * from table_1")
           
5 讀 mysql
# spark-submit 中,需要配置如下(即添加‘mysql-connector-java-5.1.47.jar’包):
# --jars xxx/xxx/mysql-connector-java-5.1.47.jar
mysql_url = 'jdbc:mysql://域名或ip:3306/資料庫名?characterEncoding=UTF-8'
mysql_user = 'xxx'
mysql_pw = 'xxx'
table = "(select * from table_A) a"
df = spark.read.format('jdbc').options(url=mysql_url, driver='com.mysql.jdbc.Driver', dbtable=table,
                                       user=mysql_user, password=mysql_pw).load()
           

想了解更多,歡迎移步"文淵小站"

裡面有更多知識分享,以及一些有意思的小項目~

繼續閱讀