PySpark速查
1. 引言
PySpark是Apache Spark的Python API,支持大规模数据处理的分布式计算。其核心抽象包括RDD(弹性分布式数据集)和DataFrame。RDD提供底层API,适合需要精细控制的数据处理;DataFrame则提供更高层的抽象,具备性能优化和类似SQL的操作接口。本文从环境搭建开始,全面覆盖RDD与DataFrame的基础操作,并深入SQL查询、用户自定义函数、性能优化、机器学习及流处理等进阶内容,每个知识点均配有代码示例和实用评论,帮助读者快速掌握PySpark的核心用法。
2. 环境搭建与配置
安装PySpark并初始化SparkSession:1
pip install pyspark
1
2
3
4
5
6
7from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local[2]") \
.appName("PySpark速查") \
.getOrCreate()
sc = spark.sparkContext # 获取SparkContext,RDD操作需要
评论:local[2]表示本地模式使用2个核心。生产环境通常通过--master指定集群管理器。SparkSession是Spark 2.0后统一入口,同时封装了SparkContext、SQLContext等。
3. RDD基础操作
3.1 创建RDD
RDD可从Python集合或外部文件创建。1
2
3
4
5
6
7
8# 从列表创建(parallelize)
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 从元组创建
rdd = sc.parallelize(('cat', 'dog', 'fish'))
# 从集合(set)创建(自动去重)
rdd = sc.parallelize({'cat', 'dog', 'fish', 'cat'})
# 从文件创建(textFile)
rdd_text = sc.textFile("wordCount.txt")
评论:parallelize用于测试,大数据场景应使用textFile或后续的DataFrame读取接口。collect()可将RDD内容拉取到驱动节点,仅限小数据调试。
3.2 单RDD转换
转换操作返回新RDD,惰性执行。
3.2.1 map()
对每个元素应用函数。1
2
3
4
5
6
7rdd = sc.parallelize([1, 2, 3])
rdd.map(lambda x: x + 1).collect() # [2, 3, 4]
# 字符串处理
rdd = sc.parallelize(['cat', 'dog'])
rdd.map(lambda x: 'object: ' + x).collect() # ['object: cat', 'object: dog']
# 生成键值对
rdd.map(lambda x: (x, 1)).collect() # [('cat', 1), ('dog', 1)]
3.2.2 filter()
返回使函数为真的元素。1
2rdd = sc.parallelize([3, 1, 2, 5, 5])
rdd.filter(lambda x: x > 2).collect() # [3, 5, 5]
3.2.3 distinct()
去重。1
2rdd = sc.parallelize([3, 1, 2, 5, 5, 5])
rdd.distinct().collect() # [1, 2, 3, 5](顺序可能变化)
3.2.4 randomSplit()
按权重随机切分RDD。1
2
3
4rdd = sc.parallelize(range(10))
splits = rdd.randomSplit([0.6, 0.4], seed=42)
splits[0].collect() # 第一部分
splits[1].collect() # 第二部分
3.2.5 groupBy()
根据函数返回值分组。1
2
3
4rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
grouped = rdd.groupBy(lambda x: x % 2)
result = [(k, list(v)) for k, v in grouped.collect()]
print(result) # [(1, [1,1,3,5]), (0, [2,8])]
3.2.6 flatMap()
每个输入元素可生成多个输出元素,并展平。1
2
3rdd = sc.parallelize(['hello world', 'hi', 'dog'])
rdd.flatMap(lambda x: x.split(' ')).collect()
# ['hello', 'world', 'hi', 'dog']
3.3 多RDD转换
3.3.1 union()
合并两个RDD(不改变分区数)。1
2
3rdd1 = sc.parallelize([1, 2])
rdd2 = sc.parallelize([3, 4])
rdd1.union(rdd2).collect() # [1, 2, 3, 4]
3.3.2 intersection()
返回两个RDD共有的元素(去重)。1
2
3rdd1 = sc.parallelize([1, 1, 2, 3])
rdd2 = sc.parallelize([3, 4, 5])
rdd1.intersection(rdd2).collect() # [3]
3.3.3 subtract()
返回在第一个RDD中但不在第二个中的元素。1
2
3rdd1 = sc.parallelize([1, 1, 2, 3])
rdd2 = sc.parallelize([3, 4, 5])
rdd1.subtract(rdd2).collect() # [1, 1, 2]
3.3.4 cartesian()
计算两个RDD的笛卡尔积。1
2
3rdd1 = sc.parallelize([1, 2])
rdd2 = sc.parallelize([3, 4])
rdd1.cartesian(rdd2).collect() # [(1,3), (1,4), (2,3), (2,4)]
3.4 RDD动作
动作触发实际计算并返回结果到驱动。
3.4.1 first() / take(n)
1 | rdd = sc.parallelize([21, 1, 2, 3]) |
3.4.2 takeOrdered(n, key)
返回排序后的前n个元素。1
2
3rdd = sc.parallelize([10, 1, 2, 9, 3])
rdd.takeOrdered(3) # [1, 2, 3](升序)
rdd.takeOrdered(3, lambda x: -x) # [10, 9, 3](降序)
3.5 基本统计
RDD提供便捷的统计方法(需数值型)。1
2
3
4
5
6
7rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.min() # 1
rdd.max() # 5
rdd.mean() # 3.0
rdd.sum() # 15
rdd.stdev() # 样本标准差
rdd.count() # 5
3.6 键值对RDD(PairRDD)
当RDD元素为(key, value)对时,可调用专属方法。
3.6.1 获取键和值
1 | kv = sc.parallelize([(3,4), (3,6), (5,6), (1,2)]) |
3.6.2 filter() 基于键或值
1 | kv.filter(lambda kv: kv[0] < 5).collect() # 键小于5 |
3.6.3 sortBy() 排序
1 | kv.sortBy(lambda kv: kv[0]).collect() # 按键升序 |
3.6.4 mapValues() 仅对值操作
1 | kv.mapValues(lambda v: v**2).collect() # [(3,16), (3,36), (5,36), (1,4)] |
3.6.5 reduceByKey() 按键聚合
1 | from operator import add |
3.7 多键值对RDD转换
3.7.1 join()(内连接)
1 | kv1 = sc.parallelize([(1,2), (3,4), (3,6)]) |
3.7.2 leftOuterJoin() / rightOuterJoin()
1 | kv1.leftOuterJoin(kv2).collect() |
3.7.3 subtractByKey()
1 | kv1.subtractByKey(kv2).collect() # [(1,2)] |
3.8 键值对RDD动作
3.8.1 countByKey() / countByValue()
1 | kv = sc.parallelize([(1,2), (3,4), (3,6), (4,1), (5,6)]) |
3.9 RDD惰性求值
转换操作是惰性的,只有动作触发才执行。1
2rdd = sc.textFile("nonexistent.txt") # 此时不会报错
rdd.collect() # 此时报错文件不存在
4. DataFrame操作
DataFrame以列方式组织,类似于关系表,内置Catalyst优化器。
4.1 创建DataFrame
4.1.1 从文件读取
1 | # CSV(自动推断类型) |
4.1.2 从Python集合创建
1 | data = [['a', 1], ['b', 2]] |
4.1.3 从Pandas DataFrame创建
1 | import pandas as pd |
4.2 查看数据结构
1 | df.columns # 列名列表 |
4.3 基本操作
4.3.1 重命名列
1 | df = df.withColumnRenamed('letter', 'char') |
4.3.2 排序
1 | df.sort('number', ascending=False).show() |
4.3.3 删除列
1 | df.drop('number').show() |
4.3.4 过滤行
1 | df.filter(df.number > 1).show() |
4.3.5 选择列
1 | df.select('letter').show() |
4.3.6 去重
1 | df.select('number').distinct().show() |
4.4 聚合函数
使用agg或直接调用内置函数。1
2from pyspark.sql.functions import max, min, sum, avg
df.agg(max('number'), min('number')).show()
4.5 DataFrame与RDD转换
4.5.1 DataFrame转RDD
1 | rdd = df.rdd # 返回RDD[Row] |
4.5.2 在RDD上使用map
1 | rdd.map(lambda row: (row['letter'], row['number'])).collect() |
4.5.3 flatMapValues示例
1 | car_data = spark.createDataFrame([(2018, 'Mazda Ferrari')], ['year', 'brand']) |
4.5.4 countByKey / countByValue
1 | rdd.map(lambda x: (x.year, x.brand)).countByKey() |
4.6 合并与拆分列
4.6.1 合并多列
1 | from pyspark.sql import Row |
4.6.2 拆分列
1 | mtcars_rdd2 = df.rdd.map(lambda row: Row(model=row[0], x1=row[1:5], x2=row[5:])) |
4.7 保存与加载
4.7.1 保存为CSV
1 | df.coalesce(1).write.csv("output.csv", header=True, mode="overwrite") |
4.7.2 保存为Parquet(推荐)
1 | df.write.parquet("output.parquet", mode="overwrite") |
4.7.3 加载Parquet
1 | df = spark.read.parquet("output.parquet") |
5. SQL查询
DataFrame可注册为临时视图,通过SQL进行复杂分析。1
2
3df.createOrReplaceTempView("people")
result = spark.sql("SELECT department, AVG(salary) FROM people GROUP BY department")
result.show()
Spark SQL支持大部分HiveQL语法,可与Hive元数据集成。评论:对于熟悉SQL的团队,SQL查询可降低学习成本;而DataFrame API则在编译时检查错误,适合程序化操作。两者可混合使用,视场景而定。
6. 用户自定义函数(UDF)
当内置函数无法满足需求时,可定义UDF。
6.1 普通UDF(性能较差)
1 | from pyspark.sql.functions import udf |
6.2 Pandas UDF(矢量化,推荐)
1 | from pyspark.sql.functions import pandas_udf |
Pandas UDF使用Apache Arrow进行高效数据交换,性能提升显著。
7. 性能优化
7.1 缓存
1 | df.cache() # 或 df.persist(StorageLevel.MEMORY_AND_DISK) |
7.2 分区控制
1 | df.repartition(10) # 增加分区(产生shuffle) |
7.3 广播变量
将小表广播到各节点,避免shuffle:1
2from pyspark.sql.functions import broadcast
df1.join(broadcast(df2), "key")
7.4 累加器
用于计数等聚合,但只能通过 SparkContext.accumulator 创建:1
2accum = sc.accumulator(0)
df.foreach(lambda row: accum.add(1) if row.value > 0 else None)
7.5 调整shuffle分区数
默认200,可通过配置调整:1
spark.conf.set("spark.sql.shuffle.partitions", 50)
8. 机器学习(MLlib)
MLlib提供分布式机器学习算法,API与scikit-learn类似:1
2
3
4
5
6
7
8
9
10
11
12
13from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
# 准备特征向量
assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features")
df = assembler.transform(df)
# 训练模型
lr = LinearRegression(featuresCol="features", labelCol="label")
model = lr.fit(df)
# 预测
predictions = model.transform(df)
MLlib支持Pipeline,方便构建完整工作流。注意:数据量极大时才体现出MLlib优势;小数据集上scikit-learn更轻量。
9. 流处理(Structured Streaming)
Structured Streaming基于DataFrame的流处理模型,支持事件时间、水印等:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17# 从Kafka读取流
stream_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host:9092") \
.option("subscribe", "topic") \
.load()
# 转换
result = stream_df.selectExpr("CAST(value AS STRING)")
# 输出到控制台(调试)
query = result.writeStream \
.outputMode("append") \
.format("console") \
.start()
query.awaitTermination()
评论:Structured Streaming提供 exactly-once 语义,容错机制完善。生产环境可输出到Parquet、Kafka等,并配合检查点实现故障恢复。
10. 总结
本文全面覆盖了PySpark的核心操作:从RDD和DataFrame的基础创建、转换、动作,到SQL查询、UDF、性能优化、机器学习和流处理。建议在实际应用中优先使用DataFrame API,充分利用Catalyst优化;RDD适合需要底层控制的场景。希望本文能成为你日常开发中的速查手册,助你高效处理大数据。更多细节可参考Spark官方文档。
- 本文作者: Kylin
- 本文链接: https://kylinnnnn.github.io/2026/02/15/AI-Generated-PySpark速查指南/
- 版权声明: 本博客所有文章除特别声明外,均采用 MIT 许可协议。转载请注明出处!