一、为什么需要Kafka和Redis一起工作

想象一下这样的场景:你正在开发一个电商平台,每当用户下单时,系统需要快速更新商品库存,同时还要把订单消息发给其他系统处理(比如物流、支付)。这时候,Kafka就像个高效的邮差,负责把订单消息准确送到各个系统;而Redis则像是个超快的记事本,能立刻记住最新的库存数据。

但问题来了:如果Kafka消息还没处理完,Redis里的数据就更新了,其他系统可能会读到不一致的数据。这就是我们要解决的"数据一致性"问题——确保所有系统看到的数据都是同一版本的真相。

二、Kafka和Redis各自的特点

Kafka:可靠的消息队列

  • 优点:消息按顺序存储,支持重试和回溯,即使系统崩溃也不会丢数据
  • 缺点:处理速度不如内存数据库快
// 技术栈:Java + Spring Kafka
// 生产者示例:发送订单消息
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendOrderEvent(Order order) {
    // 关键点:消息包含操作类型和数据版本
    String message = String.format("{\"type\":\"STOCK_UPDATE\",\"itemId\":%d,\"version\":%d}", 
        order.getItemId(), System.currentTimeMillis());
    kafkaTemplate.send("orders", message);
    
    // 注释:这里用时间戳作为简易版本号,实际项目建议用分布式ID
}

Redis:闪电般的内存缓存

  • 优点:读写速度极快,支持丰富的数据结构
  • 缺点:重启后数据可能丢失(除非开启持久化)
// 技术栈:Java + Lettuce (Redis客户端)
// 消费者示例:更新Redis缓存
@Autowired
private RedisCommands<String, String> redisCommands;

public void updateStockInRedis(long itemId, int quantity) {
    // 使用事务保证原子性
    redisCommands.multi();
    redisCommands.hset("inventory", String.valueOf(itemId), String.valueOf(quantity));
    redisCommands.set("inventory_version:" + itemId, String.valueOf(System.currentTimeMillis()));
    redisCommands.exec();
    
    // 注释:通过版本号标记数据更新状态
}

三、保障一致性的四种武器

1. 双写模式(适合对一致性要求不高的场景)

操作流程

  1. 先写Redis
  2. 再发Kafka消息
  3. 消费者收到消息后再次校验Redis
// 技术栈:Java
public void processOrder(Order order) {
    // 第一步:先更新Redis
    redisCommands.hincrby("inventory", String.valueOf(order.getItemId()), -1);
    
    // 第二步:发送Kafka消息(可能失败)
    try {
        sendOrderEvent(order);
    } catch (Exception e) {
        // 失败时回滚Redis
        redisCommands.hincrby("inventory", String.valueOf(order.getItemId()), 1);
        throw e;
    }
}

2. 消息驱动模式(推荐方案)

核心思想:所有写操作都通过Kafka流转

// 消费者服务示例
@KafkaListener(topics = "orders")
public void handleOrderEvent(String message) {
    OrderEvent event = parseMessage(message);
    
    // 关键检查:比较消息版本和Redis当前版本
    long messageVersion = event.getVersion();
    long currentVersion = Long.parseLong(
        redisCommands.get("inventory_version:" + event.getItemId()));
        
    if (messageVersion > currentVersion) {
        updateStockInRedis(event.getItemId(), event.getQuantity());
    }
}

3. 定时校对(最终一致性保障)

凌晨跑个定时任务,把数据库真实数据与Redis、Kafka的消费位点做对比:

// 校对任务示例
@Scheduled(cron = "0 0 3 * * ?")
public void inventoryReconciliation() {
    Map<String, String> dbStock = jdbcTemplate.queryForMap("SELECT id,stock FROM items");
    Map<String, String> redisStock = redisCommands.hgetall("inventory");
    
    // 找出差异项并修复
    dbStock.forEach((id, dbQty) -> {
        if (!dbQty.equals(redisStock.get(id))) {
            redisCommands.hset("inventory", id, dbQty);
        }
    });
}

4. 分布式锁方案(强一致性)

在关键操作时加锁:

// Redisson分布式锁示例
public void safeStockUpdate(long itemId, int delta) {
    RLock lock = redisson.getLock("lock:item:" + itemId);
    try {
        lock.lock();
        // 1. 更新数据库
        jdbcTemplate.update("UPDATE items SET stock=stock+? WHERE id=?", delta, itemId);
        // 2. 更新Redis
        redisCommands.hincrby("inventory", String.valueOf(itemId), delta);
        // 3. 发送Kafka消息
        sendStockUpdateEvent(itemId);
    } finally {
        lock.unlock();
    }
}

四、不同场景下的选择建议

高并发秒杀场景

推荐组合:

  • 先用Redis原子操作扣减库存
  • 通过Kafka异步落库
  • 配合定时对账
// 秒杀示例代码
public boolean seckill(long userId, long itemId) {
    // Redis原子操作判断库存
    Long remain = redisCommands.hincrby("seckill_stock", String.valueOf(itemId), -1);
    if (remain < 0) {
        redisCommands.hincrby("seckill_stock", String.valueOf(itemId), 1); // 回滚
        return false;
    }
    
    // 发消息到Kafka异步创建订单
    kafkaTemplate.send("seckill_orders", createSeckillMessage(userId, itemId));
    return true;
}

财务交易类系统

必须使用:

  • 分布式锁保证强一致性
  • 所有操作记录事务日志
  • 增加人工核对接口

五、避坑指南

  1. 版本号陷阱
    不要用简单的时间戳,推荐使用Snowflake等分布式ID生成器

  2. 消费者延迟
    监控Kafka消费延迟,设置合理的max.poll.interval.ms

  3. Redis持久化
    如果开启AOF,注意appendfsync配置对性能的影响

  4. 网络分区处理
    设计降级方案,比如在Redis不可用时直接读数据库

  5. 监控三要素

    • Redis内存使用率
    • Kafka堆积消息数
    • 端到端延迟时间

六、总结

就像做菜需要掌握火候,Kafka和Redis的配合也需要平衡速度和可靠性。对于大多数互联网应用,推荐采用"消息驱动+定期校对"的组合拳,既能扛住高并发,又能保证最终一致性。记住没有银弹,关键是根据你的业务特点选择合适的配方。