一、为什么需要Kafka和Redis一起工作
想象一下这样的场景:你正在开发一个电商平台,每当用户下单时,系统需要快速更新商品库存,同时还要把订单消息发给其他系统处理(比如物流、支付)。这时候,Kafka就像个高效的邮差,负责把订单消息准确送到各个系统;而Redis则像是个超快的记事本,能立刻记住最新的库存数据。
但问题来了:如果Kafka消息还没处理完,Redis里的数据就更新了,其他系统可能会读到不一致的数据。这就是我们要解决的"数据一致性"问题——确保所有系统看到的数据都是同一版本的真相。
二、Kafka和Redis各自的特点
Kafka:可靠的消息队列
- 优点:消息按顺序存储,支持重试和回溯,即使系统崩溃也不会丢数据
- 缺点:处理速度不如内存数据库快
// 技术栈:Java + Spring Kafka
// 生产者示例:发送订单消息
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendOrderEvent(Order order) {
// 关键点:消息包含操作类型和数据版本
String message = String.format("{\"type\":\"STOCK_UPDATE\",\"itemId\":%d,\"version\":%d}",
order.getItemId(), System.currentTimeMillis());
kafkaTemplate.send("orders", message);
// 注释:这里用时间戳作为简易版本号,实际项目建议用分布式ID
}
Redis:闪电般的内存缓存
- 优点:读写速度极快,支持丰富的数据结构
- 缺点:重启后数据可能丢失(除非开启持久化)
// 技术栈:Java + Lettuce (Redis客户端)
// 消费者示例:更新Redis缓存
@Autowired
private RedisCommands<String, String> redisCommands;
public void updateStockInRedis(long itemId, int quantity) {
// 使用事务保证原子性
redisCommands.multi();
redisCommands.hset("inventory", String.valueOf(itemId), String.valueOf(quantity));
redisCommands.set("inventory_version:" + itemId, String.valueOf(System.currentTimeMillis()));
redisCommands.exec();
// 注释:通过版本号标记数据更新状态
}
三、保障一致性的四种武器
1. 双写模式(适合对一致性要求不高的场景)
操作流程:
- 先写Redis
- 再发Kafka消息
- 消费者收到消息后再次校验Redis
// 技术栈:Java
public void processOrder(Order order) {
// 第一步:先更新Redis
redisCommands.hincrby("inventory", String.valueOf(order.getItemId()), -1);
// 第二步:发送Kafka消息(可能失败)
try {
sendOrderEvent(order);
} catch (Exception e) {
// 失败时回滚Redis
redisCommands.hincrby("inventory", String.valueOf(order.getItemId()), 1);
throw e;
}
}
2. 消息驱动模式(推荐方案)
核心思想:所有写操作都通过Kafka流转
// 消费者服务示例
@KafkaListener(topics = "orders")
public void handleOrderEvent(String message) {
OrderEvent event = parseMessage(message);
// 关键检查:比较消息版本和Redis当前版本
long messageVersion = event.getVersion();
long currentVersion = Long.parseLong(
redisCommands.get("inventory_version:" + event.getItemId()));
if (messageVersion > currentVersion) {
updateStockInRedis(event.getItemId(), event.getQuantity());
}
}
3. 定时校对(最终一致性保障)
凌晨跑个定时任务,把数据库真实数据与Redis、Kafka的消费位点做对比:
// 校对任务示例
@Scheduled(cron = "0 0 3 * * ?")
public void inventoryReconciliation() {
Map<String, String> dbStock = jdbcTemplate.queryForMap("SELECT id,stock FROM items");
Map<String, String> redisStock = redisCommands.hgetall("inventory");
// 找出差异项并修复
dbStock.forEach((id, dbQty) -> {
if (!dbQty.equals(redisStock.get(id))) {
redisCommands.hset("inventory", id, dbQty);
}
});
}
4. 分布式锁方案(强一致性)
在关键操作时加锁:
// Redisson分布式锁示例
public void safeStockUpdate(long itemId, int delta) {
RLock lock = redisson.getLock("lock:item:" + itemId);
try {
lock.lock();
// 1. 更新数据库
jdbcTemplate.update("UPDATE items SET stock=stock+? WHERE id=?", delta, itemId);
// 2. 更新Redis
redisCommands.hincrby("inventory", String.valueOf(itemId), delta);
// 3. 发送Kafka消息
sendStockUpdateEvent(itemId);
} finally {
lock.unlock();
}
}
四、不同场景下的选择建议
高并发秒杀场景
推荐组合:
- 先用Redis原子操作扣减库存
- 通过Kafka异步落库
- 配合定时对账
// 秒杀示例代码
public boolean seckill(long userId, long itemId) {
// Redis原子操作判断库存
Long remain = redisCommands.hincrby("seckill_stock", String.valueOf(itemId), -1);
if (remain < 0) {
redisCommands.hincrby("seckill_stock", String.valueOf(itemId), 1); // 回滚
return false;
}
// 发消息到Kafka异步创建订单
kafkaTemplate.send("seckill_orders", createSeckillMessage(userId, itemId));
return true;
}
财务交易类系统
必须使用:
- 分布式锁保证强一致性
- 所有操作记录事务日志
- 增加人工核对接口
五、避坑指南
版本号陷阱:
不要用简单的时间戳,推荐使用Snowflake等分布式ID生成器消费者延迟:
监控Kafka消费延迟,设置合理的max.poll.interval.msRedis持久化:
如果开启AOF,注意appendfsync配置对性能的影响网络分区处理:
设计降级方案,比如在Redis不可用时直接读数据库监控三要素:
- Redis内存使用率
- Kafka堆积消息数
- 端到端延迟时间
六、总结
就像做菜需要掌握火候,Kafka和Redis的配合也需要平衡速度和可靠性。对于大多数互联网应用,推荐采用"消息驱动+定期校对"的组合拳,既能扛住高并发,又能保证最终一致性。记住没有银弹,关键是根据你的业务特点选择合适的配方。
评论