一、为什么需要把MongoDB和Kafka结合起来

现在很多系统都需要处理实时数据,比如电商平台的订单状态更新、物流跟踪、用户行为分析等。MongoDB擅长存储非结构化数据,Kafka则擅长处理实时数据流,把它们结合起来就能构建一个强大的实时数据处理管道。

举个例子,假设我们有个电商平台,用户下单后:

  1. 订单数据先存入MongoDB
  2. 同时通过Kafka通知库存系统扣减库存
  3. 物流系统通过订阅Kafka消息获取订单信息
  4. 客服系统也能实时看到订单状态

这样各个系统都能实时获取最新数据,而不是定时去数据库轮询。

二、MongoDB和Kafka各自的特点

先简单说说这两个技术的特点,方便理解后面的集成方案。

MongoDB:

  • 文档型数据库,存储JSON格式数据
  • 灵活的模式,字段可以动态增减
  • 适合存储非结构化或半结构化数据
  • 支持丰富的查询和聚合操作
  • 水平扩展能力强

Kafka:

  • 分布式消息系统,高吞吐量
  • 消息持久化存储,可回溯
  • 支持发布/订阅模式
  • 消息有序且不丢失
  • 支持流处理

三、集成方案详解

下面介绍几种常见的集成方式,我会用Java代码示例说明。

方案1:使用变更数据捕获(CDC)

MongoDB的变更流(Change Stream)功能可以捕获数据变更,我们可以用它把变更推送到Kafka。

// 技术栈: Java + MongoDB驱动 + Kafka客户端

// 1. 创建MongoDB变更流监听
MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
MongoDatabase database = mongoClient.getDatabase("test");
MongoCollection<Document> collection = database.getCollection("orders");

// 监听orders集合的所有变更
ChangeStreamIterable<Document> changes = collection.watch();

// 2. 创建Kafka生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 3. 处理变更事件并发送到Kafka
changes.forEach(change -> {
    // 将变更文档转为JSON
    String changeJson = change.getFullDocument().toJson();
    
    // 发送到Kafka的orders主题
    ProducerRecord<String, String> record = new ProducerRecord<>(
        "orders", 
        change.getDocumentKey().toJson(), 
        changeJson
    );
    
    producer.send(record);
});

方案2:应用层双写

在应用代码中,同时写入MongoDB和Kafka。

// 技术栈: Java + Spring Boot

@Service
public class OrderService {
    
    @Autowired
    private MongoTemplate mongoTemplate;
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public void createOrder(Order order) {
        // 1. 保存到MongoDB
        mongoTemplate.save(order, "orders");
        
        // 2. 发送到Kafka
        kafkaTemplate.send(
            "orders", 
            order.getId(), 
            new ObjectMapper().writeValueAsString(order)
        );
    }
}

方案3:使用Kafka Connect

Kafka Connect是Kafka的集成框架,有现成的MongoDB连接器。

// 技术栈: Kafka Connect配置示例

{
  "name": "mongodb-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://localhost:27017",
    "database": "test",
    "collection": "orders",
    "topic.prefix": "mongo-"
  }
}

四、不同方案的优缺点比较

方案 优点 缺点
CDC 对应用透明,实时性好 需要MongoDB 3.6+,配置复杂
双写 实现简单,可控性强 需要修改业务代码,一致性难保证
Kafka Connect 无需编码,配置即用 性能开销大,灵活性差

五、实际应用中的注意事项

  1. 数据一致性: 双写方案要考虑事务问题,可以使用事务型消息表
  2. 性能考虑: Kafka批量发送可以提高吞吐量
  3. 错误处理: 要有重试机制和死信队列
  4. 监控: 监控消息延迟和积压情况
  5. Schema管理: MongoDB无固定schema,但Kafka消息最好有明确格式

六、完整示例演示

下面展示一个完整的电商订单处理流程:

// 技术栈: Java + Spring Boot

// 1. 订单实体
@Document(collection = "orders")
public class Order {
    @Id
    private String id;
    private String userId;
    private List<OrderItem> items;
    private BigDecimal totalAmount;
    private String status;
    // 其他字段...
}

// 2. 订单服务
@Service
@Transactional
public class OrderService {
    
    @Autowired
    private OrderRepository orderRepo;
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public Order createOrder(OrderRequest request) {
        // 创建订单对象
        Order order = new Order();
        order.setUserId(request.getUserId());
        order.setItems(request.getItems());
        order.setStatus("CREATED");
        
        // 计算总金额
        BigDecimal total = request.getItems().stream()
            .map(i -> i.getPrice().multiply(BigDecimal.valueOf(i.getQuantity())))
            .reduce(BigDecimal.ZERO, BigDecimal::add);
        order.setTotalAmount(total);
        
        // 保存到MongoDB
        order = orderRepo.save(order);
        
        // 发送到Kafka
        kafkaTemplate.send(
            "orders.created", 
            order.getId(),
            new ObjectMapper().writeValueAsString(order)
        );
        
        return order;
    }
}

// 3. 库存消费者
@Service
public class InventoryConsumer {
    
    @KafkaListener(topics = "orders.created")
    public void handleOrderCreated(String orderJson) {
        Order order = new ObjectMapper().readValue(orderJson, Order.class);
        
        // 扣减库存逻辑
        order.getItems().forEach(item -> {
            inventoryService.decreaseStock(
                item.getProductId(), 
                item.getQuantity()
            );
        });
        
        // 更新订单状态
        order.setStatus("PROCESSING");
        orderRepo.save(order);
    }
}

七、总结

把MongoDB和Kafka集成起来,可以构建强大的实时数据处理系统。根据业务需求选择合适的集成方案:

  1. 需要最小化代码改动 → 使用CDC或Kafka Connect
  2. 需要最大控制权 → 使用应用层双写
  3. 需要保证强一致性 → 考虑事务型消息

无论哪种方案,都要注意错误处理、性能优化和监控。这种组合特别适合需要实时处理非结构化数据的场景,比如用户行为分析、物联网数据处理、实时推荐系统等。