一、为什么需要把MongoDB和Kafka结合起来
现在很多系统都需要处理实时数据,比如电商平台的订单状态更新、物流跟踪、用户行为分析等。MongoDB擅长存储非结构化数据,Kafka则擅长处理实时数据流,把它们结合起来就能构建一个强大的实时数据处理管道。
举个例子,假设我们有个电商平台,用户下单后:
- 订单数据先存入MongoDB
- 同时通过Kafka通知库存系统扣减库存
- 物流系统通过订阅Kafka消息获取订单信息
- 客服系统也能实时看到订单状态
这样各个系统都能实时获取最新数据,而不是定时去数据库轮询。
二、MongoDB和Kafka各自的特点
先简单说说这两个技术的特点,方便理解后面的集成方案。
MongoDB:
- 文档型数据库,存储JSON格式数据
- 灵活的模式,字段可以动态增减
- 适合存储非结构化或半结构化数据
- 支持丰富的查询和聚合操作
- 水平扩展能力强
Kafka:
- 分布式消息系统,高吞吐量
- 消息持久化存储,可回溯
- 支持发布/订阅模式
- 消息有序且不丢失
- 支持流处理
三、集成方案详解
下面介绍几种常见的集成方式,我会用Java代码示例说明。
方案1:使用变更数据捕获(CDC)
MongoDB的变更流(Change Stream)功能可以捕获数据变更,我们可以用它把变更推送到Kafka。
// 技术栈: Java + MongoDB驱动 + Kafka客户端
// 1. 创建MongoDB变更流监听
MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
MongoDatabase database = mongoClient.getDatabase("test");
MongoCollection<Document> collection = database.getCollection("orders");
// 监听orders集合的所有变更
ChangeStreamIterable<Document> changes = collection.watch();
// 2. 创建Kafka生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 3. 处理变更事件并发送到Kafka
changes.forEach(change -> {
// 将变更文档转为JSON
String changeJson = change.getFullDocument().toJson();
// 发送到Kafka的orders主题
ProducerRecord<String, String> record = new ProducerRecord<>(
"orders",
change.getDocumentKey().toJson(),
changeJson
);
producer.send(record);
});
方案2:应用层双写
在应用代码中,同时写入MongoDB和Kafka。
// 技术栈: Java + Spring Boot
@Service
public class OrderService {
@Autowired
private MongoTemplate mongoTemplate;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void createOrder(Order order) {
// 1. 保存到MongoDB
mongoTemplate.save(order, "orders");
// 2. 发送到Kafka
kafkaTemplate.send(
"orders",
order.getId(),
new ObjectMapper().writeValueAsString(order)
);
}
}
方案3:使用Kafka Connect
Kafka Connect是Kafka的集成框架,有现成的MongoDB连接器。
// 技术栈: Kafka Connect配置示例
{
"name": "mongodb-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://localhost:27017",
"database": "test",
"collection": "orders",
"topic.prefix": "mongo-"
}
}
四、不同方案的优缺点比较
| 方案 | 优点 | 缺点 |
|---|---|---|
| CDC | 对应用透明,实时性好 | 需要MongoDB 3.6+,配置复杂 |
| 双写 | 实现简单,可控性强 | 需要修改业务代码,一致性难保证 |
| Kafka Connect | 无需编码,配置即用 | 性能开销大,灵活性差 |
五、实际应用中的注意事项
- 数据一致性: 双写方案要考虑事务问题,可以使用事务型消息表
- 性能考虑: Kafka批量发送可以提高吞吐量
- 错误处理: 要有重试机制和死信队列
- 监控: 监控消息延迟和积压情况
- Schema管理: MongoDB无固定schema,但Kafka消息最好有明确格式
六、完整示例演示
下面展示一个完整的电商订单处理流程:
// 技术栈: Java + Spring Boot
// 1. 订单实体
@Document(collection = "orders")
public class Order {
@Id
private String id;
private String userId;
private List<OrderItem> items;
private BigDecimal totalAmount;
private String status;
// 其他字段...
}
// 2. 订单服务
@Service
@Transactional
public class OrderService {
@Autowired
private OrderRepository orderRepo;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public Order createOrder(OrderRequest request) {
// 创建订单对象
Order order = new Order();
order.setUserId(request.getUserId());
order.setItems(request.getItems());
order.setStatus("CREATED");
// 计算总金额
BigDecimal total = request.getItems().stream()
.map(i -> i.getPrice().multiply(BigDecimal.valueOf(i.getQuantity())))
.reduce(BigDecimal.ZERO, BigDecimal::add);
order.setTotalAmount(total);
// 保存到MongoDB
order = orderRepo.save(order);
// 发送到Kafka
kafkaTemplate.send(
"orders.created",
order.getId(),
new ObjectMapper().writeValueAsString(order)
);
return order;
}
}
// 3. 库存消费者
@Service
public class InventoryConsumer {
@KafkaListener(topics = "orders.created")
public void handleOrderCreated(String orderJson) {
Order order = new ObjectMapper().readValue(orderJson, Order.class);
// 扣减库存逻辑
order.getItems().forEach(item -> {
inventoryService.decreaseStock(
item.getProductId(),
item.getQuantity()
);
});
// 更新订单状态
order.setStatus("PROCESSING");
orderRepo.save(order);
}
}
七、总结
把MongoDB和Kafka集成起来,可以构建强大的实时数据处理系统。根据业务需求选择合适的集成方案:
- 需要最小化代码改动 → 使用CDC或Kafka Connect
- 需要最大控制权 → 使用应用层双写
- 需要保证强一致性 → 考虑事务型消息
无论哪种方案,都要注意错误处理、性能优化和监控。这种组合特别适合需要实时处理非结构化数据的场景,比如用户行为分析、物联网数据处理、实时推荐系统等。
评论