Hands-On Bidirectional Integration of the Python Stack and Spark for Cross Data Analysis -- A Big Data ML Sample-Set Case Study

Published: 2020-10-16 00:30:40

Copyright notice: This technical column is the author's (Qin Kaixin) summary and distillation of day-to-day work, built from cases drawn from real business environments, together with tuning advice for business applications and cluster capacity planning. Please keep following this blog. QQ email: 1120746959@; feel free to get in touch for any academic exchange.

1 Basic Data Processing with Python Spark SQL

Python Spark DataFrame basics
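All of the examples in this article assume an already running SparkSession named spark (and, where used, its SparkContext sc). A minimal setup sketch, with a hypothetical application name, might look like this:

from pyspark.sql import SparkSession

# Minimal session setup; the app name is a placeholder, adjust master/appName to your environment.
spark = SparkSession.builder \
    .appName('python-spark-sql-demo') \
    .getOrCreate()
sc = spark.sparkContext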

df = spark.read.parquet('/sql/users.parquet')
df.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

Python Spark DataFrame aggregation

CustomerID,Genre,Age,Annual Income (k$),Spending Score (1-100)
0001,Male,19,15,39
0002,Male,21,15,81
0003,Female,20,16,6
0004,Female,23,16,77
0005,Female,31,17,40
0006,Female,22,17,76

df = spark.read.csv('/sql/customers.csv',header=True)
df.printSchema()
df.show()

root
 |-- CustomerID: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Annual Income (k$): string (nullable = true)
 |-- Spending Score (1-100): string (nullable = true)

+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
|0001| Male| 19|15|39|
|0002| Male| 21|15|81|
|0003|Female| 20|16| 6|
|0004|Female| 23|16|77|
|0005|Female| 31|17|40|
|0006|Female| 22|17|76|
|0007|Female| 35|18| 6|
|0008|Female| 23|18|94|
|0009| Male| 64|19| 3|
|0010|Female| 30|19|72|
|0011| Male| 67|19|14|
|0012|Female| 35|19|99|
|0013|Female| 58|20|15|
|0014|Female| 24|20|77|
|0015| Male| 37|20|13|
|0016| Male| 22|20|79|
|0017|Female| 35|21|35|
|0018| Male| 20|21|66|
|0019| Male| 52|23|29|
|0020|Female| 35|23|98|
+----------+------+---+------------------+----------------------+

df.agg({"Age": "max","Annual Income (k$)":"mean","Spending Score (1-100)":"mean"}).show()

+---------------------------+-----------------------+--------+
|avg(Spending Score (1-100))|avg(Annual Income (k$))|max(Age)|
+---------------------------+-----------------------+--------+
| 50.2| 60.56|70|
+---------------------------+-----------------------+--------+

alias(alias) gives the DataFrame an alias that can be referred to in later expressions, for example in a self-join:

from pyspark.sql.functions import col

df1 = df.alias('cus1')
type(df1)
df2 = df.alias('cus2')
df3 = df1.join(df2, col('cus1.CustomerId') == col('cus2.CustomerId'), 'inner')
df3.count()
200
df3.show(10)

+----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
|0001| Male| 19|15|39|0001| Male| 19|15|39|
|0002| Male| 21|15|81|0002| Male| 21|15|81|
|0003|Female| 20|16| 6|0003|Female| 20|16| 6|
|0004|Female| 23|16|77|0004|Female| 23|16|77|
|0005|Female| 31|17|40|0005|Female| 31|17|40|
|0006|Female| 22|17|76|0006|Female| 22|17|76|
|0007|Female| 35|18| 6|0007|Female| 35|18| 6|
|0008|Female| 23|18|94|0008|Female| 23|18|94|
|0009| Male| 64|19| 3|0009| Male| 64|19| 3|
|0010|Female| 30|19|72|0010|Female| 30|19|72|
+----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
only showing top 10 rows

cache() caches the DataFrame at the corresponding StorageLevel; the default is MEMORY_AND_DISK.

df = spark.read.csv('/sql/customers.csv',header=True)
a = df.cache()
a.show()

+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
|0001| Male| 19|15|39|
|0002| Male| 21|15|81|
|0003|Female| 20|16| 6|
|0004|Female| 23|16|77|
|0005|Female| 31|17|40|
|0006|Female| 22|17|76|
|0007|Female| 35|18| 6|
|0008|Female| 23|18|94|
|0009| Male| 64|19| 3|
|0010|Female| 30|19|72|
|0011| Male| 67|19|14|
|0012|Female| 35|19|99|

checkpoint(eager=True) checkpoints the DataFrame. Introduced in Spark 2.1, this call truncates the logical plan built up on the DataFrame and persists its lineage state to the checkpoint files, cutting the dependency chain.

sc.setCheckpointDir('/datas/checkpoint')
a.checkpoint()
a.show()

coalesce(numPartitions) repartitions the DataFrame; the argument is the desired number of partitions.

Note: a DataFrame created with the read methods gets one partition per input file by default; when the current number of partitions is already lower than the number passed to coalesce, the call has no effect.

df = spark.read.csv('/sql/customers.csv',header=True)
df.rdd.getNumPartitions()
1
spark.read.csv('/sql/customers.csv',header=True).coalesce(3).rdd.getNumPartitions()
1
df = spark.range(0,20,2,3)
df.rdd.getNumPartitions()
df.coalesce(2).rdd.getNumPartitions()
2

repartition(numPartitions, *cols), like coalesce(numPartitions), repartitions the DataFrame, but repartition hash-partitions the data and shuffles it across the whole cluster, which is less efficient. Besides the number of partitions, repartition can also be given the columns to partition by.

df = spark.read.csv('/sql/customers.csv',header=True)
df.rdd.getNumPartitions()
1
df2 = df.repartition(3)
df2.rdd.getNumPartitions()
3
df2.columns
df3 = df2.repartition(6,'Genre')
df3.show(20)

+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
|0003|Female| 20|16| 6|
|0004|Female| 23|16|77|
|0005|Female| 31|17|40|
|0006|Female| 22|17|76|
|0007|Female| 35|18| 6|
|0008|Female| 23|18|94|
|0010|Female| 30|19|72|
|0012|Female| 35|19|99|
|0013|Female| 58|20|15|

df3.rdd.getNumPartitions()
6

colRegex(colName) returns the columns whose names match the given regular expression.

df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["Col1", "a"])
df.select(df.colRegex("`(Col1)?+.+`")).show()

+---+
|  a|
+---+
|  1|
|  2|
|  3|
+---+

collect() returns all of the DataFrame's data to the driver; note that with large datasets this can easily cause an out-of-memory error on the driver node!

df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["Col1", "a"])
df.collect()
[Row(Col1='a', a=1), Row(Col1='b', a=2), Row(Col1='c', a=3)]

columns returns all column names of the DataFrame as a list.

df = spark.read.csv('/sql/customers.csv',header=True)
df.columns
['CustomerID', 'Genre', 'Age', 'Annual Income (k$)', 'Spending Score (1-100)']

Converting a Spark SQL DataFrame to a pandas DataFrame

df = spark.read.csv('/sql/customers.csv',header=True)
pdf = df.toPandas()
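As a side note, for larger tables toPandas can usually be sped up by enabling Arrow-based transfer, which is available from roughly Spark 2.3 onward and requires pyarrow to be installed; this is an optional tweak, not part of the original example:

# Assumes Spark >= 2.3 with pyarrow installed; the config key was renamed in Spark 3.x.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pdf = df.toPandas()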

Pandas data processing operations

pdf.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 200 entries, 0 to 199
Data columns (total 5 columns):
CustomerID                200 non-null object
Genre                     200 non-null object
Age                       200 non-null object
Annual Income (k$)        200 non-null object
Spending Score (1-100)    200 non-null object
dtypes: object(5)
memory usage: 7.9+ KB

pdf['Age'] = pdf['Age'].astype('int')
pdf["Annual Income (k$)"] = pdf["Annual Income (k$)"].astype('int')
pdf["Spending Score (1-100)"] = pdf["Spending Score (1-100)"].astype('int')
pdf.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 200 entries, 0 to 199
Data columns (total 5 columns):
CustomerID                200 non-null object
Genre                     200 non-null object
Age                       200 non-null int64
Annual Income (k$)        200 non-null int64
Spending Score (1-100)    200 non-null int64
dtypes: int64(3), object(2)
memory usage: 7.9+ KB

Converting a pandas DataFrame to a Spark SQL DataFrame

df1 = spark.createDataFrame(pdf)
df1.corr("Age","Annual Income (k$)")
df1.corr("Spending Score (1-100)","Annual Income (k$)")
0.009902848094037492

count() returns the number of Rows in the DataFrame.

df = spark.read.csv('/sql/customers.csv',header=True)
df.count()
200

createGlobalTempView(name) creates a global temporary view from the DataFrame. Its lifetime is tied to the Spark application: the view stays accessible as long as the application is running, until SparkContext.stop() is called. The view is stored in the global_temp database.

df = spark.read.csv('/sql/customers.csv',header=True)
df.createGlobalTempView('TT')
spark.sql('select * from global_temp.TT').show()

+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
|0001| Male| 19|15|39|
|0002| Male| 21|15|81|
|0003|Female| 20|16| 6|
|0004|Female| 23|16|77|
|0005|Female| 31|17|40|
|0006|Female| 22|17|76|
|0007|Female| 35|18| 6|

createOrReplaceGlobalTempView(name): the method above raises an error if a view with the given name already exists, whereas this one replaces an existing view and creates it otherwise.

df = spark.read.csv('/sql/customers.csv',header=True)
df.createOrReplaceGlobalTempView('TT')
spark.sql('select * from global_temp.TT').show()

+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
|0001| Male| 19|15|39|
|0002| Male| 21|15|81|
|0003|Female| 20|16| 6|
|0004|Female| 23|16|77|
|0005|Female| 31|17|40|
|0006|Female| 22|17|76|

crossJoin(other) returns the Cartesian product of the two DataFrames. Do not use it casually: it is very time-consuming and resource-hungry.

df1 = spark.createDataFrame([('regan',27),('ting',24)],schema=['name','age'])
df2 = spark.createDataFrame([('regan',65),('ting',48)],schema=['name','weight'])
df3 = df1.coalesce(3).crossJoin(df2.coalesce(3))
df3.show()

+-----+---+-----+------+
| name|age| name|weight|
+-----+---+-----+------+
|regan| 27|regan| 65|
|regan| 27| ting| 48|
| ting| 24|regan| 65|
| ting| 24| ting| 48|
+-----+---+-----+------+

cube(*cols) creates a multi-dimensional cube over the current DataFrame using the given columns, so that aggregations can be run over all their combinations.

from pyspark.sql.functions import *

df = spark.read.csv('/sql/customers.csv',header=True)
df.cube('Age','Genre').count().orderBy(desc("count"), asc("Age")).show()

+----+------+-----+
| Age| Genre|count|
+----+------+-----+
|null| null| 200|
|null|Female| 112|
|null| Male| 88|
| 32| null| 11|
| 35| null| 9|
| 19| null| 8|
| 31| null| 8|
| 30| null| 7|
| 31|Female| 7|
| 49| null| 7|
| 19| Male| 6|
| 23|Female| 6|
| 23| null| 6|
| 27| null| 6|
| 32|Female| 6|
| 35|Female| 6|
| 36| null| 6|
| 38| null| 6|
| 40| null| 6|
| 47| null| 6|
+----+------+-----+
only showing top 20 rows

describe(*cols) computes basic statistics for the given columns: count, max, min, mean and standard deviation.

df = spark.read.csv('/sql/customers.csv',header=True)
# df.describe('Age')
df.describe('Age','Genre').show()

+-------+-----------------+------+
|summary| Age| Genre|
+-------+-----------------+------+
| count| 200| 200|
| mean| 38.85| null|
| stddev|13.96900733155888| null|
| min|18|Female|
| max|70| Male|
+-------+-----------------+------+

df.describe().show()

+-------+------------------+------+-----------------+------------------+----------------------+
|summary| CustomerID| Genre| Age|Annual Income (k$)|Spending Score (1-100)|
+-------+------------------+------+-----------------+------------------+----------------------+
| count|200| 200| 200|200| 200|
| mean| 100.5| null| 38.85| 60.56| 50.2|
| stddev|57.879184513951124| null|13.96900733155888| 26.26472116527124| 25.823521668370173|
| min| 0001|Female|18|101| 1|
| max| 0200| Male|70|99|99|
+-------+------------------+------+-----------------+------------------+----------------------+

pdf = df.toPandas()
pdf.describe()

distinct() returns the de-duplicated rows of the DataFrame.

df = spark.createDataFrame([(1,1),(1,2),(1,2),(5,5)])
df.count()
4
df.distinct().count()
3

drop(*cols) drops the given columns by name and returns a new DataFrame.

df = spark.read.csv('/sql/customers.csv',header=True)
df.columns
['CustomerID', 'Genre', 'Age', 'Annual Income (k$)', 'Spending Score (1-100)']
df1 = df.drop('Age')
df1.columns
['CustomerID', 'Genre', 'Annual Income (k$)', 'Spending Score (1-100)']

dropDuplicates(subset=None) removes duplicate rows; subset specifies which columns to consider when identifying duplicates.

from pyspark.sql import Row

df = sc.parallelize([Row(name='regan', age=27, height=170),
                     Row(name='regan', age=27, height=170),
                     Row(name='regan', age=27, height=155)], 3).toDF()
df.show()

+---+------+-----+
|age|height| name|
+---+------+-----+
| 27| 170|regan|
| 27| 170|regan|
| 27| 155|regan|
+---+------+-----+

df.dropDuplicates().show()

+---+------+-----+
|age|height| name|
+---+------+-----+
| 27| 155|regan|
| 27| 170|regan|
+---+------+-----+

df.dropDuplicates(subset=['age','name']).show()

+---+------+-----+
|age|height| name|
+---+------+-----+
| 27| 170|regan|
+---+------+-----+

2 Advanced Data Processing with Python Spark SQL

NumPy can be imported and used freely alongside PySpark.

dropna(how='any', thresh=None, subset=None) drops rows with NA values from the DataFrame. The how keyword controls how to drop, with the options 'any' and 'all'; thresh specifies the minimum number of non-null values a row must have in order to be kept and overrides the how setting; subset specifies the columns in which NA values are looked for.

import numpy as np

df = spark.createDataFrame([(np.nan,27.,170.),(44.,27.,170.),(np.nan,np.nan,170.)],schema=['luck','age','weight'])
df.show()

+----+----+------+
|luck| age|weight|
+----+----+------+
| NaN|27.0| 170.0|
|44.0|27.0| 170.0|
| NaN| NaN| 170.0|
+----+----+------+

df.dropna(how='any').show()

+----+----+------+
|luck| age|weight|
+----+----+------+
|44.0|27.0| 170.0|
+----+----+------+

df.dropna(how='all').show()

+----+----+------+
|luck| age|weight|
+----+----+------+
| NaN|27.0| 170.0|
|44.0|27.0| 170.0|
| NaN| NaN| 170.0|
+----+----+------+

df.dropna(thresh=2).show()

+----+----+------+
|luck| age|weight|
+----+----+------+
| NaN|27.0| 170.0|
|44.0|27.0| 170.0|
+----+----+------+

dtypes returns a list of tuples with each column's name and data type.

import numpy as np

df = spark.createDataFrame([(np.nan,27.,170.),(44.,27.,170.),(np.nan,np.nan,170.)],schema=['luck','age','weight'])
df.dtypes
[('luck', 'double'), ('age', 'double'), ('weight', 'double')]

fillna(value, subset=None) fills null/NaN values in the DataFrame.

import numpy as np

df = spark.createDataFrame([(np.nan,27.,170.),(44.,27.,170.),(np.nan,np.nan,170.)],schema=['luck','age','weight'])
df.show()

+----+----+------+
|luck| age|weight|
+----+----+------+
| NaN|27.0| 170.0|
|44.0|27.0| 170.0|
| NaN| NaN| 170.0|
+----+----+------+

df.na.fill(0.0).show()

+----+----+------+
|luck| age|weight|
+----+----+------+
| 0.0|27.0| 170.0|
|44.0|27.0| 170.0|
| 0.0| 0.0| 170.0|
+----+----+------+

df.fillna(0.0).show()

+----+----+------+
|luck| age|weight|
+----+----+------+
| 0.0|27.0| 170.0|
|44.0|27.0| 170.0|
| 0.0| 0.0| 170.0|
+----+----+------+

df.na.fill(False).show()

+----+----+------+
|luck| age|weight|
+----+----+------+
| NaN|27.0| 170.0|
|44.0|27.0| 170.0|
| NaN| NaN| 170.0|
+----+----+------+

df.na.fill({'luck':0.0,'age':50.0}).show()

+----+----+------+
|luck| age|weight|
+----+----+------+
| 0.0|27.0| 170.0|
|44.0|27.0| 170.0|
| 0.0|50.0| 170.0|
+----+----+------+

filter(condition) filters rows by the given condition; where is simply an alias for filter.

import numpy as np

df = spark.createDataFrame([(np.nan,27.,170.),(44.,27.,170.),(np.nan,np.nan,170.)],schema=['luck','age','weight'])
df.filter(df.luck != np.nan).show()

+----+----+------+
|luck| age|weight|
+----+----+------+
|44.0|27.0| 170.0|
+----+----+------+

df.filter('luck <> "NaN" ').show()

+----+----+------+
|luck| age|weight|
+----+----+------+
|44.0|27.0| 170.0|
+----+----+------+

first() returns the first Row of the DataFrame.

import numpy as np

df = spark.createDataFrame([(np.nan,27.,170.),(44.,27.,170.),(np.nan,np.nan,170.)],schema=['luck','age','weight'])
df.show()
df.first()
Row(luck=nan, age=27.0, weight=170.0)

foreach(f) applies the function f to every Row; it is shorthand for df.rdd.foreach, the RDD-based foreach. (This example did not pass testing.)

import numpy as np

df = spark.createDataFrame([(np.nan,27.,170.),(44.,27.,170.),(np.nan,np.nan,170.)],schema=['luck','age','weight'])

def myprint(x):
    print(x.age)

# the printed output shows up in the executor logs, not on the driver
df.foreach(lambda x: print(x))

def pprint(x):
    for p in x:
        print(p.luck)

df.foreachPartition(pprint)

groupBy(*cols) groups the DataFrame by the given columns and returns a GroupedData object.

df = spark.read.csv('/sql/customers.csv',header=True)
df.columns
['CustomerID', 'Genre', 'Age', 'Annual Income (k$)', 'Spending Score (1-100)']
df.groupby('Genre').agg({'Age':'mean'}).show()

+------+------------------+
| Genre| avg(Age)|
+------+------------------+
|Female|38.098214285714285|
| Male| 39.80681818181818|
+------+------------------+

head(n=None) returns the first n rows of the DataFrame; the default is 1 row, and n can be set via the keyword argument.

df = spark.read.csv('/sql/customers.csv',header=True)
df.head(6)

hint(name, *parameters) is used when joining two DataFrames to suggest a join strategy, typically a broadcast join. As the name says, it is only a hint to the engine about which way to perform the join.

df1 = spark.createDataFrame([('regan',23),('ting',24)],schema=['name','age'])
df2 = spark.createDataFrame([('regan',130),('ting',90)],schema=['name','weight'])
df3 = df1.join(df2.hint('broadcast'),'name').show()

+-----+---+------+
| name|age|weight|
+-----+---+------+
|regan| 23| 130|
| ting| 24| 90|
+-----+---+------+

intersect(other) returns the intersection, in the set-theoretic sense, of the two DataFrames.

df1 = spark.createDataFrame([('regan',23),('ting',24)],schema=['name','age'])
df2 = spark.createDataFrame([('regan',23),('ting',90)],schema=['name','age'])
df3 = df1.intersect(df2).show()

+-----+---+
| name|age|
+-----+---+
|regan| 23|
+-----+---+

join(other, on=None, how=None) joins two DataFrames. other is the other DataFrame, on specifies the field(s) to join on, and how specifies the join type: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi and left_anti; the default is inner.

df1 = spark.createDataFrame([('regan',23),('ting',24)],schema=['name','age'])
df2 = spark.createDataFrame([('regan',130),('ting2',90)],schema=['name','weight'])
df1.join(df2,on='name',how='left_outer').show()

+-----+---+------+
| name|age|weight|
+-----+---+------+
|regan| 23| 130|
| ting| 24| null|
+-----+---+------+

df1.join(df2,on='name',how='right_outer').show()

+-----+----+------+
| name| age|weight|
+-----+----+------+
|regan| 23| 130|
|ting2|null| 90|
+-----+----+------+

df1.join(df2,on='name',how='left_semi').show()

+-----+---+
| name|age|
+-----+---+
|regan| 23|
+-----+---+

df1.join(df2,on='name',how='left_anti').show()

+----+---+
|name|age|
+----+---+
|ting| 24|
+----+---+

limit(num) limits the number of rows returned, which prevents too much data from being sent back to the driver node and causing an OOM.

df1 = spark.createDataFrame([('regan',23),('ting',24)],schema=['name','age'])
df1.limit(1).collect()

orderBy(*cols, **kwargs) returns a new DataFrame sorted by the given columns.

from pyspark.sql.functions import desc

df = spark.read.csv('/sql/customers.csv',header=True)
df.orderBy('Age').show(3)
df.orderBy('Age',ascending=False).show(3)

+----------+-----+---+------------------+----------------------+
|CustomerID|Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+-----+---+------------------+----------------------+
|0034| Male| 18|33|92|
|0066| Male| 18|48|59|
|0092| Male| 18|59|41|
+----------+-----+---+------------------+----------------------+
only showing top 3 rows

+----------+-----+---+------------------+----------------------+
|CustomerID|Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+-----+---+------------------+----------------------+
|0061| Male| 70|46|56|
|0071| Male| 70|49|55|
|0058| Male| 69|44|46|
+----------+-----+---+------------------+----------------------+
only showing top 3 rows

df.orderBy(desc("Age")).show(3)
df.orderBy(df.Age.desc()).show(3)

# orderBy is similar to sort
df.sort(desc("Age")).show(3)
df.sort(df.Age.desc()).show(3)

persist(storageLevel=StorageLevel(True, True, False, False, 1)) sets the DataFrame's caching level; the default is memory and disk.

from pyspark import StorageLevel

df = spark.read.csv('/sql/customers.csv',header=True)
df.persist(storageLevel=StorageLevel.MEMORY_AND_DISK_2)
DataFrame[CustomerID: string, Genre: string, Age: string, Annual Income (k$): string, Spending Score (1-100): string]

randomSplit(weights, seed=None) splits the DataFrame into several DataFrames according to the given weights; the seed keyword fixes the random seed so that results can be reproduced.

df = spark.range(0.,30.,2,3)
df.show()
df.describe().show()
dfs = df.randomSplit([1.0,4.0],24)
for df in dfs:
    df.show()

rdd returns the RDD underlying the DataFrame, on which all of the RDD methods can be called. These are lower-level methods; for some special tasks that the high-level DataFrame API cannot handle, it is necessary to drop down to the RDD.

df = spark.range(0.,30.,2,3)
rdd = df.rdd
rdd.map(lambda x: x.id ** 2).collect()

replace(to_replace, value, subset=None) replaces values: the first argument is the old value to be replaced, the second is the new value, and the subset keyword restricts the replacement to the given columns (by default the whole DataFrame is used). For example, replace 99 in the dataset with 100:

Note: to_replace and value must have the same type, and to_replace may only be of type bool, int, long, float, string, list or dict; value may only be bool, int, long, float, string, list or None.

df = spark.read.csv('/sql/customers.csv',header=True)
df.columns
df.show()
df2 = df.replace('99','100')
df2.show()
df.replace(['Female','Male'],['F','M'],'Genre').show()

+----------+-----+---+------------------+----------------------+
|CustomerID|Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+-----+---+------------------+----------------------+
|0001| M| 19|15|39|
|0002| M| 21|15|81|
|0003| F| 20|16| 6|
|0004| F| 23|16|77|
|0005| F| 31|17|40|
|0006| F| 22|17|76|
|0007| F| 35|18| 6|
|0008| F| 23|18|94|
|0009| M| 64|19| 3|
|0010| F| 30|19|72|
|0011| M| 67|19|14|
|0012| F| 35|19|99|
|0013| F| 58|20|15|
|0014| F| 24|20|77|
|0015| M| 37|20|13|
|0016| M| 22|20|79|
|0017| F| 35|21|35|
|0018| M| 20|21|66|
|0019| M| 52|23|29|
|0020| F| 35|23|98|
+----------+-----+---+------------------+----------------------+

df.na.replace(['Female','Male'],['F','M'],'Genre').show()

rollup(*cols) rolls the data up along the given columns, so that aggregate functions can then be applied to the rolled-up groups.

from pyspark.sql.functions import *

df = spark.read.csv('/sql/customers.csv',header=True)
df.rollup('Genre','Age').count().orderBy(desc('count'),'Genre').show()

+------+----+-----+
| Genre| Age|count|
+------+----+-----+
| null|null| 200|
|Female|null| 112|
| Male|null| 88|
|Female| 31| 7|
|Female| 23| 6|
|Female| 49| 6|
|Female| 32| 6|
|Female| 35| 6|
| Male| 19| 6|
|Female| 30| 5|
| Male| 32| 5|
| Male| 48| 5|
|Female| 21| 4|
|Female| 47| 4|
|Female| 50| 4|
|Female| 36| 4|
|Female| 29| 4|
|Female| 27| 4|
|Female| 38| 4|
| Male| 59| 4|
+------+----+-----+

sample(withReplacement=None, fraction=None, seed=None) samples from the DataFrame. withReplacement chooses sampling with (True) or without (False) replacement, fraction is the sampling ratio, and seed is the random seed; the same seed always yields the same sample, which is useful for reproducing a scenario.

df = spark.read.csv('/sql/customers.csv',header=True)
df.count()
200
df2 = df.sample(withReplacement=True,fraction=0.2,seed=1)
df2.count()
35

sampleBy(col, fractions, seed=None) performs stratified sampling on column col with the per-stratum ratios given in fractions; seed is the random seed, used for reproducing a scenario.

df = spark.read.csv('/sql/customers.csv',header=True)
df.sampleBy('Genre',{'Male':0.1,'Female':0.15}).groupBy('Genre').count().show()

+------+-----+
| Genre|count|
+------+-----+
|Female| 15|
| Male| 11|
+------+-----+

select(*cols) selects data from the DataFrame by the given expressions and returns a new DataFrame.

df = spark.read.csv('/sql/customers.csv',header=True)
df.select('*').count()
df.select('Age','Genre').show(10)
df.select(df.Age.alias('age')).show(10)

selectExpr(*expr) is a variant of select that accepts SQL expressions and returns a new DataFrame.

df = spark.read.csv('/sql/customers.csv',header=True)
df.selectExpr('Age * 2','sqrt(Age)').show(10)

+---------+-------------------------+
|(Age * 2)|SQRT(CAST(Age AS DOUBLE))|
+---------+-------------------------+
|38.0| 4.358898943540674|
|42.0| 4.58257569495584|
|40.0| 4.47213595499958|
|46.0| 4.795831523312719|
|62.0| 5.5677643628300215|
|44.0| 4.69041575982343|
|70.0| 5.916079783099616|
|46.0| 4.795831523312719|
| 128.0| 8.0|
|60.0| 5.477225575051661|
+---------+-------------------------+

show(n=20, truncate=True, vertical=False) prints the first n rows of the DataFrame (20 by default); truncate cuts off values longer than 20 characters, and vertical controls whether rows are printed vertically.

df = spark.read.csv('/sql/customers.csv',header=True)
df.selectExpr('Age * 2','sqrt(Age)').show(10,truncate=False,vertical=False)

sortWithinPartitions(*cols, **kwargs) and sort(*cols, **kwargs) both sort by the given columns; the kwargs control ascending or descending order. sortWithinPartitions sorts the rows within each partition only.

df = spark.read.csv('/sql/customers.csv',header=True)
df.sort(['Age','Genre'],ascending=True).show(10)
df.sort(df.Age.desc()).show(10)

from pyspark.sql.functions import *
df.sortWithinPartitions(['Age','Genre'],ascending=False).show(10)

+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
|0061| Male| 70|46|56|
|0071| Male| 70|49|55|
|0058| Male| 69|44|46|
|0109| Male| 68|63|43|
|0068|Female| 68|48|48|
|0091|Female| 68|59|55|
|0011| Male| 67|19|14|
|0083| Male| 67|54|41|
|0103| Male| 67|62|59|
|0063|Female| 67|47|52|
+----------+------+---+------------------+----------------------+

df.sortWithinPartitions(desc('Age')).show(10)

subtract(other) returns the rows that are in this DataFrame but not in other, as a new DataFrame.

df1 = spark.createDataFrame([('regan',),('ting',),('yu',)],schema=['name'])
df2 = spark.createDataFrame([('regan',),('ting',),('sha',)],schema=['name'])
df3 = df1.subtract(df2)
df3.show()

summary(*statistics) returns summary information using the requested statistics. Without arguments it computes count, mean, stddev, min, approximate quartiles (percentiles at 25%, 50% and 75%) and max. *statistics can be any of: count, mean, stddev, min, max, and arbitrary approximate percentiles.

df = spark.read.csv('/sql/customers.csv',header=True)
df.summary().show()
df.summary('min','count','75%').show()

+-------+------------------+------+-----------------+------------------+----------------------+
|summary| CustomerID| Genre| Age|Annual Income (k$)|Spending Score (1-100)|
+-------+------------------+------+-----------------+------------------+----------------------+
| count|200| 200| 200|200| 200|
| mean| 100.5| null| 38.85| 60.56| 50.2|
| stddev|57.879184513951124| null|13.96900733155888| 26.26472116527124| 25.823521668370173|
| min| 0001|Female|18|101| 1|
| 25%| 50.0| null| 28.0| 40.0| 34.0|
| 50%| 100.0| null| 36.0| 61.0| 50.0|
| 75%| 150.0| null| 49.0| 78.0| 73.0|
| max| 0200| Male|70|99|99|
+-------+------------------+------+-----------------+------------------+----------------------+

+-------+----------+------+----+------------------+----------------------+
|summary|CustomerID| Genre| Age|Annual Income (k$)|Spending Score (1-100)|
+-------+----------+------+----+------------------+----------------------+
| min|0001|Female| 18|101| 1|
| count| 200| 200| 200|200| 200|
| 75%|150.0| null|49.0| 78.0| 73.0|
+-------+----------+------+----+------------------+----------------------+

take(num) returns the first num Rows of the DataFrame as a list; keep num small, since a large value can easily cause an OOM on the driver node.

df = spark.read.csv('/sql/customers.csv',header=True)
df.take(3)

toDF(*cols) returns a new DataFrame with the given column names.

df = spark.read.csv('/sql/customers.csv',header=True)
df.columns
df1 = df.toDF('id','sex','age','income','score')
df1.columns
df1.show(5)

toJSON(use_unicode=True) converts the DataFrame's Rows into JSON strings, using Unicode by default. Note that toJSON returns an RDD, not a DataFrame.

df = spark.read.csv('/sql/customers.csv',header=True)
df.show(5)
df1 = df.toJSON()
df1.collect()

+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
|0001| Male| 19|15|39|
|0002| Male| 21|15|81|
|0003|Female| 20|16| 6|
|0004|Female| 23|16|77|
|0005|Female| 31|17|40|
+----------+------+---+------------------+----------------------+
only showing top 5 rows

['{"CustomerID":"0001","Genre":"Male","Age":"19","Annual Income (k$)":"15","Spending Score (1-100)":"39"}',
 '{"CustomerID":"0002","Genre":"Male","Age":"21","Annual Income (k$)":"15","Spending Score (1-100)":"81"}',
 '{"CustomerID":"0003","Genre":"Female","Age":"20","Annual Income (k$)":"16","Spending Score (1-100)":"6"}',
 ......]

toLocalIterator() returns all of the DataFrame's data as a local iterator; with large datasets this can easily cause an OOM. (This example did not pass debugging.)

df = spark.read.csv('/sql/customers.csv',header=True)
results = df.toLocalIterator()
for data in results:
    print(data)

toPandas() converts a Spark DataFrame into a pandas DataFrame.

df = spark.read.csv('/sql/customers.csv',header=True)
pan_df = df.toPandas()
pan_df
pan_df.head(10)

union(other) returns the union of the two DataFrames (duplicates are kept, as in SQL UNION ALL).

df1 = spark.createDataFrame([('regan',),('ting',),('yu',)],schema=['name'])
df2 = spark.createDataFrame([('regan',),('ting',),('sha',)],schema=['name'])
df1.union(df2).show()

+-----+
| name|
+-----+
|regan|
| ting|
| yu|
|regan|
| ting|
| sha|
+-----+

unionByName(other) unions two DataFrames by column name rather than by position; it only requires the column names to match, regardless of their order. A small sketch is shown below.
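A minimal sketch of unionByName (the toy data is made up for illustration; the method is available from Spark 2.3 onward):

df1 = spark.createDataFrame([(1, 'regan')], schema=['id', 'name'])
df2 = spark.createDataFrame([('ting', 2)], schema=['name', 'id'])
# Columns are matched by name, not by position, so the differing column order is fine.
df1.unionByName(df2).show()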

unpersist(blocking=False) removes all of the DataFrame's persisted (cached) data.

from pyspark import StorageLevel

df1 = spark.createDataFrame([('regan',11),('ting',1),('yu',2)],schema=['name','score'])
df1.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
df1.storageLevel
df1.unpersist()
df1.storageLevel

where(condition) is similar to filter: it selects rows according to the given condition.

df = spark.read.csv('/sql/customers.csv',header=True)
df.where('Age >= 30').show()

withColumn(colName, col) returns a new DataFrame with a column named colName added; if a column with that name already exists, it is replaced.

df = spark.read.csv('/sql/customers.csv',header=True)
df.withColumn('Age',df.Age**2).show(10)
df.withColumn('Age2',df.Age**2).show(10)

+----------+------+------+------------------+----------------------+
|CustomerID| Genre| Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+------+------------------+----------------------+
|0001| Male| 361.0|15|39|
|0002| Male| 441.0|15|81|
|0003|Female| 400.0|16| 6|
|0004|Female| 529.0|16|77|
|0005|Female| 961.0|17|40|
|0006|Female| 484.0|17|76|
|0007|Female|1225.0|18| 6|
|0008|Female| 529.0|18|94|
|0009| Male|4096.0|19| 3|
|0010|Female| 900.0|19|72|
+----------+------+------+------------------+----------------------+
only showing top 10 rows

+----------+------+---+------------------+----------------------+------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)| Age2|
+----------+------+---+------------------+----------------------+------+
|0001| Male| 19|15|39| 361.0|
|0002| Male| 21|15|81| 441.0|
|0003|Female| 20|16| 6| 400.0|
|0004|Female| 23|16|77| 529.0|
|0005|Female| 31|17|40| 961.0|
|0006|Female| 22|17|76| 484.0|
|0007|Female| 35|18| 6|1225.0|
|0008|Female| 23|18|94| 529.0|
|0009| Male| 64|19| 3|4096.0|
|0010|Female| 30|19|72| 900.0|
+----------+------+---+------------------+----------------------+------+
only showing top 10 rows

withColumnRenamed(existing, new) renames the existing column to new; if the name does not exist, the operation does nothing.

df = spark.read.csv('/sql/customers.csv',header=True)
df.withColumnRenamed('Age','age').show(10)
df.withColumnRenamed('Age2','age').show(10)

+----------+------+---+------------------+----------------------+
|CustomerID| Genre|age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
|0001| Male| 19|15|39|
|0002| Male| 21|15|81|
|0003|Female| 20|16| 6|
|0004|Female| 23|16|77|
|0005|Female| 31|17|40|
|0006|Female| 22|17|76|
|0007|Female| 35|18| 6|
|0008|Female| 23|18|94|
|0009| Male| 64|19| 3|
|0010|Female| 30|19|72|
+----------+------+---+------------------+----------------------+

write exposes the interface for saving the DataFrame's contents to external systems.

df = spark.read.csv('/sql/customers.csv',header=True)
df.write
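The snippet above only obtains the DataFrameWriter; a minimal sketch of actually saving the data (the output paths are hypothetical) could look like this:

df = spark.read.csv('/sql/customers.csv', header=True)
# Save as Parquet, overwriting any previous output at this (hypothetical) path.
df.write.mode('overwrite').parquet('/sql/customers_parquet')
# Or save as CSV with a header row.
df.write.mode('overwrite').csv('/sql/customers_out', header=True)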

3 Advanced Spark SQL Usage: cube and rollup

group by: groups the query result; rows that share the same combination of grouping values are collapsed into a single row in the result set, and aggregate functions can be applied to each group.

grouping sets: performs a group by for each subset of group expressions listed in the grouping set. group by A,B grouping sets(A,B) is equivalent to group by A union group by B; each element can itself be a set, e.g. group by A,B,C grouping sets((A,B),(A,C)).

rollup: creates grouping sets at each hierarchical level of the given expressions. group by A,B,C with rollup first groups by (A,B,C), then by (A,B), then by (A), and finally over the whole table.

cube: creates grouping sets for every possible combination of the given expressions. For A,B,C it first groups by (A,B,C), then by (A,B), (A,C), (A), (B,C), (B), (C), and finally over the whole table.

case class MemberOrderInfo(area:String, memberType:String, product:String, price:Int)

import spark.implicits._

val orders = Seq(
  MemberOrderInfo("深圳","钻石会员","钻石会员1个月",25),
  MemberOrderInfo("深圳","钻石会员","钻石会员1个月",25),
  MemberOrderInfo("深圳","钻石会员","钻石会员3个月",70),
  MemberOrderInfo("深圳","钻石会员","钻石会员12个月",300),
  MemberOrderInfo("深圳","铂金会员","铂金会员3个月",60),
  MemberOrderInfo("深圳","铂金会员","铂金会员3个月",60),
  MemberOrderInfo("深圳","铂金会员","铂金会员6个月",120),
  MemberOrderInfo("深圳","黄金会员","黄金会员1个月",15)
)

// Convert the Seq to a DataFrame
val memberDF: DataFrame = orders.toDF()

// Register the DataFrame as a temporary view
memberDF.createOrReplaceTempView("orderTempTable")

// group by
spark.sql("select area,memberType,product,sum(price) as total from orderTempTable group by area,memberType,product").show

+----+----------+--------+-----+
|area|memberType| product|total|
+----+----------+--------+-----+
| 深圳|钻石会员| 钻石会员3个月| 70|
| 深圳|钻石会员|钻石会员12个月| 300|
| 深圳|铂金会员| 铂金会员6个月| 120|
| 深圳|铂金会员| 铂金会员3个月| 120|
| 深圳|钻石会员| 钻石会员1个月| 50|
| 深圳|黄金会员| 黄金会员1个月| 15|
+----+----------+--------+-----+

// grouping sets(area,memberType,product)
spark.sql("select area,memberType,product,sum(price) as total from orderTempTable group by area,memberType,product grouping sets(area,memberType,product)").show

+----+----------+--------+-----+
|area|memberType| product|total|
+----+----------+--------+-----+
|null|null| 铂金会员3个月| 120|
|null|铂金会员| null| 240|
|null|null|钻石会员12个月| 300|
| 深圳|null| null| 675|
|null|钻石会员| null| 420|
|null|null| 钻石会员1个月| 50|
|null|null| 黄金会员1个月| 15|
|null|null| 钻石会员3个月| 70|
|null|黄金会员| null| 15|
|null|null| 铂金会员6个月| 120|
+----+----------+--------+-----+

// grouping sets((area,memberType),(memberType,product))
spark.sql("select area,memberType,product,sum(price) as total from orderTempTable group by area,memberType,product grouping sets((area,memberType),(memberType,product))").show

+----+----------+--------+-----+
|area|memberType| product|total|
+----+----------+--------+-----+
|null|铂金会员| 铂金会员6个月| 120|
|null|钻石会员|钻石会员12个月| 300|
|null|钻石会员| 钻石会员3个月| 70|
| 深圳|钻石会员| null| 420|
|null|铂金会员| 铂金会员3个月| 120|
|null|黄金会员| 黄金会员1个月| 15|
|null|钻石会员| 钻石会员1个月| 50|
| 深圳|黄金会员| null| 15|
| 深圳|铂金会员| null| 240|
+----+----------+--------+-----+

// with rollup
spark.sql("select area,memberType,product,sum(price) as total from orderTempTable group by area,memberType,product with rollup").show

+----+----------+--------+-----+
|area|memberType| product|total|
+----+----------+--------+-----+
| 深圳|钻石会员| 钻石会员1个月| 50|
| 深圳|钻石会员|钻石会员12个月| 300|
| 深圳|铂金会员| 铂金会员3个月| 120|
| 深圳|钻石会员| null| 420|
| 深圳|null| null| 675|
|null|null| null| 675|
| 深圳|钻石会员| 钻石会员3个月| 70|
| 深圳|黄金会员| 黄金会员1个月| 15|
| 深圳|黄金会员| null| 15|
| 深圳|铂金会员| null| 240|
| 深圳|铂金会员| 铂金会员6个月| 120|
+----+----------+--------+-----+

// with cube
spark.sql("select area,memberType,product,sum(price) as total from orderTempTable group by area,memberType,product with cube").show

+----+----------+--------+-----+
|area|memberType| product|total|
+----+----------+--------+-----+
| 深圳|null| 黄金会员1个月| 15|
|null|null| 铂金会员3个月| 120|
| 深圳|null| 铂金会员6个月| 120|
|null|铂金会员| 铂金会员6个月| 120|
|null|铂金会员| null| 240|
| 深圳|钻石会员| 钻石会员1个月| 50|
| 深圳|null| 钻石会员1个月| 50|
|null|钻石会员|钻石会员12个月| 300|
| 深圳|钻石会员|钻石会员12个月| 300|
| 深圳|铂金会员| 铂金会员3个月| 120|
|null|钻石会员| 钻石会员3个月| 70|
| 深圳|钻石会员| null| 420|
|null|null|钻石会员12个月| 300|
| 深圳|null| null| 675|
|null|铂金会员| 铂金会员3个月| 120|
|null|钻石会员| null| 420|
|null|黄金会员| 黄金会员1个月| 15|
|null|钻石会员| 钻石会员1个月| 50|
|null|null| 钻石会员1个月| 50|
|null|null| null| 675|
+----+----------+--------+-----+
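For readers following the rest of this article in Python, roughly the same rollup and cube aggregations can also be expressed with the PySpark DataFrame API. The following is a minimal sketch against a hypothetical orders DataFrame with the same four columns; it is not part of the original Scala example and only uses a subset of the sample rows:

from pyspark.sql import functions as F

# Hypothetical PySpark equivalent of the Scala example above; column names match the case class fields.
orders = spark.createDataFrame(
    [("深圳", "钻石会员", "钻石会员1个月", 25),
     ("深圳", "铂金会员", "铂金会员3个月", 60),
     ("深圳", "黄金会员", "黄金会员1个月", 15)],
    schema=["area", "memberType", "product", "price"])

# rollup: (area, memberType, product) -> (area, memberType) -> (area) -> grand total
orders.rollup("area", "memberType", "product").agg(F.sum("price").alias("total")).show()

# cube: every combination of the three grouping columns, plus the grand total
orders.cube("area", "memberType", "product").agg(F.sum("price").alias("total")).show()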

4 Summary

The bidirectional integration of the Python stack and Spark gives us a common toolkit for combined big-data analysis; as the examples show, a large part of Spark SQL's functionality mirrors what pandas offers.

Qin Kaixin, Shenzhen, 12172352
