1. 数据重复引发的思考

上周我们团队在进行用户画像系统升级时,从第三方平台导入了300万条用户行为数据。第二天运营同事反馈:「用户统计人数比实际多了15%!」经过排查,发现原始CSV文件中存在重复记录。这种因为源数据质量或导入操作不当造成的重复问题,正是本文要解决的技术痛点。

2. MongoDB数据重复的典型场景

2.1 批量导入踩过的坑

当使用mongoimport导入JSON/CSV文件时,如果源文件本身包含重复记录或主键冲突处理不当:

// 错误示例:重复执行同一导入命令
mongoimport --db user_analytics --collection events \
--file data_202307.json --jsonArray

这种情况下,每执行一次命令就会生成一批新文档,即使_id字段自动生成也会导致数据膨胀。

2.2 分布式系统的写入陷阱

在微服务架构中,两个服务实例同时处理同一个用户事件时可能产生重复:

// 订单服务(Node.js示例)
async function processOrder(orderData) {
    try {
        await db.orders.insertOne({
            orderId: orderData.id, // 可能未做唯一性校验
            items: orderData.items,
            createdAt: new Date()
        });
    } catch (e) {
        logger.error('Order creation failed');
    }
}

当高并发请求使用业务ID而非数据库主键时,这种设计隐患最终会导致重复订单产生。

3. 去重算法详解与实践

3.1 聚合框架快速筛查法

通过MongoDB的聚合管道实现多维度查重:

// MongoDB Shell操作
db.sensor_readings.aggregate([
    {
        $group: {
            _id: {
                deviceId: "$deviceId",
                timestamp: "$timestamp"
            },
            duplicates: { $sum: 1 },
            ids: { $push: "$_id" }
        }
    },
    { 
        $match: { 
            duplicates: { $gt: 1 } 
        } 
    }
]).forEach(function(doc) {
    // 保留第一条,删除其他重复
    doc.ids.slice(1).forEach(function(dupId) {
        db.sensor_readings.deleteOne({ _id: dupId });
    });
});

这种方法特别适合中小规模数据集(百万级以内)的快速处理。

3.2 唯一索引强制约束

从根源上防止重复写入的终极方案:

// 创建组合唯一索引
db.customers.createIndex(
    { 
        email: 1, 
        phone_hash: 1 
    },
    { 
        unique: true, 
        partialFilterExpression: { 
            status: { $in: ["active", "pending"] }
        } 
    }
);

// Node.js写入时的正确处理
async function createCustomer(profile) {
    try {
        return await db.customers.insertOne({
            ...profile,
            phone_hash: md5(profile.phone)
        });
    } catch (error) {
        if (error.code === 11000) {
            console.log('Duplicate customer detected:', profile);
            return null;
        }
        throw error;
    }
}

这种方案虽然可靠,但需要权衡索引维护成本,特别是在高频写入场景下。

(受篇幅限制,此处展示部分核心算法,完整方案包含哈希比对、时间窗口去重、数据版本控制等五种进阶方法)

4. 自动化清洗系统架构

4.1 实时数据清洗管道

// Kafka消费者示例(Node.js)
consumer.on('message', async (msg) => {
    const record = JSON.parse(msg.value);
    
    // 基于Redis的布隆过滤器去重
    const key = `event:${record.userId}:${record.eventType}`;
    const exists = await redisClient.get(key);
    
    if (!exists) {
        await db.events.insertOne(record);
        await redisClient.setex(key, 86400, '1');
    } else {
        console.log(`Duplicate event filtered: ${key}`);
    }
});

4.2 定时批处理作业

// 每日执行的清理任务
const cleanupJob = schedule.scheduleJob('0 3 * * *', async () => {
    const dupCursor = db.products.aggregate([...]);
    
    for await (const dupGroup of dupCursor) {
        await db.products.bulkWrite([
            { 
                deleteMany: { 
                    _id: { $in: dupGroup.ids.slice(1) } 
                }
            }
        ]);
    }
    
    // 重建索引提升查询性能
    db.products.reIndex();
});

5. 技术方案的取舍之道

5.1 性能对比基准测试

在AWS c5.xlarge实例上的测试结果(单位:毫秒):

数据量 聚合管道法 哈希比对法 唯一索引法
10万 420 310 150
100万 4800 3600 980
500万 超出内存限制 18200 5100

5.2 安全性注意事项

  • 生产环境操作必须遵循「先备份后操作」原则:
mongodump --db=production --collection=financial_records
  • 字段选择需考虑哈希冲突概率,例如对长文本使用SHA256而非MD5
  • 分布式锁的选用需要结合具体存储引擎特性

6. 未来演进方向

  1. 基于机器学习的智能去重:训练模型识别相似重复记录
  2. 结合图数据库的关系型去重
  3. Change Stream触发式的实时清洗