一、什么是变更数据捕获(CDC)

变更数据捕获(Change Data Capture,简称CDC)是一种用于捕获数据库中的数据变更的技术。简单来说,就是当数据库中的某张表发生增删改操作时,CDC能够实时捕获这些变化,并将它们传递到其他系统或服务中。这在数据同步、实时分析、ETL等场景中非常有用。

PostgreSQL提供了多种CDC实现方式,比如逻辑解码(Logical Decoding)、触发器(Triggers)等。其中,pg_logical 是PostgreSQL内置的逻辑解码插件,而 Debezium 是一个开源的分布式CDC平台,可以方便地集成Kafka等消息队列。

二、使用 pg_logical 实现CDC

pg_logical 是PostgreSQL 9.4+ 版本引入的逻辑解码插件,它允许我们以逻辑方式解析WAL(Write-Ahead Log)日志,从而捕获数据变更。

示例1:启用 pg_logical 并创建逻辑槽

-- 1. 修改 postgresql.conf 文件,启用逻辑解码
wal_level = logical
max_replication_slots = 5  -- 设置足够的复制槽数量

-- 2. 重启PostgreSQL服务
sudo systemctl restart postgresql

-- 3. 创建逻辑复制槽
SELECT * FROM pg_create_logical_replication_slot(
    'my_slot',  -- 复制槽名称
    'pgoutput'   -- 输出插件,PostgreSQL 10+ 默认支持
);

-- 4. 查看当前复制槽状态
SELECT * FROM pg_replication_slots;

注释说明:

  • wal_level = logical 是启用逻辑解码的关键配置。
  • pg_create_logical_replication_slot 用于创建逻辑复制槽,后续可以通过该槽读取变更数据。

示例2:使用 pg_logical 捕获变更

-- 1. 创建测试表
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    price NUMERIC
);

-- 2. 向表中插入数据
INSERT INTO products (name, price) VALUES ('Laptop', 999.99);

-- 3. 使用 pg_logical 读取变更
-- 可以通过外部工具(如Python脚本)监听逻辑槽的变化
-- 以下是一个简化的逻辑解码输出示例(实际需要编程解析)
-- 输出可能类似于:
-- {"change": [{"kind": "insert", "schema": "public", "table": "products", "columnnames": ["id", "name", "price"], "columntypes": ["integer", "character varying(100)", "numeric"], "columnvalues": [1, "Laptop", 999.99]}]}

注释说明:

  • pg_logical 的输出是WAL日志的逻辑表示,通常需要额外工具解析。
  • 实际应用中,可以使用 pg_recvlogical 命令行工具或编程方式(如Python + psycopg2)读取变更。

三、使用 Debezium 实现CDC

Debezium 是一个基于Kafka的CDC工具,支持多种数据库,包括PostgreSQL。它通过连接PostgreSQL的逻辑解码功能,将变更事件发布到Kafka,供下游系统消费。

示例3:配置 Debezium PostgreSQL Connector

{
  "name": "products-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "password",
    "database.dbname": "mydb",
    "database.server.name": "mydb-server",
    "table.include.list": "public.products",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot"
  }
}

注释说明:

  • plugin.name 设置为 pgoutput,这是PostgreSQL 10+ 的默认逻辑解码插件。
  • slot.name 是Debezium使用的逻辑复制槽名称。
  • Debezium会将变更事件发送到Kafka,主题名默认为 serverName.schemaName.tableName

示例4:消费Debezium发布的Kafka消息

// 使用Kafka Consumer读取Debezium事件(Java示例)
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "products-consumer");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("mydb-server.public.products"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Received change: " + record.value());
        // 示例输出:
        // {
        //   "before": null,
        //   "after": {"id": 1, "name": "Laptop", "price": 999.99},
        //   "op": "c"  // 'c'表示插入(create)
        // }
    }
}

注释说明:

  • Debezium的Kafka消息包含 before(变更前数据)、after(变更后数据)和 op(操作类型)。
  • op 可以是 c(插入)、u(更新)、d(删除)等。

四、应用场景与技术对比

应用场景

  1. 实时数据同步:将PostgreSQL的数据变更同步到数据仓库(如Redshift)或缓存(如Redis)。
  2. 事件驱动架构:基于数据库变更触发业务流程,如订单状态更新后发送通知。
  3. 数据审计:记录所有数据变更,用于合规性检查。

技术优缺点

方案 优点 缺点
pg_logical 原生支持,无需额外组件 需要自行解析WAL日志,开发成本较高
Debezium 集成Kafka,支持分布式消费,生态完善 依赖Kafka,架构复杂度较高

注意事项

  1. 性能影响:逻辑解码会增加数据库负载,需监控WAL日志增长情况。
  2. 数据一致性:确保逻辑槽的消费进度(LSN)正确记录,避免数据丢失。
  3. 网络稳定性:Debezium与Kafka之间的网络延迟可能影响实时性。

五、总结

PostgreSQL的CDC功能为实时数据处理提供了强大支持。pg_logical 适合轻量级场景,而 Debezium 更适合需要分布式集成的复杂架构。无论选择哪种方案,都应注意性能监控和数据一致性保障。