Spark SQL Hands-On

码农世界, 2024-05-28

Spark SQL Basics

Spark SQL is the Apache Spark module for working with structured data.

Setting up the Spark environment

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
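# Older releases such as 2.4.8 are also kept under archive.apache.org/dist/spark/ in case this mirror no longer serves them.
!wget -q www-us.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz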
!tar xf spark-2.4.8-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"]="/content/spark-2.4.8-bin-hadoop2.7"
import findspark
findspark.init()

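Initializing SparkSession

Initialization

A SparkSession can be used to create DataFrames, register DataFrames as tables, execute SQL queries, cache tables, and read Parquet files.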
from pyspark.sql import SparkSession
spark = SparkSession \
       .builder \
       .appName("Python Spark SQL basic example") \
       .config("spark.some.config.option", "some-value") \
       .getOrCreate()

Creating DataFrames

From RDDs

from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import *

Inferring the schema

sc = spark.sparkContext
!wget https://github.com/awesome-AI-cheatsheets/tree/main/Spark/data/people.txt
lines = sc.textFile("people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0],age=int(p[1])))
peopledf = spark.createDataFrame(people)

Specifying the schema

# Both fields are declared as StringType below, so age ends up as a string column.
people = parts.map(lambda p: Row(name=p[0], age=int(p[1].strip())))
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
spark.createDataFrame(people, schema).show()
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

From Spark data sources

JSON

!wget -q https://github.com/awesome-AI-cheatsheets/tree/main/Spark/data/people.json
!wget -q https://github.com/awesome-AI-cheatsheets/tree/main/Spark/data/employees.json
!wget -q https://github.com/awesome-AI-cheatsheets/tree/main/Spark/data/customers.json
df = spark.read.json("customers.json")
df.show()
+--------------------+---+---------+----------+--------------------+
|             address|age|firstName|  lastName|         phoneNumber|
+--------------------+---+---------+----------+--------------------+
|[New Orleans, LA,...| 22|     Jane|Butterburg|[210-332-578, mob...|
|[Brighton, MI, 4 ...| 25|Josephine|   Darakjy|[210-319-103, mob...|
|[Bridgeport, NJ, ...| 32|    Boris|    Chemel|[210-322-007, mob...|
|[Bridgeport, NJ, ...| 35|  Johnson|     Smith|[210-303-029, mob...|
+--------------------+---+---------+----------+--------------------+
df2 = spark.read.load("people.json", format="json")
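
By default, spark.read.json expects one JSON object per line. If the downloads above do not yield the raw data files, a local customers.json can be written by hand. The sketch below assumes the four records that appear in the outputs of this article; the helper write_json_lines is hypothetical and not part of Spark.

```python
import json

# Assumed contents: the four customer records shown in this article's outputs.
CUSTOMERS = [
    {"firstName": "Jane", "lastName": "Butterburg", "age": 22,
     "address": {"city": "New Orleans", "state": "LA",
                 "street": "6649 N Blue Gum St", "zip": "70116"},
     "phoneNumber": {"number": "210-332-578", "type": "mobile"}},
    {"firstName": "Josephine", "lastName": "Darakjy", "age": 25,
     "address": {"city": "Brighton", "state": "MI",
                 "street": "4 B Blue Ridge Blvd", "zip": "48116"},
     "phoneNumber": {"number": "210-319-103", "type": "mobile"}},
    {"firstName": "Boris", "lastName": "Chemel", "age": 32,
     "address": {"city": "Bridgeport", "state": "NJ",
                 "street": "8 W Cerritos Ave #54", "zip": "08014"},
     "phoneNumber": {"number": "210-322-007", "type": "mobile"}},
    {"firstName": "Johnson", "lastName": "Smith", "age": 35,
     "address": {"city": "Bridgeport", "state": "NJ",
                 "street": "5 W Blue Ave St", "zip": "08112"},
     "phoneNumber": {"number": "210-303-029", "type": "mobile"}},
]


def write_json_lines(records, path):
    """Write one JSON object per line, the layout spark.read.json reads by default."""
    with open(path, "w", encoding="utf-8") as handle:
        for record in records:
            handle.write(json.dumps(record) + "\n")
    return path


if __name__ == "__main__":
    write_json_lines(CUSTOMERS, "customers.json")
```

Zip codes are kept as strings so that leading zeros such as 08014 survive.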

Parquet files

!wget -q https://github.com/apache/spark/raw/master/examples/src/main/resources/users.parquet
df3 = spark.read.load("users.parquet")

Text files

df4 = spark.read.text("people.txt")

Inspecting data

Inspecting a Spark DataFrame

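df.dtypes   # return the column names and data types of df
[('address', 'struct<city:string,state:string,street:string,zip:string>'),
 ('age', 'bigint'),
 ('firstName', 'string'),
 ('lastName', 'string'),
 ('phoneNumber', 'struct<number:string,type:string>')]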
df.show()   # display the contents of df
+--------------------+---+---------+----------+--------------------+
|             address|age|firstName|  lastName|         phoneNumber|
+--------------------+---+---------+----------+--------------------+
|[New Orleans, LA,...| 22|     Jane|Butterburg|[210-332-578, mob...|
|[Brighton, MI, 4 ...| 25|Josephine|   Darakjy|[210-319-103, mob...|
|[Bridgeport, NJ, ...| 32|    Boris|    Chemel|[210-322-007, mob...|
|[Bridgeport, NJ, ...| 35|  Johnson|     Smith|[210-303-029, mob...|
+--------------------+---+---------+----------+--------------------+
df.head(3)   # return the first n rows
[Row(address=Row(city='New Orleans', state='LA', street='6649 N Blue Gum St', zip='70116'), age=22, firstName='Jane', lastName='Butterburg', phoneNumber=Row(number='210-332-578', type='mobile')),
 Row(address=Row(city='Brighton', state='MI', street='4 B Blue Ridge Blvd', zip='48116'), age=25, firstName='Josephine', lastName='Darakjy', phoneNumber=Row(number='210-319-103', type='mobile')),
 Row(address=Row(city='Bridgeport', state='NJ', street='8 W Cerritos Ave #54', zip='08014'), age=32, firstName='Boris', lastName='Chemel', phoneNumber=Row(number='210-322-007', type='mobile'))]
df.first()   # return the first row
Row(address=Row(city='New Orleans', state='LA', street='6649 N Blue Gum St', zip='70116'), age=22, firstName='Jane', lastName='Butterburg', phoneNumber=Row(number='210-332-578', type='mobile'))
df.take(3)   # return the first n rows
[Row(address=Row(city='New Orleans', state='LA', street='6649 N Blue Gum St', zip='70116'), age=22, firstName='Jane', lastName='Butterburg', phoneNumber=Row(number='210-332-578', type='mobile')),
 Row(address=Row(city='Brighton', state='MI', street='4 B Blue Ridge Blvd', zip='48116'), age=25, firstName='Josephine', lastName='Darakjy', phoneNumber=Row(number='210-319-103', type='mobile')),
 Row(address=Row(city='Bridgeport', state='NJ', street='8 W Cerritos Ave #54', zip='08014'), age=32, firstName='Boris', lastName='Chemel', phoneNumber=Row(number='210-322-007', type='mobile'))]
df.schema   # return the schema of df
StructType(List(StructField(address,StructType(List(StructField(city,StringType,true),StructField(state,StringType,true),StructField(street,StringType,true),StructField(zip,StringType,true))),true),StructField(age,LongType,true),StructField(firstName,StringType,true),StructField(lastName,StringType,true),StructField(phoneNumber,StructType(List(StructField(number,StringType,true),StructField(type,StringType,true))),true)))
df.describe().show()   # compute summary statistics
+-------+-----------------+---------+----------+
|summary|              age|firstName|  lastName|
+-------+-----------------+---------+----------+
|  count|                4|        4|         4|
|   mean|             28.5|     null|      null|
| stddev|6.027713773341708|     null|      null|
|    min|               22|    Boris|Butterburg|
|    max|               35|Josephine|     Smith|
+-------+-----------------+---------+----------+
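
The summary values can be cross-checked in plain Python. This sketch assumes the four ages and names shown above; it illustrates that the stddev reported by describe() is the sample standard deviation and that min and max on string columns follow lexicographic order.

```python
import statistics

# Values taken from the customers output above.
ages = [22, 25, 32, 35]
first_names = ["Jane", "Josephine", "Boris", "Johnson"]

summary = {
    "count": len(ages),
    "mean": statistics.mean(ages),
    # statistics.stdev divides by n - 1, i.e. the sample standard deviation.
    "stddev": statistics.stdev(ages),
    "min": min(ages),
    "max": max(ages),
}
print(summary)

# For strings, min and max compare character by character.
print(min(first_names), max(first_names))
```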
df.columns   # return the column names of df
['address', 'age', 'firstName', 'lastName', 'phoneNumber']
df.count()   # return the number of rows in df
4
df.distinct().count()   # return the number of distinct rows in df
4
df.printSchema()   # print the schema of df
root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- age: long (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- phoneNumber: struct (nullable = true)
 |    |-- number: string (nullable = true)
 |    |-- type: string (nullable = true)
df.explain()   # print the physical plan (df.explain(True) also prints the logical plans)
== Physical Plan ==
*(1) FileScan json [address#452,age#453L,firstName#454,lastName#455,phoneNumber#456] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/content/customers.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<address:struct<city:string,state:string,street:string,zip:string>,age:bigint,firstName:str...

Duplicate values

The dropDuplicates function

df = df.dropDuplicates()

Queries

The functions module, imported as F

from pyspark.sql import functions as F

Select

df.select("firstName").show()   # show all entries in the firstName column
+---------+
|firstName|
+---------+
|Josephine|
|     Jane|
|    Boris|
|  Johnson|
+---------+
df.select("firstName","lastName").show()
+---------+----------+
|firstName|  lastName|
+---------+----------+
|Josephine|   Darakjy|
|     Jane|Butterburg|
|    Boris|    Chemel|
|  Johnson|     Smith|
+---------+----------+
df.show()
+--------------------+---+---------+----------+--------------------+
|             address|age|firstName|  lastName|         phoneNumber|
+--------------------+---+---------+----------+--------------------+
|[Brighton, MI, 4 ...| 25|Josephine|   Darakjy|[210-319-103, mob...|
|[New Orleans, LA,...| 22|     Jane|Butterburg|[210-332-578, mob...|
|[Bridgeport, NJ, ...| 32|    Boris|    Chemel|[210-322-007, mob...|
|[Bridgeport, NJ, ...| 35|  Johnson|     Smith|[210-303-029, mob...|
+--------------------+---+---------+----------+--------------------+
df.select("firstName", "age", df.phoneNumber \
            .alias("contactInfo")) \
            .select("contactInfo", "firstName", "age") \
            .show()   # show firstName, age and phoneNumber (aliased as contactInfo), with the columns reordered
+--------------------+---------+---+
|         contactInfo|firstName|age|
+--------------------+---------+---+
|[210-319-103, mob...|Josephine| 25|
|[210-332-578, mob...|     Jane| 22|
|[210-322-007, mob...|    Boris| 32|
|[210-303-029, mob...|  Johnson| 35|
+--------------------+---------+---+
df.select(df["firstName"],df["age"]+ 1).show()   # show firstName and age for every row, adding 1 to each age
+---------+---------+
|firstName|(age + 1)|
+---------+---------+
|Josephine|       26|
|     Jane|       23|
|    Boris|       33|
|  Johnson|       36|
+---------+---------+
df.select(df['age'] > 24).show()   # show, for every row, whether age is greater than 24 (no rows are filtered out)
+----------+
|(age > 24)|
+----------+
|      true|
|     false|
|      true|
|      true|
+----------+

When

df.select("firstName", F.when(df.age > 30, 1) \
            .otherwise(0)) \
            .show()   # show firstName, plus 1 if age is greater than 30 and 0 otherwise
+---------+--------------------------------------+
|firstName|CASE WHEN (age > 30) THEN 1 ELSE 0 END|
+---------+--------------------------------------+
|Josephine|                                     0|
|     Jane|                                     0|
|    Boris|                                     1|
|  Johnson|                                     1|
+---------+--------------------------------------+
df[df.firstName.isin("Jane","Boris")].collect()   # return the rows whose firstName is Jane or Boris
[Row(address=Row(city='New Orleans', state='LA', street='6649 N Blue Gum St', zip='70116'), age=22, firstName='Jane', lastName='Butterburg', phoneNumber=Row(number='210-332-578', type='mobile')),
 Row(address=Row(city='Bridgeport', state='NJ', street='8 W Cerritos Ave #54', zip='08014'), age=32, firstName='Boris', lastName='Chemel', phoneNumber=Row(number='210-322-007', type='mobile'))]

Like

df.select("firstName", df.lastName.like("Smith")) \
  .show()   # show firstName and whether lastName matches the pattern Smith (an exact match, since the pattern has no wildcards)
+---------+-------------------+
|firstName|lastName LIKE Smith|
+---------+-------------------+
|Josephine|              false|
|     Jane|              false|
|    Boris|              false|
|  Johnson|               true|
+---------+-------------------+
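
Because the pattern Smith contains no wildcards, like() behaves as an exact comparison here. The following plain-Python sketch approximates SQL LIKE matching, where % stands for any run of characters and _ for exactly one character. The helper sql_like is hypothetical, is not part of PySpark, and ignores escape characters.

```python
import re


def sql_like(value, pattern):
    """Approximate SQL LIKE: '%' matches any run of characters, '_' exactly one."""
    parts = []
    for char in pattern:
        if char == "%":
            parts.append(".*")
        elif char == "_":
            parts.append(".")
        else:
            # Every other character must match literally.
            parts.append(re.escape(char))
    # fullmatch: the pattern has to cover the whole value, as LIKE does.
    return re.fullmatch("".join(parts), value, flags=re.DOTALL) is not None


for last_name in ["Darakjy", "Butterburg", "Chemel", "Smith"]:
    print(last_name, sql_like(last_name, "Smith"), sql_like(last_name, "%e%"))
```

To test for containment rather than equality, the pattern would need wildcards on both sides, for example %Smith%.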

Startswith - Endswith

df.select("firstName", df.lastName \
  .startswith("Sm")) \
  .show()   # show firstName and whether lastName starts with Sm
+---------+------------------------+
|firstName|startswith(lastName, Sm)|
+---------+------------------------+
|Josephine|                   false|
|     Jane|                   false|
|    Boris|                   false|
|  Johnson|                    true|
+---------+------------------------+
df.select(df.lastName \
  .endswith("th")) \
  .show()   # show whether each lastName ends with th
+----------------------+
|endswith(lastName, th)|
+----------------------+
|                 false|
|                 false|
|                 false|
|                  true|
+----------------------+

Substring

df.select(df.firstName.substr(1, 3) \
 .alias("name")) \
 .collect()   # return the first three characters of each firstName
[Row(name='Jos'), Row(name='Jan'), Row(name='Bor'), Row(name='Joh')]
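
Note that substr(1, 3) takes a 1-based start position and a length, unlike Python slicing, which is 0-based and takes an end index. A small plain-Python sketch of the equivalent slice, assuming a start position of at least 1; the helper sql_substr is hypothetical.

```python
def sql_substr(text, start_pos, length):
    """Mimic Column.substr for start_pos >= 1: 1-based start, fixed length."""
    begin = start_pos - 1          # convert the 1-based position to a 0-based index
    return text[begin:begin + length]


first_names = ["Josephine", "Jane", "Boris", "Johnson"]
prefixes = [sql_substr(name, 1, 3) for name in first_names]
print(prefixes)
```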

Between

df.select(df.age.between(22, 24)) \
        .show()   # show whether each age lies between 22 and 24, inclusive
+-----------------------------+
|((age >= 22) AND (age <= 24))|
+-----------------------------+
|                        false|
|                         true|
|                        false|
|                        false|
+-----------------------------+

Adding, renaming, and dropping columns

Adding columns

df.select(df.phoneNumber.number).show()
+------------------+
|phoneNumber.number|
+------------------+
|       210-319-103|
|       210-332-578|
|       210-322-007|
|       210-303-029|
+------------------+
df = df.withColumn('city',df.address.city) \
             .withColumn('postalCode',df.address.zip) \
             .withColumn('state',df.address.state) \
             .withColumn('streetAddress',df.address.street) \
             .withColumn('telePhoneNumber', df.phoneNumber.number) \
             .withColumn('telePhoneType', df.phoneNumber.type)
df.show()
+--------------------+---+---------+----------+--------------------+-----------+----------+-----+--------------------+---------------+-------------+
|             address|age|firstName|  lastName|         phoneNumber|       city|postalCode|state|       streetAddress|telePhoneNumber|telePhoneType|
+--------------------+---+---------+----------+--------------------+-----------+----------+-----+--------------------+---------------+-------------+
|[Brighton, MI, 4 ...| 25|Josephine|   Darakjy|[210-319-103, mob...|   Brighton|     48116|   MI| 4 B Blue Ridge Blvd|    210-319-103|       mobile|
|[New Orleans, LA,...| 22|     Jane|Butterburg|[210-332-578, mob...|New Orleans|     70116|   LA|  6649 N Blue Gum St|    210-332-578|       mobile|
|[Bridgeport, NJ, ...| 32|    Boris|    Chemel|[210-322-007, mob...| Bridgeport|     08014|   NJ|8 W Cerritos Ave #54|    210-322-007|       mobile|
|[Bridgeport, NJ, ...| 35|  Johnson|     Smith|[210-303-029, mob...| Bridgeport|     08112|   NJ|     5 W Blue Ave St|    210-303-029|       mobile|
+--------------------+---+---------+----------+--------------------+-----------+----------+-----+--------------------+---------------+-------------+

Renaming columns

df = df.withColumnRenamed('telePhoneNumber', 'phoneNumber')

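Dropping columns

df = df.drop("address", "phoneNumber")   # drops every column named address or phoneNumber
# Equivalent, as long as each name refers to a single column:
# df = df.drop(df.address).drop(df.phoneNumber)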

Grouping

The groupBy operation

df.groupBy("age")\
      .count() \
      .show()   # group by the age column and count the rows in each group
+---+-----+
|age|count|
+---+-----+
| 22|    1|
| 32|    1|
| 25|    1|
| 35|    1|
+---+-----+

Filtering

Filtering with filter

df.filter(df["age"]>24).show()   # keep only the rows where age is greater than 24
+---+---------+--------+----------+----------+-----+--------------------+-------------+
|age|firstName|lastName|      city|postalCode|state|       streetAddress|telePhoneType|
+---+---------+--------+----------+----------+-----+--------------------+-------------+
| 25|Josephine| Darakjy|  Brighton|     48116|   MI| 4 B Blue Ridge Blvd|       mobile|
| 32|    Boris|  Chemel|Bridgeport|     08014|   NJ|8 W Cerritos Ave #54|       mobile|
| 35|  Johnson|   Smith|Bridgeport|     08112|   NJ|     5 W Blue Ave St|       mobile|
+---+---------+--------+----------+----------+-----+--------------------+-------------+

Sorting

The sort and orderBy operations

peopledf.sort(peopledf.age.desc()).collect()
[Row(age=30, name='Andy'),
 Row(age=29, name='Michael'),
 Row(age=19, name='Justin')]
df.sort("age", ascending=False).collect()
[Row(age=35, firstName='Johnson', lastName='Smith', city='Bridgeport', postalCode='08112', state='NJ', streetAddress='5 W Blue Ave St', telePhoneType='mobile'),
 Row(age=32, firstName='Boris', lastName='Chemel', city='Bridgeport', postalCode='08014', state='NJ', streetAddress='8 W Cerritos Ave #54', telePhoneType='mobile'),
 Row(age=25, firstName='Josephine', lastName='Darakjy', city='Brighton', postalCode='48116', state='MI', streetAddress='4 B Blue Ridge Blvd', telePhoneType='mobile'),
 Row(age=22, firstName='Jane', lastName='Butterburg', city='New Orleans', postalCode='70116', state='LA', streetAddress='6649 N Blue Gum St', telePhoneType='mobile')]
df.orderBy(["age","city"],ascending=[0,1]).collect()
[Row(age=35, firstName='Johnson', lastName='Smith', city='Bridgeport', postalCode='08112', state='NJ', streetAddress='5 W Blue Ave St', telePhoneType='mobile'),
 Row(age=32, firstName='Boris', lastName='Chemel', city='Bridgeport', postalCode='08014', state='NJ', streetAddress='8 W Cerritos Ave #54', telePhoneType='mobile'),
 Row(age=25, firstName='Josephine', lastName='Darakjy', city='Brighton', postalCode='48116', state='MI', streetAddress='4 B Blue Ridge Blvd', telePhoneType='mobile'),
 Row(age=22, firstName='Jane', lastName='Butterburg', city='New Orleans', postalCode='70116', state='LA', streetAddress='6649 N Blue Gum St', telePhoneType='mobile')]
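
Since all four ages are distinct, the second sort key (city) never comes into play in the output above. The plain-Python sketch below mirrors orderBy(["age","city"], ascending=[0,1]) on a reduced copy of the data, with one extra hypothetical row (age 32, city Albany) added purely to show the tie-break.

```python
# Reduced rows from the customers data, plus one hypothetical row to create a tie on age.
rows = [
    {"age": 25, "firstName": "Josephine", "city": "Brighton"},
    {"age": 22, "firstName": "Jane", "city": "New Orleans"},
    {"age": 32, "firstName": "Boris", "city": "Bridgeport"},
    {"age": 35, "firstName": "Johnson", "city": "Bridgeport"},
    {"age": 32, "firstName": "Hypothetical", "city": "Albany"},  # not in customers.json
]

# age descending (negated key), then city ascending.
ordered = sorted(rows, key=lambda row: (-row["age"], row["city"]))

for row in ordered:
    print(row["age"], row["city"], row["firstName"])
```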

Replacing missing values

fill, drop, and replace

df.na.fill(50).show()   # replace nulls in numeric columns with 50
+---+---------+----------+-----------+----------+-----+--------------------+-------------+
|age|firstName|  lastName|       city|postalCode|state|       streetAddress|telePhoneType|
+---+---------+----------+-----------+----------+-----+--------------------+-------------+
| 25|Josephine|   Darakjy|   Brighton|     48116|   MI| 4 B Blue Ridge Blvd|       mobile|
| 22|     Jane|Butterburg|New Orleans|     70116|   LA|  6649 N Blue Gum St|       mobile|
| 32|    Boris|    Chemel| Bridgeport|     08014|   NJ|8 W Cerritos Ave #54|       mobile|
| 35|  Johnson|     Smith| Bridgeport|     08112|   NJ|     5 W Blue Ave St|       mobile|
+---+---------+----------+-----------+----------+-----+--------------------+-------------+
df.na.drop().show()   # drop the rows of df that contain null values
+---+---------+----------+-----------+----------+-----+--------------------+-------------+
|age|firstName|  lastName|       city|postalCode|state|       streetAddress|telePhoneType|
+---+---------+----------+-----------+----------+-----+--------------------+-------------+
| 25|Josephine|   Darakjy|   Brighton|     48116|   MI| 4 B Blue Ridge Blvd|       mobile|
| 22|     Jane|Butterburg|New Orleans|     70116|   LA|  6649 N Blue Gum St|       mobile|
| 32|    Boris|    Chemel| Bridgeport|     08014|   NJ|8 W Cerritos Ave #54|       mobile|
| 35|  Johnson|     Smith| Bridgeport|     08112|   NJ|     5 W Blue Ave St|       mobile|
+---+---------+----------+-----------+----------+-----+--------------------+-------------+
df.na.replace(10, 20).show()   # replace every value 10 with 20
+---+---------+----------+-----------+----------+-----+--------------------+-------------+
|age|firstName|  lastName|       city|postalCode|state|       streetAddress|telePhoneType|
+---+---------+----------+-----------+----------+-----+--------------------+-------------+
| 25|Josephine|   Darakjy|   Brighton|     48116|   MI| 4 B Blue Ridge Blvd|       mobile|
| 22|     Jane|Butterburg|New Orleans|     70116|   LA|  6649 N Blue Gum St|       mobile|
| 32|    Boris|    Chemel| Bridgeport|     08014|   NJ|8 W Cerritos Ave #54|       mobile|
| 35|  Johnson|     Smith| Bridgeport|     08112|   NJ|     5 W Blue Ave St|       mobile|
+---+---------+----------+-----------+----------+-----+--------------------+-------------+

Repartitioning

Repartitioning with repartition and coalesce

df.repartition(10)\
      .rdd \
      .getNumPartitions()   # split df into 10 partitions
10
df.coalesce(1).rdd.getNumPartitions()   # merge df into a single partition
1

Running SQL queries

Registering DataFrames as views

peopledf.createGlobalTempView("people")
df.createTempView("customer")
df.createOrReplaceTempView("customer")

Querying views

df5 = spark.sql("SELECT * FROM customer")   # keep the DataFrame; show() itself returns None
df5.show()
+---+---------+----------+-----------+----------+-----+--------------------+-------------+
|age|firstName|  lastName|       city|postalCode|state|       streetAddress|telePhoneType|
+---+---------+----------+-----------+----------+-----+--------------------+-------------+
| 25|Josephine|   Darakjy|   Brighton|     48116|   MI| 4 B Blue Ridge Blvd|       mobile|
| 22|     Jane|Butterburg|New Orleans|     70116|   LA|  6649 N Blue Gum St|       mobile|
| 32|    Boris|    Chemel| Bridgeport|     08014|   NJ|8 W Cerritos Ave #54|       mobile|
| 35|  Johnson|     Smith| Bridgeport|     08112|   NJ|     5 W Blue Ave St|       mobile|
+---+---------+----------+-----------+----------+-----+--------------------+-------------+
peopledf2 = spark.sql("SELECT * FROM global_temp.people")   # global temporary views live in the global_temp database
peopledf2.show()
+---+-------+
|age|   name|
+---+-------+
| 29|Michael|
| 30|   Andy|
| 19| Justin|
+---+-------+

Output

Data structures

rdd1 = df.rdd   # convert df to an RDD
df.toJSON().first()   # convert df to an RDD of JSON strings and return the first one
'{"age":25,"firstName":"Josephine","lastName":"Darakjy","city":"Brighton","postalCode":"48116","state":"MI","streetAddress":"4 B Blue Ridge Blvd","telePhoneType":"mobile"}'
df.toPandas()   # convert the contents of df to a pandas DataFrame
   age  firstName    lastName         city  postalCode  state         streetAddress  telePhoneType
0   25  Josephine     Darakjy     Brighton       48116     MI   4 B Blue Ridge Blvd         mobile
1   22       Jane  Butterburg  New Orleans       70116     LA    6649 N Blue Gum St         mobile
2   32      Boris      Chemel   Bridgeport       08014     NJ  8 W Cerritos Ave #54         mobile
3   35    Johnson       Smith   Bridgeport       08112     NJ       5 W Blue Ave St         mobile

Saving to files

df.select("firstName", "city")\
      .write \
      .save("nameAndCity.parquet")
df.select("firstName", "age") \
      .write \
      .save("namesAndAges.json",format="json")

Stopping SparkSession

Stopping the Spark session

spark.stop()
