一、Spark内存溢出那些事儿
最近有个朋友跟我吐槽,说他们公司的Spark作业老是莫名其妙就挂了,报错信息里总出现"OOM"这个让人头疼的词。这种情况其实特别常见,就像你往一个小杯子里拼命倒水,水溢出来是迟早的事。
Spark的内存管理其实挺有意思的,它把内存分成两大块:执行内存和存储内存。执行内存用来做计算,存储内存用来缓存数据。当这两块内存打架的时候,问题就来了。
举个例子,我们有个数据处理任务(使用Spark 3.2.1版本):
# 错误示例:内存溢出的典型写法
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MemoryDemo").getOrCreate()
# 读取超大文件不做任何优化
df = spark.read.csv("hdfs://超大文件.csv")
# 直接做复杂的聚合操作
result = df.groupBy("user_id").agg(
{"price":"sum", "quantity":"avg"}
)
# 收集结果到Driver
results = result.collect() # 这里大概率会OOM
这段代码至少有3个致命问题:
- 读取大文件没有做分区控制
- 聚合操作前没有考虑数据倾斜
- 直接collect()大结果集
二、Shuffle的性能杀手
Shuffle是Spark中最昂贵的操作,就像搬家一样,要把数据从一个地方搬到另一个地方。我见过最夸张的情况,一个简单的join操作就让作业跑了3个小时。
来看个实际案例(还是Spark 3.2.1):
# 问题代码:shuffle性能低下
joined_df = big_df.join(
small_df,
big_df["id"] == small_df["id"],
"inner"
)
# 更好的写法:广播小表
from pyspark.sql.functions import broadcast
joined_df = big_df.join(
broadcast(small_df), # 使用广播变量
big_df["id"] == small_df["id"],
"inner"
)
这里的关键点在于:
- 当小表可以放进内存时,一定要用广播
- 默认的join会引发shuffle,而广播join不会
- 广播变量的上限可以通过spark.sql.autoBroadcastJoinThreshold配置
三、调优实战技巧
3.1 内存优化三板斧
第一招:合理分配内存比例
# 启动时配置内存
spark = SparkSession.builder \
.appName("TuningDemo") \
.config("spark.memory.fraction", "0.8") \ # 总内存比例
.config("spark.memory.storageFraction", "0.5") \ # 存储内存占比
.getOrCreate()
第二招:对付数据倾斜
# 处理倾斜的典型方案
from pyspark.sql import functions as F
# 方法1:加随机前缀
df_with_prefix = df.withColumn(
"salted_key",
F.concat(F.col("user_id"), F.lit("_"), (F.rand() * 10).cast("int"))
)
# 方法2:单独处理倾斜键
skewed_keys = ["user123", "user456"] # 事先识别出的倾斜键
normal_data = df.filter(~df["user_id"].isin(skewed_keys))
skewed_data = df.filter(df["user_id"].isin(skewed_keys))
# 分别处理后再union
第三招:序列化优化
# 使用Kryo序列化
conf = SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses([MyCustomClass1, MyCustomClass2])
3.2 shuffle优化秘籍
# 调整shuffle分区数
spark.conf.set("spark.sql.shuffle.partitions", "200") # 根据数据量调整
# 使用map-side聚合
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
四、避坑指南
监控很重要:一定要看Spark UI,重点关注:
- Storage页签看缓存情况
- Executors页签看内存使用
- SQL页签看DAG执行计划
常见配置参数:
# 内存相关 spark.executor.memory = 8g spark.executor.memoryOverhead = 2g spark.driver.memory = 4g # shuffle相关 spark.shuffle.file.buffer = 1MB spark.reducer.maxSizeInFlight = 48MB数据序列化选择:
- 默认Java序列化:兼容性好但性能差
- Kryo序列化:性能好但要注册类
资源分配黄金法则:
- 每个executor核心数不超过5个
- 留给操作系统和HDFS的内存
- 考虑YARN或其他资源调度器的开销
五、总结
经过这些年的实践,我发现Spark调优其实就三个关键点:
- 了解你的数据:大小、分布、特点
- 理解Spark内存模型:知道内存用在哪了
- 合理配置参数:不是越大越好,要恰到好处
记住,没有放之四海而皆准的配置,最好的调优策略就是:
- 小规模测试
- 监控分析
- 迭代优化
最后送大家一个万能检查清单:
- 数据是否均匀分布?
- 是否可以用广播替代shuffle?
- 内存配置是否合理?
- 是否使用了合适的序列化?
- 分区数是否合理?
评论