一、SQLite主从复制的那些事儿

SQLite是个轻量级的数据库,很多人觉得它就是个单机玩具,但其实通过一些技巧也能实现主从复制。不过官方没提供现成的方案,得靠我们自己折腾。今天咱们就来聊聊怎么用自定义脚本监控SQLite的主从同步状态,特别是怎么抓取那些烦人的延迟问题。

先说说典型场景:你有多个设备跑着SQLite,需要把数据同步到中央服务器。主库写入后,从库可能因为网络问题延迟同步,这时候就需要监控工具来告诉你:"嘿,从库落后了3分钟!"

二、手工打造监控脚本的核心思路

监控的核心是比对主从数据库的状态。这里给出Python的实现方案(技术栈:Python 3.8 + sqlite3标准库),主要做三件事:

  1. 在主库记录每次变更的时间戳
  2. 从库定期上报自己最新的时间戳
  3. 计算两者差值判断延迟
# -*- coding: utf-8 -*-
import sqlite3
from datetime import datetime

def setup_primary_db(db_path):
    """初始化主库,创建版本记录表"""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    # 创建记录表(如果不存在)
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS sync_meta (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            last_modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            operation TEXT CHECK(operation IN ('INSERT','UPDATE','DELETE'))
        )
    ''')
    # 创建触发器自动记录变更
    cursor.execute('''
        CREATE TRIGGER IF NOT EXISTS track_changes 
        AFTER INSERT ON user_data  -- 假设主表叫user_data
        BEGIN
            INSERT INTO sync_meta(operation) VALUES ('INSERT');
        END
    ''')
    conn.commit()
    conn.close()

def get_replication_lag(primary_db, replica_db):
    """计算主从延迟(秒)"""
    # 获取主库最后更新时间
    primary_conn = sqlite3.connect(primary_db)
    primary_time = primary_conn.execute(
        "SELECT strftime('%s', last_modified) FROM sync_meta ORDER BY id DESC LIMIT 1"
    ).fetchone()[0]
    
    # 获取从库最后同步时间
    replica_conn = sqlite3.connect(replica_db)
    replica_time = replica_conn.execute(
        "SELECT strftime('%s', last_sync) FROM replica_status ORDER BY id DESC LIMIT 1"
    ).fetchone()[0]
    
    return int(primary_time) - int(replica_time)

# 示例使用
if __name__ == "__main__":
    setup_primary_db("master.db")
    lag = get_replication_lag("master.db", "replica.db")
    print(f"当前延迟: {lag}秒")

这个方案有几个关键点:

  • sync_meta表记录所有数据变更
  • 通过SQLite的strftime函数处理时间格式
  • 触发器自动跟踪变更,避免漏记

三、进阶:处理网络中断的补偿机制

网络不稳定时,简单的比对会失效。这里给出增强版的断线重传逻辑:

def check_replica_health(replica_db, timeout=300):
    """检查从库是否健康"""
    conn = sqlite3.connect(replica_db)
    cursor = conn.cursor()
    
    # 检查最后心跳时间
    cursor.execute('''
        SELECT strftime('%s','now') - strftime('%s', last_heartbeat) 
        FROM replica_health
    ''')
    gap = cursor.fetchone()[0]
    
    if gap > timeout:
        # 触发重同步逻辑
        resync_data(replica_db)
        return False
    return True

def resync_data(replica_db):
    """全量数据重同步"""
    print(f"[{datetime.now()}] 开始重同步 {replica_db}")
    # 这里应该是实际的重同步逻辑
    # 比如把主库的完整数据导出再导入从库
    
if __name__ == "__main__":
    if not check_replica_health("replica.db"):
        print("从库异常,已触发修复流程")

四、实战中的坑与解决方案

  1. 时间同步问题
    如果主从服务器时间不同步,所有监控都会失准。建议在脚本开始时强制同步时间:

    import ntplib
    def sync_system_time():
        try:
            client = ntplib.NTPClient()
            response = client.request('pool.ntp.org')
            os.system(f"date -s @{response.tx_time}")
        except:
            print("时间同步失败,使用本地时间")
    
  2. WAL模式的影响
    SQLite的WAL模式会延迟写入磁盘。如果从库用WAL模式,可能看到"假同步"。解决方法:

    PRAGMA journal_mode=DELETE;  -- 在从库禁用WAL
    
  3. 性能优化
    高频记录时间戳会导致sync_meta表膨胀。定期清理旧记录:

    def cleanup_meta(db_path, keep_days=7):
        """清理7天前的元数据"""
        conn = sqlite3.connect(db_path)
        conn.execute(
            "DELETE FROM sync_meta WHERE last_modified < datetime('now', ?)",
            (f"-{keep_days} days",)
        )
        conn.commit()
    

五、这种方案的适用边界

适合场景

  • 设备数少于50个的中小型部署
  • 同步延迟要求分钟级监控
  • 没有专业运维团队的小项目

不适合场景

  • 需要秒级监控的金融系统
  • 超过100个节点的分布式系统
  • 有专业DBA团队的企业环境

替代方案对比

  1. 使用专业的SQLite扩展如litestream

    • 优点:真正的实时流复制
    • 缺点:需要额外部署服务
  2. 换用PostgreSQL等内置复制的数据库

    • 优点:功能完整稳定
    • 缺点:资源消耗大

六、总结

这套自定义监控方案就像给SQLite装了个"倒车雷达",虽然比不上原厂导航系统,但足够帮你避开大多数坑。关键价值在于:

  1. 用20%的代码解决了80%的监控需求
  2. 全部使用标准库,零依赖
  3. 可以根据业务灵活定制

最后提醒:生产环境使用前,务必在测试环境跑满72小时,观察内存和CPU的使用情况。遇到过有人没做压力测试直接上线,结果监控脚本自己成了性能瓶颈的笑话。