一、为什么我们需要拆解单体应用
想象一下,你正在经营一家小餐馆。刚开始时,菜单只有几道菜,厨房一个人就能搞定所有事情——从接单到烹饪再到上菜。但随着生意越来越好,菜单越来越丰富,这个"全能型"厨房就开始力不从心了:订单堆积、上菜慢、某个环节出错整个系统就崩溃。这时候,把厨房拆分成冷菜间、热炒间、甜点档口才是正解。
在软件开发中,Django单体应用就像那个全能厨房。当你的用户量突破百万、功能模块超过20个时,这些问题会特别明显:
- 部署效率低下:改个用户头像功能就要全站重新部署
- 技术栈僵化:所有模块被迫使用相同的Python版本
- 扩展困难:明明只需要给订单模块加服务器,却不得不扩容整个应用
- 故障扩散:支付系统崩溃会导致连登录功能都用不了
去年我们重构一个电商平台时就遇到了这种情况。原本优雅的Django单体应用,在日订单量突破5万后,响应时间从200ms飙升到2秒。下面是我们用微服务解决的实战方案。
二、Django微服务拆分的三大策略
策略1:按业务功能垂直拆分
这是最自然的拆分方式,就像把餐馆分成前厅、后厨、仓库一样。我们给电商平台拆出了这些服务:
# 用户服务 (user_service/models.py)
class User(models.Model):
username = models.CharField(max_length=64, unique=True)
# 注意:密码字段需要加密存储
password = models.CharField(max_length=256)
mobile = models.CharField(max_length=11)
class Meta:
db_table = 'user' # 指定表名避免冲突
# 订单服务 (order_service/models.py)
class Order(models.Model):
ORDER_STATUS = (
(0, '未支付'),
(1, '已支付'),
(2, '已取消')
)
user_id = models.BigIntegerField() # 不再使用外键
amount = models.DecimalField(max_digits=10, decimal_places=2)
status = models.SmallIntegerField(choices=ORDER_STATUS)
关键点:
- 每个服务独占数据库(避免共享表)
- 使用ID关联替代外键约束
- 为常用查询字段单独建立索引
策略2:公共组件服务化
那些被多个模块调用的功能,比如短信发送、文件存储,应该独立成基础服务:
# 文件存储服务 (file_service/views.py)
from django.core.files.storage import default_storage
class FileUploadView(APIView):
def post(self, request):
file = request.FILES['file']
# 生成唯一文件名防止冲突
file_name = f"{uuid.uuid4()}{os.path.splitext(file.name)[1]}"
path = default_storage.save(file_name, file)
return Response({'url': f"/media/{path}"})
注意事项:
- 这类服务要保持无状态
- 接口版本要向前兼容
- 建议采用轻量级协议如gRPC
策略3:读写分离与CQRS
对于商品详情页这种读多写少的场景,我们单独拆出了查询服务:
# 商品查询服务 (product_query/views.py)
class ProductDetailView(APIView):
def get(self, request, product_id):
# 先从Redis缓存读取
cache_key = f"product:{product_id}"
data = cache.get(cache_key)
if not data:
# 缓存未命中则查询数据库
data = Product.objects.filter(id=product_id).values(
'id', 'name', 'price', 'stock'
).first()
# 设置缓存,有效期5分钟
cache.set(cache_key, data, 300)
return Response(data)
三、微服务通信的四种武器
1. REST API - 最通用的选择
# 订单服务调用支付服务 (order_service/services.py)
import requests
def create_payment(order_id, amount):
resp = requests.post(
'http://payment-service/api/payments',
json={'order_id': order_id, 'amount': amount},
timeout=3 # 重要!必须设置超时
)
resp.raise_for_status()
return resp.json()
适用场景:
- 跨语言调用
- 对性能要求不高的场景
2. gRPC - 高性能首选
先定义proto文件:
// user_service.proto
service UserService {
rpc GetUserInfo (UserRequest) returns (UserResponse);
}
message UserRequest {
int64 user_id = 1;
}
message UserResponse {
string username = 1;
string mobile = 2;
}
然后实现服务端:
# user_service/grpc_server.py
class UserService(user_service_pb2_grpc.UserServiceServicer):
def GetUserInfo(self, request, context):
user = User.objects.get(id=request.user_id)
return user_service_pb2.UserResponse(
username=user.username,
mobile=user.mobile
)
3. 消息队列 - 解耦利器
使用RabbitMQ实现订单创建后的异步处理:
# 订单服务发布消息 (order_service/signals.py)
from django.db.models.signals import post_save
from django.dispatch import receiver
import pika
@receiver(post_save, sender=Order)
def on_order_created(sender, instance, created, **kwargs):
if created:
connection = pika.BlockingConnection(
pika.ConnectionParameters('rabbitmq')
)
channel = connection.channel()
channel.basic_publish(
exchange='order_events',
routing_key='',
body=str(instance.id)
)
connection.close()
4. GraphQL - 灵活的数据聚合
当移动端需要同时获取用户信息和订单列表时:
# gateway_service/schema.py
import graphene
from graphene_django import DjangoObjectType
class UserType(DjangoObjectType):
class Meta:
model = User
class Query(graphene.ObjectType):
user_with_orders = graphene.Field(UserType, id=graphene.ID())
def resolve_user_with_orders(self, info, id):
# 调用用户服务
user = requests.get(f'http://user-service/api/users/{id}').json()
# 调用订单服务
orders = requests.get(
f'http://order-service/api/orders?user_id={id}'
).json()
user['orders'] = orders
return user
四、那些年我们踩过的坑
坑1:分布式事务
用户下单需要同时操作订单服务和库存服务,我们最终采用Saga模式:
# 订单事务协调器 (order_service/tasks.py)
@app.task
def create_order_flow(user_id, product_id):
try:
# 阶段1:创建订单
order = Order.objects.create(user_id=user_id, product_id=product_id)
# 阶段2:扣减库存
requests.post(
'http://inventory-service/api/inventory/decrease',
json={'product_id': product_id, 'quantity': 1},
timeout=5
)
# 阶段3:支付
payment = create_payment(order.id, order.amount)
except Exception as e:
# 补偿操作
Order.objects.filter(id=order.id).delete()
requests.post(
'http://inventory-service/api/inventory/increase',
json={'product_id': product_id, 'quantity': 1}
)
raise e
坑2:服务发现
我们最初用硬编码IP,结果运维改配置时全崩了。后来改用Consul:
# 服务消费者示例
from consul import Consul
def get_service_url(service_name):
consul = Consul(host='consul')
index, data = consul.catalog.service(service_name)
if not data:
raise Exception(f"Service {service_name} not found")
# 简单的负载均衡:随机选择一个实例
instance = random.choice(data)
return f"http://{instance['ServiceAddress']}:{instance['ServicePort']}"
坑3:日志追踪
为了排查一个跨5个服务的请求,我们引入了OpenTelemetry:
# settings.py 配置
OPENTELEMETRY_INSTRUMENTATION_DJANGO = {
'TRACER_PROVIDER': 'my_tracer_provider'
}
# 视图中使用
from opentelemetry import trace
def some_view(request):
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("custom_operation"):
# 业务逻辑
pass
五、微服务不是银弹
经过两年实践,我们总结出这些经验:
适合场景:
- 团队规模超过10人
- 系统复杂度高,模块边界清晰
- 需要差异化的伸缩策略
不适合场景:
- 创业初期MVP阶段
- 团队没有DevOps能力
- 业务逻辑高度耦合
最后给个直观的数据:我们的电商平台经过拆分后,核心接口的P99延迟从1200ms降到了350ms,部署频率从每周1次提升到每天20+次。但运维成本确实增加了,需要权衡利弊。
评论