一、SQLite主从复制的那些事儿
SQLite是个轻量级的数据库,很多人觉得它就是个单机玩具,但其实通过一些技巧也能实现主从复制。不过官方没提供现成的方案,得靠我们自己折腾。今天咱们就来聊聊怎么用自定义脚本监控SQLite的主从同步状态,特别是怎么抓取那些烦人的延迟问题。
先说说典型场景:你有多个设备跑着SQLite,需要把数据同步到中央服务器。主库写入后,从库可能因为网络问题延迟同步,这时候就需要监控工具来告诉你:"嘿,从库落后了3分钟!"
二、手工打造监控脚本的核心思路
监控的核心是比对主从数据库的状态。这里给出Python的实现方案(技术栈:Python 3.8 + sqlite3标准库),主要做三件事:
- 在主库记录每次变更的时间戳
- 从库定期上报自己最新的时间戳
- 计算两者差值判断延迟
# -*- coding: utf-8 -*-
import sqlite3
from datetime import datetime
def setup_primary_db(db_path):
"""初始化主库,创建版本记录表"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# 创建记录表(如果不存在)
cursor.execute('''
CREATE TABLE IF NOT EXISTS sync_meta (
id INTEGER PRIMARY KEY AUTOINCREMENT,
last_modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
operation TEXT CHECK(operation IN ('INSERT','UPDATE','DELETE'))
)
''')
# 创建触发器自动记录变更
cursor.execute('''
CREATE TRIGGER IF NOT EXISTS track_changes
AFTER INSERT ON user_data -- 假设主表叫user_data
BEGIN
INSERT INTO sync_meta(operation) VALUES ('INSERT');
END
''')
conn.commit()
conn.close()
def get_replication_lag(primary_db, replica_db):
"""计算主从延迟(秒)"""
# 获取主库最后更新时间
primary_conn = sqlite3.connect(primary_db)
primary_time = primary_conn.execute(
"SELECT strftime('%s', last_modified) FROM sync_meta ORDER BY id DESC LIMIT 1"
).fetchone()[0]
# 获取从库最后同步时间
replica_conn = sqlite3.connect(replica_db)
replica_time = replica_conn.execute(
"SELECT strftime('%s', last_sync) FROM replica_status ORDER BY id DESC LIMIT 1"
).fetchone()[0]
return int(primary_time) - int(replica_time)
# 示例使用
if __name__ == "__main__":
setup_primary_db("master.db")
lag = get_replication_lag("master.db", "replica.db")
print(f"当前延迟: {lag}秒")
这个方案有几个关键点:
- 用
sync_meta表记录所有数据变更 - 通过SQLite的
strftime函数处理时间格式 - 触发器自动跟踪变更,避免漏记
三、进阶:处理网络中断的补偿机制
网络不稳定时,简单的比对会失效。这里给出增强版的断线重传逻辑:
def check_replica_health(replica_db, timeout=300):
"""检查从库是否健康"""
conn = sqlite3.connect(replica_db)
cursor = conn.cursor()
# 检查最后心跳时间
cursor.execute('''
SELECT strftime('%s','now') - strftime('%s', last_heartbeat)
FROM replica_health
''')
gap = cursor.fetchone()[0]
if gap > timeout:
# 触发重同步逻辑
resync_data(replica_db)
return False
return True
def resync_data(replica_db):
"""全量数据重同步"""
print(f"[{datetime.now()}] 开始重同步 {replica_db}")
# 这里应该是实际的重同步逻辑
# 比如把主库的完整数据导出再导入从库
if __name__ == "__main__":
if not check_replica_health("replica.db"):
print("从库异常,已触发修复流程")
四、实战中的坑与解决方案
时间同步问题:
如果主从服务器时间不同步,所有监控都会失准。建议在脚本开始时强制同步时间:import ntplib def sync_system_time(): try: client = ntplib.NTPClient() response = client.request('pool.ntp.org') os.system(f"date -s @{response.tx_time}") except: print("时间同步失败,使用本地时间")WAL模式的影响:
SQLite的WAL模式会延迟写入磁盘。如果从库用WAL模式,可能看到"假同步"。解决方法:PRAGMA journal_mode=DELETE; -- 在从库禁用WAL性能优化:
高频记录时间戳会导致sync_meta表膨胀。定期清理旧记录:def cleanup_meta(db_path, keep_days=7): """清理7天前的元数据""" conn = sqlite3.connect(db_path) conn.execute( "DELETE FROM sync_meta WHERE last_modified < datetime('now', ?)", (f"-{keep_days} days",) ) conn.commit()
五、这种方案的适用边界
适合场景:
- 设备数少于50个的中小型部署
- 同步延迟要求分钟级监控
- 没有专业运维团队的小项目
不适合场景:
- 需要秒级监控的金融系统
- 超过100个节点的分布式系统
- 有专业DBA团队的企业环境
替代方案对比:
使用专业的SQLite扩展如
litestream- 优点:真正的实时流复制
- 缺点:需要额外部署服务
换用PostgreSQL等内置复制的数据库
- 优点:功能完整稳定
- 缺点:资源消耗大
六、总结
这套自定义监控方案就像给SQLite装了个"倒车雷达",虽然比不上原厂导航系统,但足够帮你避开大多数坑。关键价值在于:
- 用20%的代码解决了80%的监控需求
- 全部使用标准库,零依赖
- 可以根据业务灵活定制
最后提醒:生产环境使用前,务必在测试环境跑满72小时,观察内存和CPU的使用情况。遇到过有人没做压力测试直接上线,结果监控脚本自己成了性能瓶颈的笑话。
评论