一、Spark内存溢出那些事儿

最近有个朋友跟我吐槽,说他们公司的Spark作业老是莫名其妙就挂了,报错信息里总出现"OOM"这个让人头疼的词。这种情况其实特别常见,就像你往一个小杯子里拼命倒水,水溢出来是迟早的事。

Spark的内存管理其实挺有意思的,它把内存分成两大块:执行内存和存储内存。执行内存用来做计算,存储内存用来缓存数据。当这两块内存打架的时候,问题就来了。

举个例子,我们有个数据处理任务(使用Spark 3.2.1版本):

# 错误示例:内存溢出的典型写法
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MemoryDemo").getOrCreate()

# 读取超大文件不做任何优化
df = spark.read.csv("hdfs://超大文件.csv")

# 直接做复杂的聚合操作
result = df.groupBy("user_id").agg(
    {"price":"sum", "quantity":"avg"}
)

# 收集结果到Driver
results = result.collect()  # 这里大概率会OOM

这段代码至少有3个致命问题:

  1. 读取大文件没有做分区控制
  2. 聚合操作前没有考虑数据倾斜
  3. 直接collect()大结果集

二、Shuffle的性能杀手

Shuffle是Spark中最昂贵的操作,就像搬家一样,要把数据从一个地方搬到另一个地方。我见过最夸张的情况,一个简单的join操作就让作业跑了3个小时。

来看个实际案例(还是Spark 3.2.1):

# 问题代码:shuffle性能低下
joined_df = big_df.join(
    small_df,
    big_df["id"] == small_df["id"],
    "inner"
)

# 更好的写法:广播小表
from pyspark.sql.functions import broadcast

joined_df = big_df.join(
    broadcast(small_df),  # 使用广播变量
    big_df["id"] == small_df["id"],
    "inner"
)

这里的关键点在于:

  • 当小表可以放进内存时,一定要用广播
  • 默认的join会引发shuffle,而广播join不会
  • 广播变量的上限可以通过spark.sql.autoBroadcastJoinThreshold配置

三、调优实战技巧

3.1 内存优化三板斧

第一招:合理分配内存比例

# 启动时配置内存
spark = SparkSession.builder \
    .appName("TuningDemo") \
    .config("spark.memory.fraction", "0.8") \  # 总内存比例
    .config("spark.memory.storageFraction", "0.5") \  # 存储内存占比
    .getOrCreate()

第二招:对付数据倾斜

# 处理倾斜的典型方案
from pyspark.sql import functions as F

# 方法1:加随机前缀
df_with_prefix = df.withColumn(
    "salted_key",
    F.concat(F.col("user_id"), F.lit("_"), (F.rand() * 10).cast("int"))
)

# 方法2:单独处理倾斜键
skewed_keys = ["user123", "user456"]  # 事先识别出的倾斜键
normal_data = df.filter(~df["user_id"].isin(skewed_keys))
skewed_data = df.filter(df["user_id"].isin(skewed_keys))

# 分别处理后再union

第三招:序列化优化

# 使用Kryo序列化
conf = SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses([MyCustomClass1, MyCustomClass2])

3.2 shuffle优化秘籍

# 调整shuffle分区数
spark.conf.set("spark.sql.shuffle.partitions", "200")  # 根据数据量调整

# 使用map-side聚合
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

四、避坑指南

  1. 监控很重要:一定要看Spark UI,重点关注:

    • Storage页签看缓存情况
    • Executors页签看内存使用
    • SQL页签看DAG执行计划
  2. 常见配置参数:

    # 内存相关
    spark.executor.memory = 8g
    spark.executor.memoryOverhead = 2g
    spark.driver.memory = 4g
    
    # shuffle相关
    spark.shuffle.file.buffer = 1MB
    spark.reducer.maxSizeInFlight = 48MB
    
  3. 数据序列化选择:

    • 默认Java序列化:兼容性好但性能差
    • Kryo序列化:性能好但要注册类
  4. 资源分配黄金法则:

    • 每个executor核心数不超过5个
    • 留给操作系统和HDFS的内存
    • 考虑YARN或其他资源调度器的开销

五、总结

经过这些年的实践,我发现Spark调优其实就三个关键点:

  1. 了解你的数据:大小、分布、特点
  2. 理解Spark内存模型:知道内存用在哪了
  3. 合理配置参数:不是越大越好,要恰到好处

记住,没有放之四海而皆准的配置,最好的调优策略就是:

  • 小规模测试
  • 监控分析
  • 迭代优化

最后送大家一个万能检查清单:

  1. 数据是否均匀分布?
  2. 是否可以用广播替代shuffle?
  3. 内存配置是否合理?
  4. 是否使用了合适的序列化?
  5. 分区数是否合理?