pyspark cmd上的命令
1 读取文件
1.1 基本读取方式
pyspark中是惰性操作,所有变换类操作都是延迟计算的,pyspark只是记录了将要对数据集进行的操作
只有需要数据集将数据返回到 Driver 程序时(比如collect,count,show之类),所有已经记录的变换操作才会执行
注意读取出来的格式是Pyspark DataFrame,不是DataFrame,所以一些操作上是有区别的
1.1.1 format
DataFrame = spark.read.format("csv") .option(name,value) .load(path)
- format表示读取格式csv
- option就是读取csv时可选的选项
- path就是文件所在的路径
1.1.2 csv
DataFrame = spark.read .option(name,value) .csv(path)
- option就是读取csv时可选的选项
- path就是文件所在的路径
1.1.3 读取多个文件
使用spark.read.csv()可以读取多个csv文件
df = spark.read.csv("path1,path2,path3") #读取path1,path2和path3
df= spark.read.csv("Folder path") #读取Folder path里面的所有csv文件
1.2 option 主要参数
sep 默认,
指定单个字符分割字段和值
encoding 默认utf-8
通过给定的编码类型进行解码
header 默认false
是否将第一行作为列名
schema 手动设置输出结果的类型
inferSchema 根据数据预测数据类型
加了的话文件读取的次数是2次。
比如一列int 数据,不设置inferSchema=True的话,那么返回的类型就是string类型,设置了的话,返回类型就是int类型
nullValues
指定在 CSV 中要视为 null 的字符串 1.3 举例
三种设置option的方法:
celltable = spark.read.format("csv") .option("header", "true") .option("delimiter","\t") .load("xxx/test.txt") celltable = spark.read.format("csv") .options(header=True,delimiter='\t') .load("xxx/test.txt") celltable = spark.read.format("csv") .load("xxx/test.txt",header=True,delimiter='\t')
celltable = spark.read .option("header", "true") .option("delimiter","\t") .csv("xxx/test.txt")
此时的celltable不会加载数据
1.3.1 读入多个文件(使用通配符)
celltable = spark.read.format("csv") .option("header", "true") .option("delimiter","\t") .load("xxx/test_*.txt")
2 其他主要函数
3 stat
corr
两列的相关系数
4 创建pyspark DataFrame
4.1 使用Row
from pyspark.sql import Row data = [ Row(id=1, name="Alice", age=25), Row(id=2, name="Bob", age=30), Row(id=3, name="Charlie", age=28) ] df = spark.createDataFrame(data) df.show() ''' +---+---+-------+ |age| id| name| +---+---+-------+ | 25| 1| Alice| | 30| 2| Bob| | 28| 3|Charlie| +---+---+-------+ '''
4.2 不使用Row
employee_salary = [ ("Ali", "Sales", 8000), ("Bob", "Sales", 7000), ("Cindy", "Sales", 7500), ("Davd", "Finance", 10000), ("Elena", "Sales", 8000), ("Fancy", "Finance", 12000), ("George", "Finance", 11000), ("Haffman", "Marketing", 7000), ("Ilaja", "Marketing", 8000), ("Joey", "Sales", 9000)] columns= ["name", "department", "salary"] df = spark.createDataFrame(data = employee_salary, schema = columns) df.show(truncate=False)
参考内容:IBBD.github.io/hadoop/pyspark-csv.md at master · IBBD/IBBD.github.io · GitHub
还没有评论,来说两句吧...