【pyspark速成专家】8

【pyspark速成专家】8

码农世界 2024-05-28 后端 67 次浏览 0个评论

【pyspark速成专家】8

目录

​编辑

三,DataFrame保存成文件

四,DataFrame的API交互

五,DataFrame的SQL交互


三,DataFrame保存成文件

DataFrame可以保存为csv文件、json文件、parquet文件,也可以保存为Hive数据表;如果要保存为普通文本文件,需要先转换成RDD。

# Save as CSV, writing a header row
df = spark.read.format("json").load("data/people.json")
df.write.format("csv").option("header","true").save("data/people_write.csv")
# Save as plain text: convert to an RDD first; each output line is the string
# form of one Row object
df.rdd.saveAsTextFile("data/people_rdd.txt")
# Save as JSON
df.write.json("data/people_write.json")
# Save as Parquet: a compressed columnar file format that is compact on disk
# and quick to load. It is an on-disk format, not Spark's in-memory storage
# format. partitionBy("age") splits the output into one directory per age.
df.write.partitionBy("age").format("parquet").save("data/namesAndAges.parquet")
df.write.parquet("data/people_write.parquet")
# Save as a metastore table, bucketed by name into 42 buckets and sorted by
# age within each bucket (bucketBy is used together with saveAsTable)
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

四,DataFrame的API交互

from pyspark.sql import Row
# Import only the column helper the examples use unqualified. A wildcard import
# from pyspark.sql.functions would shadow Python builtins such as sum, max,
# min, abs and round with their Spark column-function namesakes.
from pyspark.sql.functions import col

# Small people table used by the DataFrame API examples
df = spark.createDataFrame(
    [("LiLei", 15, "male"),
     ("HanMeiMei", 16, "female"),
     ("DaChui", 17, "male")]).toDF("name", "age", "gender")
df.show()
df.printSchema()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
+---------+---+------+
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)

1,Action操作

DataFrame的Action操作包括show、count、collect、describe、take、head、first等。

# show: print the rows as a text table
df.show()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
+---------+---+------+
# show(n, truncate): print only the first n rows; the second argument controls
# whether cell values longer than 20 characters are cut off (False = show in full)
df.show(2,False) 
+---------+---+------+
|name     |age|gender|
+---------+---+------+
|LiLei    |15 |male  |
|HanMeiMei|16 |female|
+---------+---+------+
only showing top 2 rows
# count: number of rows
df.count()
3
# collect: all rows, returned as a Python list of Row objects
df.collect()
[Row(name='LiLei', age=15, gender='male'),
 Row(name='HanMeiMei', age=16, gender='female'),
 Row(name='DaChui', age=17, gender='male')]
# first: the first row as a single Row
df.first()
Row(name='LiLei', age=15, gender='male')
# take(n): the first n rows as a list
df.take(2)
[Row(name='LiLei', age=15, gender='male'),
 Row(name='HanMeiMei', age=16, gender='female')]
# head(n): with n given, returns the same list as take(n) (compare outputs)
df.head(2)
[Row(name='LiLei', age=15, gender='male'),
 Row(name='HanMeiMei', age=16, gender='female')]

2,类RDD操作

DataFrame支持RDD中的一些操作,例如distinct、cache、sample、foreach、intersect、exceptAll等(except是Python保留字,PySpark中对应的方法是exceptAll和subtract)。

可以把DataFrame当做数据类型为Row的RDD来进行操作,必要时可以将其转换成RDD来操作。

# One-column DataFrame of short greetings for the RDD-style examples
greetings = [("Hello World",), ("Hello China",), ("Hello Spark",)]
df = spark.createDataFrame(greetings).toDF("value")
df.show()
+-----------+
|      value|
+-----------+
|Hello World|
|Hello China|
|Hello Spark|
+-----------+
# map: the tutorial does this through the underlying RDD, then converts back
rdd = df.rdd.map(lambda row: Row(row[0].upper()))
# Keep the DataFrame and display it separately. .show() returns None, so
# chaining it into the assignment would leave dfmap bound to None.
dfmap = rdd.toDF(["value"])
dfmap.show()
+-----------+
|      value|
+-----------+
|HELLO WORLD|
|HELLO CHINA|
|HELLO SPARK|
+-----------+
# flatMap (RDD API): split every line on spaces, one output row per word
words = df.rdd.flatMap(lambda row: row[0].split(" "))
df_flat = words.map(lambda word: Row(word)).toDF(["value"])
df_flat.show()
+-----+
|value|
+-----+
|Hello|
|World|
|Hello|
|China|
|Hello|
|Spark|
+-----+
# filter (RDD API): keep only the lines that end with "Spark"
def _ends_with_spark(row):
    return row[0].endswith("Spark")

df_filter = df.rdd.filter(_ends_with_spark).toDF(["value"])
df_filter.show()
+-----------+
|      value|
+-----------+
|Hello Spark|
+-----------+
# filter combined with a broadcast variable.
# Get the SparkContext from the session rather than relying on the global
# `sc` that only the interactive pyspark shell predefines.
broads = spark.sparkContext.broadcast(["Hello","World"])
# Read broads.value inside the RDD lambda so the executors actually use the
# broadcast copy. Passing broads.value to Column.isin() would dereference it on
# the driver and inline the list as literals, making the broadcast pointless.
df_filter_broad = (
    df_flat.rdd
    .filter(lambda row: row[0] not in broads.value)
    .toDF(["value"])
)
df_filter_broad.show()
+-----+
|value|
+-----+
|China|
|Spark|
+-----+
# distinct: remove duplicate rows
df_distinct = df_flat.distinct()
df_distinct.show() 
+-----+
|value|
+-----+
|World|
|China|
|Hello|
|Spark|
+-----+
# cache marks df for caching; unpersist releases that cache again
df.cache()
df.unpersist()
# sample(withReplacement, fraction, seed): random sample without replacement,
# fraction 0.6, seed 0; the sample size is approximate (2 of 3 rows here)
dfsample = df.sample(False,0.6,0)
dfsample.show()  
+-----------+
|      value|
+-----------+
|Hello China|
|Hello Spark|
+-----------+
# A second greetings DataFrame for the set operations that follow
df2 = spark.createDataFrame([["Hello World"],["Hello Scala"],["Hello Spark"]]).toDF("value")
df2.show()
+-----------+
|      value|
+-----------+
|Hello World|
|Hello Scala|
|Hello Spark|
+-----------+
# intersect: rows present in both df and df2
dfintersect = df.intersect(df2)
dfintersect.show()
+-----------+
|      value|
+-----------+
|Hello Spark|
|Hello World|
+-----------+
# exceptAll: set difference (not a complement) - rows of df that do not appear
# in df2; as the name suggests (EXCEPT ALL), duplicates are not collapsed
dfexcept = df.exceptAll(df2)
dfexcept.show()
+-----------+
|      value|
+-----------+
|Hello China|
+-----------+

3,类Excel操作

可以对DataFrame进行增加列、删除列、重命名列、排序、去除重复行、去除空值行等操作,就像操作Excel表格一样。

df = spark.createDataFrame([
("LiLei",15,"male"),
("HanMeiMei",16,"female"),
("DaChui",17,"male"),
("RuHua",16,None)
]).toDF("name","age","gender")
df.show()
df.printSchema()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|  null|
+---------+---+------+
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
# Add a column: birth year derived from age and the fixed year 2020
dfnew = df.withColumn("birthyear",-df["age"]+2020)
dfnew.show() 
+---------+---+------+---------+
|     name|age|gender|birthyear|
+---------+---+------+---------+
|    LiLei| 15|  male|     2005|
|HanMeiMei| 16|female|     2004|
|   DaChui| 17|  male|     2003|
|    RuHua| 16|  null|     2004|
+---------+---+------+---------+
# Reorder columns by selecting them in the desired order
dfupdate = dfnew.select("name","age","birthyear","gender")
dfupdate.show()
# Drop a column
dfdrop = df.drop("gender")
dfdrop.show() 
+---------+---+
|     name|age|
+---------+---+
|    LiLei| 15|
|HanMeiMei| 16|
|   DaChui| 17|
|    RuHua| 16|
+---------+---+
# Rename a column (gender -> sex)
dfrename = df.withColumnRenamed("gender","sex")
dfrename.show() 
+---------+---+------+
|     name|age|   sex|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|  null|
+---------+---+------+
# sort: order by age descending (.asc()/.desc() choose the direction);
# rows tied on age are not ordered by any other column here
dfsorted = df.sort(df["age"].desc())
dfsorted.show()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|   DaChui| 17|  male|
|    RuHua| 16|  null|
|HanMeiMei| 16|female|
|    LiLei| 15|  male|
+---------+---+------+
# orderBy: several sort keys; ascending unless .desc() is used. gender
# (descending) breaks the age-16 tie, and the null gender lands after 'female'
dfordered = df.orderBy(df["age"].desc(),df["gender"].desc())
dfordered.show()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|   DaChui| 17|  male|
|HanMeiMei| 16|female|
|    RuHua| 16|  null|
|    LiLei| 15|  male|
+---------+---+------+
# na.drop: remove rows containing a null value (RuHua's null gender drops her row)
dfnotnan = df.na.drop()
dfnotnan.show()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
+---------+---+------+
# na.fill: replace null values with "female"
df_fill = df.na.fill("female")
df_fill.show()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|female|
+---------+---+------+
# na.replace: map specific values ("" -> "female", "RuHua" -> "SiYu").
# A null is not equal to "", so RuHua's null gender is left unchanged.
df_replace = df.na.replace({"":"female","RuHua":"SiYu"})
df_replace.show()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|     SiYu| 16|  null|
+---------+---+------+
# De-duplication across all columns. First stack df on itself so every row
# occurs twice; union() is the same operation as its alias unionAll() and
# keeps the duplicate rows.
df2 = df.union(df)
df2.show()

dfunique = df2.dropDuplicates()
dfunique.show()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|  null|
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|  null|
+---------+---+------+
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    RuHua| 16|  null|
|   DaChui| 17|  male|
|HanMeiMei| 16|female|
|    LiLei| 15|  male|
+---------+---+------+
# dropDuplicates on a subset of columns: one row per distinct age. Which of the
# two age-16 rows survives is not determined by this code.
dfunique_part = df.dropDuplicates(["age"])
dfunique_part.show()
+---------+---+------+
|     name|age|gender|
+---------+---+------+
|   DaChui| 17|  male|
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
+---------+---+------+
# Simple whole-table aggregation: {column: aggregate function name}
dfagg = df.agg({"name":"count","age":"max"})
dfagg.show()
+-----------+--------+
|count(name)|max(age)|
+-----------+--------+
|          4|      17|
+-----------+--------+
# describe: count/mean/stddev/min/max per column. Nulls are not counted
# (gender count is 3), and min/max on strings compare lexicographically.
df_desc = df.describe()
df_desc.show()
+-------+------+-----------------+------+
|summary|  name|              age|gender|
+-------+------+-----------------+------+
|  count|     4|                4|     3|
|   mean|  null|             16.0|  null|
| stddev|  null|0.816496580927726|  null|
|    min|DaChui|               15|female|
|    max| RuHua|               17|  male|
+-------+------+-----------------+------+
# freqItems: frequent ages and genders with support threshold 0.5. Note that 16
# and 'male' each occur in exactly 2 of the 4 rows, i.e. at (not above) the
# threshold - the result looks approximate rather than a strict "> 0.5" filter.
df_freq = df.stat.freqItems(("age","gender"),0.5)
df_freq.show()
+-------------+----------------+
|age_freqItems|gender_freqItems|
+-------------+----------------+
|         [16]|          [male]|
+-------------+----------------+

4,类SQL表操作

类SQL表操作主要包括表查询(select、selectExpr、where、filter)、表连接(join)、表合并(union、unionAll)、表分组(groupBy、agg、pivot)以及窗口函数等操作。

# People table for the SQL-style examples; RuHua's gender is null
people_rows = [
    ("LiLei", 15, "male"),
    ("HanMeiMei", 16, "female"),
    ("DaChui", 17, "male"),
    ("RuHua", 16, None),
]
df = spark.createDataFrame(people_rows).toDF("name", "age", "gender")
df.show()

# select a single column, keeping only two rows
dftest = df.select("name").limit(2)
dftest.show()

# select a column together with a derived expression (age + 1)
dftest = df.select("name", df["age"] + 1)
dftest.show()

# select a birth year derived from the fixed year 2020, then rename the columns
dftest = df.select("name", -df["age"] + 2020).toDF("name", "birth_year")
dftest.show()
# selectExpr: SQL expressions, including registered UDFs and column aliases
import datetime

# Compute the reference year once, on the driver; the lambda captures it, so
# every row and executor uses the same value instead of calling now() per row.
current_year = datetime.datetime.now().year
# Declare the return type explicitly. A Python function registered without a
# returnType yields strings, which would make birth_year a string column.
# Null ages map to null instead of raising inside the UDF.
spark.udf.register(
    "getBirthYear",
    lambda age: None if age is None else current_year - age,
    returnType="int",
)
dftest = df.selectExpr("name", "getBirthYear(age) as birth_year" , "UPPER(gender) as gender" )
dftest.show()
# where: filter using an SQL boolean expression string
dftest = df.where("gender='male' and age>15")
dftest.show()

# filter with a Column predicate
dftest = df.filter(df.age > 16)
dftest.show()

# filter also accepts an SQL expression string
dftest = df.filter("gender ='male'")
dftest.show()
# Joins: a score table that shares the name and gender columns with df
score_rows = [("LiLei", "male", 88), ("HanMeiMei", "female", 90), ("DaChui", "male", 50)]
dfscore = spark.createDataFrame(score_rows).toDF("name", "gender", "score")
dfscore.show()

# Join on one shared column
dfjoin = df.join(dfscore.select("name", "score"), on="name")
dfjoin.show()

# Join on several shared columns
dfjoin = df.join(dfscore, on=["name", "gender"])
dfjoin.show()

# The join type goes in `how`, e.g. "inner", "left", "right", "outer",
# "semi", "full", "leftanti", "anti"
for join_type in ("right", "outer"):
    dfjoin = df.join(dfscore, on=["name", "gender"], how=join_type)
    dfjoin.show()

# Arbitrary join condition: same name, and df.gender equal to dfmark.sex
dfmark = dfscore.withColumnRenamed("gender", "sex")
dfmark.show()
join_cond = (df["name"] == dfmark["name"]) & (df["gender"] == dfmark["sex"])
dfjoin = df.join(dfmark, join_cond, "inner")
dfjoin.show()

# union: append the rows of dfstudent after those of df
dfstudent = spark.createDataFrame([("Jim", 18, "male"), ("Lily", 16, "female")]).toDF("name", "age", "gender")
dfstudent.show()
dfunion = df.union(dfstudent)
dfunion.show()
# Grouping. All column functions in this section come from the `F` namespace,
# so the section does not depend on any unqualified/wildcard import.
from pyspark.sql import functions as F

# groupBy + shortcut aggregate: maximum age per gender
dfgroup = df.groupBy("gender").max("age")
dfgroup.show()

# groupBy + agg with several named aggregates
dfagg = df.groupBy("gender").agg(
    F.mean("age").alias("mean_age"),
    F.collect_list("name").alias("names"),
)
dfagg.show()

# groupBy + agg written as SQL expression strings
dfagg = df.groupBy("gender").agg(F.expr("avg(age)"), F.expr("collect_list(name)"))
dfagg.show()

# groupBy on two columns
df.groupBy("gender", "age").agg(F.collect_list(F.col("name"))).show()

# groupBy + pivot: one output column per gender value, holding the max age per class
dfstudent = spark.createDataFrame([("LiLei", 18, "male", 1), ("HanMeiMei", 16, "female", 1),
                                   ("Jim", 17, "male", 2), ("DaChui", 20, "male", 2)]).toDF("name", "age", "gender", "class")
dfstudent.show()
dfstudent.groupBy("class").pivot("gender").max("age").show()
# Window function: number the students within each class by score, highest
# first. The alias `order` is back-quoted because ORDER is an SQL keyword that
# becomes reserved when Spark's ANSI reserved-keyword enforcement is enabled;
# quoting keeps the same output column name in every mode.
df = spark.createDataFrame([("LiLei",78,"class1"),("HanMeiMei",87,"class1"),
                           ("DaChui",65,"class2"),("RuHua",55,"class2")]) \
    .toDF("name","score","class")
df.show()
dforder = df.selectExpr("name","score","class",
         "row_number() over (partition by class order by score desc) as `order`")
dforder.show()

五,DataFrame的SQL交互

将DataFrame注册为临时视图或全局临时视图后,就可以使用SQL语句与DataFrame进行交互。

不仅如此,还可以通过SparkSQL对Hive表直接进行增删改查等操作。

1,注册视图后进行SQL交互

# Temporary view: its lifetime is tied to this SparkSession
df = spark.createDataFrame(
    [("LiLei", 18, "male"), ("HanMeiMei", 17, "female"), ("Jim", 16, "male")],
    ("name", "age", "gender"),
)
df.show()
df.createOrReplaceTempView("student")
dfmale = spark.sql("select * from student where gender='male'")
dfmale.show()

# Global temporary view: queried through the `global_temp` database; its
# lifetime is tied to the whole Spark application
df.createOrReplaceGlobalTempView("student")
query = """
 select t.gender
 , collect_list(t.name) as names 
 from global_temp.student t 
 group by t.gender
""".strip("\n")
spark.sql(query).show()

# The global view is also visible from a brand-new session
other_session = spark.newSession()
other_session.sql("select * from global_temp.student").show()

2,对Hive表进行增删改查操作

# Drop the Hive table if it already exists
query = "DROP TABLE IF EXISTS students" 
spark.sql(query) 
# Create a Hive table partitioned by class and gender
# (note from the original author: partition columns must not have Chinese names)
query = """CREATE TABLE IF NOT EXISTS `students`
(`name` STRING COMMENT '姓名',
`age` INT COMMENT '年龄'
)
PARTITIONED BY ( `class` STRING  COMMENT '班级', `gender` STRING  COMMENT '性别')
""".replace("\n"," ")
spark.sql(query) 
## Dynamic-partition write into the Hive partitioned table
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict") # note: this setting is part of the dynamic-partition write below
dfstudents = spark.createDataFrame([("LiLei",18,"class1","male"),
                                    ("HanMeimei",17,"class2","female"),
                                    ("DaChui",19,"class2","male"),
                                    ("Lily",17,"class1","female")]).toDF("name","age","class","gender")
dfstudents.show()
# Dynamic partitions: class/gender values come from the DataFrame columns.
# NOTE(review): with mode("overwrite"), saveAsTable may drop and recreate
# `students`, discarding the column COMMENTs from the DDL above - confirm;
# insertInto("students", overwrite=True) would keep the existing definition.
dfstudents.write.mode("overwrite").format("hive")\
.partitionBy("class","gender").saveAsTable("students")
# Static partition: class and gender are fixed in the PARTITION clause
dfstudents = spark.createDataFrame([("Jim",18,"class3","male"),
                                    ("Tom",19,"class3","male")]).toDF("name","age","class","gender")
dfstudents.createOrReplaceTempView("dfclass3")
# INSERT INTO appends rows; INSERT OVERWRITE TABLE replaces the target partition
query = """
INSERT OVERWRITE TABLE `students`
PARTITION(class='class3',gender='male') 
SELECT name,age from dfclass3
""".replace("\n"," ")
spark.sql(query)
# Mixed partition spec: class is static ('class4'), gender is dynamic and is
# supplied by the gender column of the SELECT
dfstudents = spark.createDataFrame([("David",18,"class4","male"),
                                    ("Amy",17,"class4","female"),
                                    ("Jerry",19,"class4","male"),
                                    ("Ann",17,"class4","female")]).toDF("name","age","class","gender")
dfstudents.createOrReplaceTempView("dfclass4")
query = """
INSERT OVERWRITE TABLE `students`
PARTITION(class='class4',gender) 
SELECT name,age,gender from dfclass4
""".replace("\n"," ")
spark.sql(query)
# Read back the whole table (all partitions)
dfdata = spark.sql("select * from students")
dfdata.show()

转载请注明来自码农世界,本文标题:《【pyspark速成专家】8》

百度分享代码,如果开启HTTPS请参考李洋个人博客
每一天,每一秒,你所做的决定都会改变你的人生!

发表评论

快捷回复:

评论列表 (暂无评论,67人围观)参与讨论

还没有评论,来说两句吧...

Top