我们面临一个棘手的基础设施问题:多个算法团队需要独立部署和迭代他们的TensorFlow模型,但将整个集群的管理权限下放会引发安全和资源争抢的混乱。为每个团队维护一套独立的基础设施成本过高。我们需要的是一个轻量级、可靠的MLOps平台,它能在共享的Docker Swarm集群上为每个租户(团队)提供计算、网络和凭证的硬性隔离,同时保持操作的简便性。直接使用Kubernetes固然功能强大,但其复杂性和运维成本对于我们当前阶段的团队规模而言有些过度。
我们的目标是构建一个Python控制平面服务。它作为唯一入口,接收来自租户的部署请求,并动态地在Swarm集群中创建完全隔离的、生命周期由租户控制的TensorFlow Serving环境。租户不直接接触Docker命令,只与这个控制平面API交互。
架构设计与决策
经过权衡,最终的架构由三个核心部分组成:一个共享的Docker Swarm集群、一个中心化的Python FastAPI控制平面,以及为每个租户动态创建的隔离资源栈。
graph TD
subgraph "租户 (Teams)"
TenantA[团队 A]
TenantB[团队 B]
end
subgraph "控制平面 (Python FastAPI Service)"
API[API Endpoint: /deploy]
end
subgraph "Docker Swarm 集群"
subgraph "Manager Node"
SwarmManager[Swarm Manager]
API -- "使用 Docker SDK" --> SwarmManager
end
subgraph "Worker Nodes"
Worker1[Worker Node 1]
Worker2[Worker Node 2]
Worker3[Worker Node 3]
end
end
subgraph "动态资源隔离"
subgraph "租户 A 的资源栈"
NetA[Overlay Network: tenant-a-net]
SecretA[Secret: tenant-a-creds]
ServiceA[TF Serving Service: tenant-a-model-v1]
end
subgraph "租户 B 的资源栈"
NetB[Overlay Network: tenant-b-net]
SecretB[Secret: tenant-b-creds]
ServiceB[TF Serving Service: tenant-b-model-v2]
end
end
TenantA -- "HTTP POST 请求" --> API
TenantB -- "HTTP POST 请求" --> API
SwarmManager -- "调度" --> ServiceA & ServiceB
ServiceA -- "运行于" --> Worker1
ServiceB -- "运行于" --> Worker2
ServiceA -- "连接" --> NetA
ServiceB -- "连接" --> NetB
技术选型原因:
- Docker Swarm: 它的原生Overlay网络提供了开箱即用的L2层网络隔离。
docker secret机制是管理敏感数据(如私有模型仓库的凭证)的理想选择。最重要的是,其API和CLI足够简单,便于我们的Python控制平面进行编排,避免了引入庞大的K8s Client库和复杂的RBAC配置。 - Python (FastAPI & Docker SDK): FastAPI提供了现代、高性能的Web框架,其基于Pydantic的数据验证能力能确保API入口的健壮性。
docker-pySDK则让我们能以编程方式精确控制Swarm中的每一个对象,从网络、密钥到服务定义,这是实现动态隔离的核心。 - 隔离策略:
- 网络隔离: 为每个租户的每次部署创建一个独立的
overlay网络。这意味着租户A的模型服务容器无法访问租户B的,除非显式地将它们连接到同一个网络。这是最关键的安全边界。 - 凭证隔离: 租户通过API提交其私有模型库的访问凭证,控制平面将其创建为Swarm Secret,并只挂载到该租户的服务中。凭证绝不会以环境变量等明文形式暴露。
- 计算资源隔离: 利用Docker服务的
reservations和limits参数,为每个租户的服务设定CPU和内存的预留与上限,防止某个租户的资源滥用影响整个集群的稳定性。
- 网络隔离: 为每个租户的每次部署创建一个独立的
控制平面的实现
控制平面的代码是整个系统的枢纽。它必须健壮、安全,并且能够将抽象的租户请求转化为精确的Docker Swarm指令。
1. API定义与数据模型
我们使用Pydantic来定义部署请求的结构,这不仅能自动生成API文档,还能在数据进入业务逻辑前进行严格校验。
# control_plane/main.py
import os
import uuid
import logging
import docker
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel, Field
# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 初始化 FastAPI 应用
app = FastAPI(
title="MLOps Control Plane for Docker Swarm",
description="An API to manage multi-tenant TensorFlow Serving deployments."
)
# 初始化 Docker 客户端
# 在真实项目中,应该处理更复杂的Docker守护进程连接配置和异常
try:
client = docker.from_env()
client.ping()
logging.info("Successfully connected to Docker daemon.")
except Exception as e:
logging.error(f"Failed to connect to Docker daemon: {e}")
# 在启动时就失败,这是一个致命错误
exit(1)
class ResourceQuota(BaseModel):
"""定义计算资源配额"""
cpu_limit: float = Field(0.5, gt=0, description="CPU核心数上限")
cpu_reservation: float = Field(0.25, gt=0, description="CPU核心数预留")
mem_limit_mb: int = Field(1024, gt=0, description="内存上限 (MB)")
mem_reservation_mb: int = Field(512, gt=0, description="内存预留 (MB)")
class ModelCredentials(BaseModel):
"""定义模型仓库的凭证"""
registry_url: str = Field(..., description="私有仓库地址")
username: str = Field(..., description="用户名")
password: str = Field(..., description="密码或访问令牌")
class DeploymentRequest(BaseModel):
"""定义租户的部署请求"""
tenant_id: str = Field(..., max_length=50, pattern=r'^[a-zA-Z0-9_-]+$', description="租户ID,用于资源隔离")
model_name: str = Field(..., max_length=100, description="模型名称")
model_base_path: str = Field(..., description="TensorFlow Serving的模型基础路径 (e.g., /models/my_model)")
image: str = Field("tensorflow/serving:latest", description="TensorFlow Serving的Docker镜像")
replicas: int = Field(1, ge=1, le=5, description="服务副本数")
resources: ResourceQuota = Field(default_factory=ResourceQuota, description="资源配额")
# 在真实项目中,凭证不应该直接在请求体中传递,这里为了简化示例
credentials: ModelCredentials | None = Field(None, description="访问私有模型仓库的凭证")
# 在真实项目中,租户信息和配额应该从数据库或配置中心加载
TENANT_CONFIG = {
"team-alpha": {"max_cpu": 4.0, "max_mem_mb": 8192},
"team-beta": {"max_cpu": 2.0, "max_mem_mb": 4096},
}
def check_tenant_quota(tenant_id: str, request: DeploymentRequest):
"""一个简化的租户配额检查函数"""
if tenant_id not in TENANT_CONFIG:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid tenant ID.")
# 这里的坑在于:这只是检查单次部署,没有检查租户已部署服务的总资源
# 一个完整的实现需要查询该租户所有正在运行的服务,并累加它们的资源预留
tenant_quota = TENANT_CONFIG[tenant_id]
requested_cpu = request.resources.cpu_reservation * request.replicas
requested_mem = request.resources.mem_reservation_mb * request.replicas
if requested_cpu > tenant_quota["max_cpu"] or requested_mem > tenant_quota["max_mem_mb"]:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Resource request exceeds tenant quota. Max CPU: {tenant_quota['max_cpu']}, Max Mem: {tenant_quota['max_mem_mb']}MB"
)
2. 核心部署逻辑
deploy_model 端点是所有魔法发生的地方。它遵循一个严格的顺序:验证 -> 创建隔离资源 -> 部署服务。这里的错误处理至关重要,任何一步失败都必须尝试回滚已创建的资源,以避免产生孤儿资源。
# control_plane/main.py (续)
def cleanup_resources(network_id: str | None, secret_id: str | None):
"""
在部署失败时尝试清理已创建的资源。
这是一个尽力而为的操作,需要记录失败日志。
"""
if secret_id:
try:
secret = client.secrets.get(secret_id)
secret.remove()
logging.info(f"Cleaned up secret: {secret_id}")
except docker.errors.NotFound:
pass # 已经不存在,忽略
except Exception as e:
logging.error(f"Failed to clean up secret {secret_id}: {e}")
if network_id:
try:
network = client.networks.get(network_id)
network.remove()
logging.info(f"Cleaned up network: {network_id}")
except docker.errors.NotFound:
pass
except Exception as e:
logging.error(f"Failed to clean up network {network_id}: {e}")
@app.post("/deploy", status_code=status.HTTP_201_CREATED)
def deploy_model(request: DeploymentRequest):
"""
接收部署请求,创建隔离环境并部署TensorFlow Serving服务。
"""
# 步骤0: 验证租户配额
check_tenant_quota(request.tenant_id, request)
# 生成唯一的资源标识符,防止冲突
deployment_id = str(uuid.uuid4())[:8]
service_name = f"{request.tenant_id}-{request.model_name}-{deployment_id}"
network_name = f"{service_name}-net"
secret_name = f"{service_name}-creds"
network_id, secret_id = None, None
try:
# 步骤1: 为本次部署创建隔离的Overlay网络
logging.info(f"Creating isolated network '{network_name}' for service '{service_name}'")
network = client.networks.create(
name=network_name,
driver="overlay",
scope="swarm",
attachable=True, # 允许调试时手动连接容器
labels={"owner": request.tenant_id, "deployment_id": deployment_id}
)
network_id = network.id
# 步骤2: 如果提供了凭证,创建Swarm Secret
secrets = []
if request.credentials:
logging.info(f"Creating secret '{secret_name}' for service '{service_name}'")
# Secret的数据必须是bytes
secret_data = f'{{"username":"{request.credentials.username}","password":"{request.credentials.password}"}}'.encode('utf-8')
secret = client.secrets.create(
name=secret_name,
data=secret_data,
labels={"owner": request.tenant_id, "deployment_id": deployment_id}
)
secret_id = secret.id
secrets.append(docker.types.SecretReference(secret_id=secret.id, secret_name=secret.name))
# 步骤3: 定义服务参数
# 资源限制单位是nanocpu和bytes
resources = docker.types.Resources(
cpu_limit=int(request.resources.cpu_limit * 1e9),
mem_limit=request.resources.mem_limit_mb * 1024 * 1024,
cpu_reservation=int(request.resources.cpu_reservation * 1e9),
mem_reservation=request.resources.mem_reservation_mb * 1024 * 1024
)
# 定义服务重启策略,避免失败的服务无限重启耗尽资源
restart_policy = docker.types.RestartPolicy(
condition="on-failure",
delay=5,
max_attempts=3,
window=120
)
# 这里的关键点:将服务连接到刚刚创建的专属网络
endpoint_spec = docker.types.EndpointSpec(mode="vip")
logging.info(f"Creating service '{service_name}'...")
service = client.services.create(
image=request.image,
name=service_name,
env=[f"MODEL_BASE_PATH={request.model_base_path}", f"MODEL_NAME={request.model_name}"],
networks=[network_name],
secrets=secrets,
endpoint_spec=endpoint_spec,
resources=resources,
replicas=request.replicas,
restart_policy=restart_policy,
labels={"owner": request.tenant_id, "model": request.model_name, "deployment_id": deployment_id}
)
logging.info(f"Service '{service.name}' created successfully with ID: {service.id}")
return {
"message": "Deployment initiated successfully.",
"service_id": service.id,
"service_name": service.name,
"network_id": network_id,
"secret_id": secret_id
}
except docker.errors.APIError as e:
logging.error(f"Docker API error during deployment for '{service_name}': {e}")
# 部署失败,触发资源清理
cleanup_resources(network_id, secret_id)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to deploy service: {e}")
except Exception as e:
logging.error(f"An unexpected error occurred during deployment for '{service_name}': {e}")
cleanup_resources(network_id, secret_id)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred.")
@app.delete("/undeploy/{service_name}", status_code=status.HTTP_200_OK)
def undeploy_model(service_name: str):
"""
根据服务名称卸载服务并清理相关网络和密钥。
在真实项目中,需要添加租户权限校验,确保租户只能删除自己的服务。
"""
try:
service = client.services.get(service_name)
service_labels = service.attrs['Spec']['Labels']
owner = service_labels.get("owner")
deployment_id = service_labels.get("deployment_id")
# 这是一个关键的安全检查,防止跨租户删除
# if owner != requesting_tenant_id:
# raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Permission denied.")
# 开始清理
service.remove()
logging.info(f"Service '{service_name}' removed.")
# 基于标签查找和清理关联资源
if deployment_id:
networks = client.networks.list(filters={"label": f"deployment_id={deployment_id}"})
for net in networks:
net.remove()
logging.info(f"Removed associated network: {net.name}")
secrets = client.secrets.list(filters={"label": f"deployment_id={deployment_id}"})
for sec in secrets:
sec.remove()
logging.info(f"Removed associated secret: {sec.name}")
return {"message": f"Service '{service_name}' and its resources have been undeployed."}
except docker.errors.NotFound:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Service '{service_name}' not found.")
except Exception as e:
logging.error(f"Error during undeployment of '{service_name}': {e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to undeploy service.")
部署与测试
将上述Python代码打包成一个Docker镜像,并作为服务部署在Swarm的Manager节点上。
control-plane.Dockerfile:
FROM python:3.11-slim
WORKDIR /app
RUN pip install --no-cache-dir fastapi "uvicorn[standard]" docker
COPY ./control_plane/ /app/
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
部署控制平面服务:
为了让控制平面能与Swarm Manager的Docker socket通信,我们需要将socket挂载到容器内。这是一个高权限操作,必须确保控制平面服务本身是安全的。
# 在 Swarm Manager 节点上执行
docker service create \
--name control-plane \
--publish published=8000,target=8000 \
--mount type=bind,source=/var/run/docker.sock,target=/var/run/docker.sock \
--constraint 'node.role == manager' \
your-registry/control-plane:latest
模拟租户部署请求:
现在,我们可以模拟team-alpha团队发起一个部署请求。
curl -X POST "http://<MANAGER_IP>:8000/deploy" \
-H "Content-Type: application/json" \
-d '{
"tenant_id": "team-alpha",
"model_name": "resnet",
"model_base_path": "/models/resnet",
"image": "tensorflow/serving:latest",
"replicas": 2,
"resources": {
"cpu_limit": 1.0,
"cpu_reservation": 0.5,
"mem_limit_mb": 2048,
"mem_reservation_mb": 1024
}
}'
验证隔离效果:
部署成功后,我们可以在Manager节点上验证资源是否被正确创建和隔离。
检查服务、网络和标签:
# 列出所有服务,可以看到我们刚刚创建的服务,名字包含租户ID和UUID docker service ls | grep "team-alpha-resnet" # 检查其中一个服务的详细信息,关注网络和资源限制 SERVICE_ID=$(docker service ls -q -f "name=team-alpha-resnet" | head -n 1) docker service inspect $SERVICE_ID --pretty # 你会看到类似这样的输出: # ... # Networks: team-alpha-resnet-xxxxxxxx-net # Resources: # Reservations: # NanoCPUs: 500000000 (0.5) # MemoryBytes: 1073741824 (1 GiB) # Limits: # NanoCPUs: 1000000000 (1.0) # MemoryBytes: 2147483648 (2 GiB) # ...检查网络隔离:
# 查看为部署创建的独立网络 docker network ls | grep "team-alpha-resnet" # 启动一个临时容器并连接到该网络,尝试访问其他网络的服务(会失败) docker run -it --rm --network team-alpha-resnet-xxxxxxxx-net nicolaka/netshoot # 在netshoot容器内,你无法ping通其他租户的服务
局限性与未来迭代方向
这个架构虽然解决了核心的隔离和自动化部署问题,但在生产环境中,它仍有几个明显的局限性。
GPU资源隔离: Docker Swarm对GPU的调度和隔离支持远不如Kubernetes成熟。虽然可以通过
--generic-resource标志和节点标签实现初步的GPU分配,但要做到精细化的隔离(如MIG)和监控,会非常复杂。此方案目前更适用于CPU推理负载。服务发现与流量入口: 方案解决了服务的部署和隔离,但没有解决外部流量如何安全、高效地访问到这些动态创建的服务。一个可行的演进方向是引入一个支持Docker作为Provider的API网关或反向代理(如Traefik),让它自动侦测带有特定标签的服务,并为它们创建路由规则。这样,租户就可以通过一个统一的入口,凭API Key或JWT访问自己的模型端点。
持久化状态与配额管理: 当前的控制平面是无状态的,配额检查逻辑也非常初级。一个生产级的系统需要将租户、部署历史、资源使用情况等信息持久化到数据库中。配额检查逻辑也需要重构,以实时计算每个租户当前已占用的总资源,而不仅仅是检查单次请求。
模型的热更新: TensorFlow Serving本身支持无需重启服务即可加载新版本的模型,但这要求模型文件存储在共享卷或对象存储上。当前的架构并未涉及模型文件的管理。后续可以集成一个存储解决方案(如NFS、S3),并将存储卷的挂载权限也作为隔离资源的一部分进行动态配置。