Contents
3. Saving a DataFrame to Files
4. DataFrame API Interaction
5. DataFrame SQL Interaction
3. Saving a DataFrame to Files
A DataFrame can be saved as a csv file, a json file, a parquet file, or as a hive table.
#save as a csv file
df = spark.read.format("json").load("data/people.json")
df.write.format("csv").option("header","true").save("data/people_write.csv")

#convert to an rdd first, then save as a text file
df.rdd.saveAsTextFile("data/people_rdd.txt")

#save as a json file
df.write.json("data/people_write.json")

#save as a parquet file: compressed, small on disk, and the format Spark uses internally, so it loads fastest
df.write.partitionBy("age").format("parquet").save("data/namesAndAges.parquet")
df.write.parquet("data/people_write.parquet")

#save as a hive table
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
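For completeness, the saved files can be read back with the matching spark.read methods. A minimal sketch, assuming the save calls above succeeded and the same paths are used:

#read the saved files back (assumes the writes above succeeded)
df_csv = spark.read.format("csv").option("header","true").load("data/people_write.csv")
df_json = spark.read.json("data/people_write.json")
df_parquet = spark.read.parquet("data/people_write.parquet")
df_parquet.show()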
4. DataFrame API Interaction
from pyspark.sql import Row
from pyspark.sql.functions import *

df = spark.createDataFrame(
    [("LiLei",15,"male"),
     ("HanMeiMei",16,"female"),
     ("DaChui",17,"male")]).toDF("name","age","gender")
df.show()
df.printSchema()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
+---------+---+------+

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
1. Action Operations
DataFrame Action operations include show, count, collect, describe, take, head, first, and so on.
#show
df.show()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
+---------+---+------+

#show(n, truncate)
#the second argument controls whether fields longer than 20 characters are truncated
df.show(2,False)

+---------+---+------+
|name     |age|gender|
+---------+---+------+
|LiLei    |15 |male  |
|HanMeiMei|16 |female|
+---------+---+------+
only showing top 2 rows

#count
df.count()
3

#collect
df.collect()
[Row(name='LiLei', age=15, gender='male'),
 Row(name='HanMeiMei', age=16, gender='female'),
 Row(name='DaChui', age=17, gender='male')]

#first
df.first()
Row(name='LiLei', age=15, gender='male')

#take
df.take(2)
[Row(name='LiLei', age=15, gender='male'),
 Row(name='HanMeiMei', age=16, gender='female')]

#head
df.head(2)
[Row(name='LiLei', age=15, gender='male'),
 Row(name='HanMeiMei', age=16, gender='female')]
2. RDD-like Operations
DataFrames support a number of RDD-style operations such as distinct, cache, sample, foreach, intersect, and except.
A DataFrame can be treated as an RDD whose elements are of type Row, and it can be converted to a true RDD when necessary.
df = spark.createDataFrame([("Hello World",),("Hello China",),("Hello Spark",)]).toDF("value")
df.show()

+-----------+
|      value|
+-----------+
|Hello World|
|Hello China|
|Hello Spark|
+-----------+

#map: convert to an rdd first
rdd = df.rdd.map(lambda x:Row(x[0].upper()))
dfmap = rdd.toDF(["value"])
dfmap.show()

+-----------+
|      value|
+-----------+
|HELLO WORLD|
|HELLO CHINA|
|HELLO SPARK|
+-----------+

#flatMap: convert to an rdd first
df_flat = df.rdd.flatMap(lambda x:x[0].split(" ")).map(lambda x:Row(x)).toDF(["value"])
df_flat.show()

+-----+
|value|
+-----+
|Hello|
|World|
|Hello|
|China|
|Hello|
|Spark|
+-----+

#filter
df_filter = df.rdd.filter(lambda s:s[0].endswith("Spark")).toDF(["value"])
df_filter.show()

+-----------+
|      value|
+-----------+
|Hello Spark|
+-----------+

#filter combined with a broadcast variable
broads = sc.broadcast(["Hello","World"])
df_filter_broad = df_flat.filter(~col("value").isin(broads.value))
df_filter_broad.show()

+-----+
|value|
+-----+
|China|
|Spark|
+-----+

#distinct
df_distinct = df_flat.distinct()
df_distinct.show()

+-----+
|value|
+-----+
|World|
|China|
|Hello|
|Spark|
+-----+

#cache
df.cache()
df.unpersist()

#sample
dfsample = df.sample(False,0.6,0)
dfsample.show()

+-----------+
|      value|
+-----------+
|Hello China|
|Hello Spark|
+-----------+

df2 = spark.createDataFrame([["Hello World"],["Hello Scala"],["Hello Spark"]]).toDF("value")
df2.show()

+-----------+
|      value|
+-----------+
|Hello World|
|Hello Scala|
|Hello Spark|
+-----------+

#intersect
dfintersect = df.intersect(df2)
dfintersect.show()

+-----------+
|      value|
+-----------+
|Hello Spark|
|Hello World|
+-----------+

#exceptAll
dfexcept = df.exceptAll(df2)
dfexcept.show()

+-----------+
|      value|
+-----------+
|Hello China|
+-----------+
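Among the operations listed above, foreach is the only one without an example; here is a minimal sketch. Note that the function runs on the executors for its side effects, so in cluster mode the print output lands in the executor logs rather than the driver console:

#foreach: apply a function to every Row for its side effects
#(the function executes on the executors, not the driver)
df.foreach(lambda row: print(row["value"]))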
3. Excel-like Operations
You can add columns, drop columns, rename columns, sort rows, remove duplicate rows, and drop or fill null rows, much like working with an Excel sheet.
df = spark.createDataFrame([
    ("LiLei",15,"male"),
    ("HanMeiMei",16,"female"),
    ("DaChui",17,"male"),
    ("RuHua",16,None)
]).toDF("name","age","gender")
df.show()
df.printSchema()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|  null|
+---------+---+------+

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)

#add a column
dfnew = df.withColumn("birthyear",-df["age"]+2020)
dfnew.show()

+---------+---+------+---------+
|     name|age|gender|birthyear|
+---------+---+------+---------+
|    LiLei| 15|  male|     2005|
|HanMeiMei| 16|female|     2004|
|   DaChui| 17|  male|     2003|
|    RuHua| 16|  null|     2004|
+---------+---+------+---------+

#reorder columns
dfupdate = dfnew.select("name","age","birthyear","gender")
dfupdate.show()

#drop a column
dfdrop = df.drop("gender")
dfdrop.show()

+---------+---+
|     name|age|
+---------+---+
|    LiLei| 15|
|HanMeiMei| 16|
|   DaChui| 17|
|    RuHua| 16|
+---------+---+

#rename a column
dfrename = df.withColumnRenamed("gender","sex")
dfrename.show()

+---------+---+------+
|     name|age|   sex|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|  null|
+---------+---+------+

#sort, ascending or descending can be specified
dfsorted = df.sort(df["age"].desc())
dfsorted.show()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|   DaChui| 17|  male|
|    RuHua| 16|  null|
|HanMeiMei| 16|female|
|    LiLei| 15|  male|
+---------+---+------+

#orderBy, ascending by default, can sort on multiple fields
dfordered = df.orderBy(df["age"].desc(),df["gender"].desc())
dfordered.show()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|   DaChui| 17|  male|
|HanMeiMei| 16|female|
|    RuHua| 16|  null|
|    LiLei| 15|  male|
+---------+---+------+

#drop rows containing null values
dfnotnan = df.na.drop()
dfnotnan.show()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
+---------+---+------+

#fill null values
df_fill = df.na.fill("female")
df_fill.show()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|female|
+---------+---+------+

#replace certain values
df_replace = df.na.replace({"":"female","RuHua":"SiYu"})
df_replace.show()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|     SiYu| 16|  null|
+---------+---+------+

#drop duplicates, on all columns by default
df2 = df.unionAll(df)
df2.show()
dfunique = df2.dropDuplicates()
dfunique.show()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|  null|
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
|   DaChui| 17|  male|
|    RuHua| 16|  null|
+---------+---+------+

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|    RuHua| 16|  null|
|   DaChui| 17|  male|
|HanMeiMei| 16|female|
|    LiLei| 15|  male|
+---------+---+------+

#drop duplicates on a subset of columns
dfunique_part = df.dropDuplicates(["age"])
dfunique_part.show()

+---------+---+------+
|     name|age|gender|
+---------+---+------+
|   DaChui| 17|  male|
|    LiLei| 15|  male|
|HanMeiMei| 16|female|
+---------+---+------+

#simple aggregations
dfagg = df.agg({"name":"count","age":"max"})
dfagg.show()

+-----------+--------+
|count(name)|max(age)|
+-----------+--------+
|          4|      17|
+-----------+--------+

#summary statistics
df_desc = df.describe()
df_desc.show()

+-------+------+-----------------+------+
|summary|  name|              age|gender|
+-------+------+-----------------+------+
|  count|     4|                4|     3|
|   mean|  null|             16.0|  null|
| stddev|  null|0.816496580927726|  null|
|    min|DaChui|               15|female|
|    max| RuHua|               17|  male|
+-------+------+-----------------+------+

#ages and genders with frequency above 0.5
df_freq = df.stat.freqItems(("age","gender"),0.5)
df_freq.show()

+-------------+----------------+
|age_freqItems|gender_freqItems|
+-------------+----------------+
|         [16]|          [male]|
+-------------+----------------+
4. SQL-like Table Operations
SQL-like table operations mainly include table queries (select, selectExpr, where), table joins (join, union, unionAll), and table grouping (groupBy, agg, pivot).
df = spark.createDataFrame([
    ("LiLei",15,"male"),
    ("HanMeiMei",16,"female"),
    ("DaChui",17,"male"),
    ("RuHua",16,None)]).toDF("name","age","gender")
df.show()

#query with select
dftest = df.select("name").limit(2)
dftest.show()

dftest = df.select("name",df["age"] + 1)
dftest.show()

#query with select
dftest = df.select("name",-df["age"]+2020).toDF("name","birth_year")
dftest.show()

#query with selectExpr, which supports UDFs, aliases, and SQL expressions
import datetime
spark.udf.register("getBirthYear",lambda age:datetime.datetime.now().year-age)
dftest = df.selectExpr("name",
                       "getBirthYear(age) as birth_year",
                       "UPPER(gender) as gender")
dftest.show()

#query with where, which takes a SQL where-clause expression
dftest = df.where("gender='male' and age>15")
dftest.show()

#query with filter
dftest = df.filter(df["age"]>16)
dftest.show()

#query with filter
dftest = df.filter("gender ='male'")
dftest.show()

#join
dfscore = spark.createDataFrame([("LiLei","male",88),("HanMeiMei","female",90),("DaChui","male",50)]) \
          .toDF("name","gender","score")
dfscore.show()

#join on a single column
dfjoin = df.join(dfscore.select("name","score"),"name")
dfjoin.show()

#join on multiple columns
dfjoin = df.join(dfscore,["name","gender"])
dfjoin.show()

#join on multiple columns
#the join type can be "inner","left","right","outer","semi","full","leftanti","anti", among others
dfjoin = df.join(dfscore,["name","gender"],"right")
dfjoin.show()

dfjoin = df.join(dfscore,["name","gender"],"outer")
dfjoin.show()

#join with an explicitly specified join condition
dfmark = dfscore.withColumnRenamed("gender","sex")
dfmark.show()

dfjoin = df.join(dfmark,(df["name"] == dfmark["name"]) & (df["gender"]==dfmark["sex"]),
                 "inner")
dfjoin.show()

#union
dfstudent = spark.createDataFrame([("Jim",18,"male"),("Lily",16,"female")]).toDF("name","age","gender")
dfstudent.show()

dfunion = df.union(dfstudent)
dfunion.show()

#groupBy
from pyspark.sql import functions as F
dfgroup = df.groupBy("gender").max("age")
dfgroup.show()

#group then aggregate: groupBy, agg
dfagg = df.groupBy("gender").agg(F.mean("age").alias("mean_age"),
                                 F.collect_list("name").alias("names"))
dfagg.show()

#group then aggregate: groupBy, agg
dfagg = df.groupBy("gender").agg(F.expr("avg(age)"),F.expr("collect_list(name)"))
dfagg.show()

#group then aggregate: groupBy, agg
df.groupBy("gender","age").agg(F.collect_list(col("name"))).show()

#group then pivot: groupBy, pivot
dfstudent = spark.createDataFrame([("LiLei",18,"male",1),("HanMeiMei",16,"female",1),
                                   ("Jim",17,"male",2),("DaChui",20,"male",2)]).toDF("name","age","gender","class")
dfstudent.show()
dfstudent.groupBy("class").pivot("gender").max("age").show()

#window functions
df = spark.createDataFrame([("LiLei",78,"class1"),("HanMeiMei",87,"class1"),
                            ("DaChui",65,"class2"),("RuHua",55,"class2")]) \
     .toDF("name","score","class")
df.show()

dforder = df.selectExpr("name","score","class",
                        "row_number() over (partition by class order by score desc) as order")
dforder.show()
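The same ranking can also be written with the DataFrame window API instead of a SQL expression string. A minimal sketch, equivalent to the selectExpr version above:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

#define the window: partition by class, order by score descending
w = Window.partitionBy("class").orderBy(F.desc("score"))

#row_number over the window gives the same "order" column as the selectExpr version
dforder2 = df.withColumn("order", F.row_number().over(w))
dforder2.show()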
5. DataFrame SQL Interaction
After registering a DataFrame as a temporary view or a global temporary view, you can interact with it using SQL statements.
Beyond that, SparkSQL can also create, drop, update, and query Hive tables directly.
1. SQL Interaction After Registering Views
#register as a temporary view, whose lifetime is tied to the SparkSession
df = spark.createDataFrame([("LiLei",18,"male"),("HanMeiMei",17,"female"),("Jim",16,"male")],
                           ("name","age","gender"))
df.show()
df.createOrReplaceTempView("student")
dfmale = spark.sql("select * from student where gender='male'")
dfmale.show()

#register as a global temporary view, whose lifetime is tied to the whole Spark application
df.createOrReplaceGlobalTempView("student")
query = """
select t.gender
, collect_list(t.name) as names
from global_temp.student t
group by t.gender
""".strip("\n")
spark.sql(query).show()

#a global temporary view is accessible from a new Session
spark.newSession().sql("select * from global_temp.student").show()
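A view that is no longer needed can also be dropped explicitly through the catalog, instead of waiting for its session or application to end. A minimal sketch:

#drop the views explicitly
spark.catalog.dropTempView("student")
spark.catalog.dropGlobalTempView("student")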
2. CRUD Operations on Hive Tables
#drop the hive table
query = "DROP TABLE IF EXISTS students"
spark.sql(query)

#create a hive partitioned table
#(note: Chinese field names cannot be used as partition fields)
query = """CREATE TABLE IF NOT EXISTS `students`
(`name` STRING COMMENT 'name',
 `age` INT COMMENT 'age')
PARTITIONED BY (`class` STRING COMMENT 'class', `gender` STRING COMMENT 'gender')
""".replace("\n"," ")
spark.sql(query)

##write data into the hive partitioned table dynamically
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict") #note this configuration step
dfstudents = spark.createDataFrame([("LiLei",18,"class1","male"),
                                    ("HanMeimei",17,"class2","female"),
                                    ("DaChui",19,"class2","male"),
                                    ("Lily",17,"class1","female")]).toDF("name","age","class","gender")
dfstudents.show()

#write into dynamic partitions
dfstudents.write.mode("overwrite").format("hive")\
.partitionBy("class","gender").saveAsTable("students")

#write into a static partition
dfstudents = spark.createDataFrame([("Jim",18,"class3","male"),
                                    ("Tom",19,"class3","male")]).toDF("name","age","class","gender")
dfstudents.createOrReplaceTempView("dfclass3")

#INSERT INTO appends to the partition, INSERT OVERWRITE TABLE overwrites it
query = """
INSERT OVERWRITE TABLE `students`
PARTITION(class='class3',gender='male')
SELECT name,age from dfclass3
""".replace("\n"," ")
spark.sql(query)

#write into mixed static and dynamic partitions
dfstudents = spark.createDataFrame([("David",18,"class4","male"),
                                    ("Amy",17,"class4","female"),
                                    ("Jerry",19,"class4","male"),
                                    ("Ann",17,"class4","female")]).toDF("name","age","class","gender")
dfstudents.createOrReplaceTempView("dfclass4")

query = """
INSERT OVERWRITE TABLE `students`
PARTITION(class='class4',gender)
SELECT name,age,gender from dfclass4
""".replace("\n"," ")
spark.sql(query)

#read all the data back
dfdata = spark.sql("select * from students")
dfdata.show()
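To verify which partitions were actually created, standard Hive SQL can be issued through spark.sql as well. A minimal sketch:

#list the partitions of the table
spark.sql("SHOW PARTITIONS students").show(truncate=False)

#query a single partition; Spark only scans that partition
spark.sql("select * from students where class='class3' and gender='male'").show()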