在Docker Swarm上构建支持动态租户隔离的TensorFlow服务架构


我们面临一个棘手的基础设施问题:多个算法团队需要独立部署和迭代他们的TensorFlow模型,但将整个集群的管理权限下放会引发安全和资源争抢的混乱。为每个团队维护一套独立的基础设施成本过高。我们需要的是一个轻量级、可靠的MLOps平台,它能在共享的Docker Swarm集群上为每个租户(团队)提供计算、网络和凭证的硬性隔离,同时保持操作的简便性。直接使用Kubernetes固然功能强大,但其复杂性和运维成本对于我们当前阶段的团队规模而言有些过度。

我们的目标是构建一个Python控制平面服务。它作为唯一入口,接收来自租户的部署请求,并动态地在Swarm集群中创建完全隔离的、生命周期由租户控制的TensorFlow Serving环境。租户不直接接触Docker命令,只与这个控制平面API交互。

架构设计与决策

经过权衡,最终的架构由三个核心部分组成:一个共享的Docker Swarm集群、一个中心化的Python FastAPI控制平面,以及为每个租户动态创建的隔离资源栈。

graph TD
    subgraph "租户 (Teams)"
        TenantA[团队 A]
        TenantB[团队 B]
    end

    subgraph "控制平面 (Python FastAPI Service)"
        API[API Endpoint: /deploy]
    end

    subgraph "Docker Swarm 集群"
        subgraph "Manager Node"
            SwarmManager[Swarm Manager]
            API -- "使用 Docker SDK" --> SwarmManager
        end
        subgraph "Worker Nodes"
            Worker1[Worker Node 1]
            Worker2[Worker Node 2]
            Worker3[Worker Node 3]
        end
    end

    subgraph "动态资源隔离"
        subgraph "租户 A 的资源栈"
            NetA[Overlay Network: tenant-a-net]
            SecretA[Secret: tenant-a-creds]
            ServiceA[TF Serving Service: tenant-a-model-v1]
        end
        subgraph "租户 B 的资源栈"
            NetB[Overlay Network: tenant-b-net]
            SecretB[Secret: tenant-b-creds]
            ServiceB[TF Serving Service: tenant-b-model-v2]
        end
    end
    
    TenantA -- "HTTP POST 请求" --> API
    TenantB -- "HTTP POST 请求" --> API
    
    SwarmManager -- "调度" --> ServiceA & ServiceB
    ServiceA -- "运行于" --> Worker1
    ServiceB -- "运行于" --> Worker2

    ServiceA -- "连接" --> NetA
    ServiceB -- "连接" --> NetB

技术选型原因:

  1. Docker Swarm: 它的原生Overlay网络提供了开箱即用的L2层网络隔离。docker secret机制是管理敏感数据(如私有模型仓库的凭证)的理想选择。最重要的是,其API和CLI足够简单,便于我们的Python控制平面进行编排,避免了引入庞大的K8s Client库和复杂的RBAC配置。
  2. Python (FastAPI & Docker SDK): FastAPI提供了现代、高性能的Web框架,其基于Pydantic的数据验证能力能确保API入口的健壮性。docker-py SDK则让我们能以编程方式精确控制Swarm中的每一个对象,从网络、密钥到服务定义,这是实现动态隔离的核心。
  3. 隔离策略:
    • 网络隔离: 为每个租户的每次部署创建一个独立的overlay网络。这意味着租户A的模型服务容器无法访问租户B的,除非显式地将它们连接到同一个网络。这是最关键的安全边界。
    • 凭证隔离: 租户通过API提交其私有模型库的访问凭证,控制平面将其创建为Swarm Secret,并只挂载到该租户的服务中。凭证绝不会以环境变量等明文形式暴露。
    • 计算资源隔离: 利用Docker服务的reservationslimits参数,为每个租户的服务设定CPU和内存的预留与上限,防止某个租户的资源滥用影响整个集群的稳定性。

控制平面的实现

控制平面的代码是整个系统的枢纽。它必须健壮、安全,并且能够将抽象的租户请求转化为精确的Docker Swarm指令。

1. API定义与数据模型

我们使用Pydantic来定义部署请求的结构,这不仅能自动生成API文档,还能在数据进入业务逻辑前进行严格校验。

# control_plane/main.py

import os
import uuid
import logging
import docker
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel, Field

# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 初始化 FastAPI 应用
app = FastAPI(
    title="MLOps Control Plane for Docker Swarm",
    description="An API to manage multi-tenant TensorFlow Serving deployments."
)

# 初始化 Docker 客户端
# 在真实项目中,应该处理更复杂的Docker守护进程连接配置和异常
try:
    client = docker.from_env()
    client.ping()
    logging.info("Successfully connected to Docker daemon.")
except Exception as e:
    logging.error(f"Failed to connect to Docker daemon: {e}")
    # 在启动时就失败,这是一个致命错误
    exit(1)


class ResourceQuota(BaseModel):
    """定义计算资源配额"""
    cpu_limit: float = Field(0.5, gt=0, description="CPU核心数上限")
    cpu_reservation: float = Field(0.25, gt=0, description="CPU核心数预留")
    mem_limit_mb: int = Field(1024, gt=0, description="内存上限 (MB)")
    mem_reservation_mb: int = Field(512, gt=0, description="内存预留 (MB)")

class ModelCredentials(BaseModel):
    """定义模型仓库的凭证"""
    registry_url: str = Field(..., description="私有仓库地址")
    username: str = Field(..., description="用户名")
    password: str = Field(..., description="密码或访问令牌")

class DeploymentRequest(BaseModel):
    """定义租户的部署请求"""
    tenant_id: str = Field(..., max_length=50, pattern=r'^[a-zA-Z0-9_-]+$', description="租户ID,用于资源隔离")
    model_name: str = Field(..., max_length=100, description="模型名称")
    model_base_path: str = Field(..., description="TensorFlow Serving的模型基础路径 (e.g., /models/my_model)")
    image: str = Field("tensorflow/serving:latest", description="TensorFlow Serving的Docker镜像")
    replicas: int = Field(1, ge=1, le=5, description="服务副本数")
    resources: ResourceQuota = Field(default_factory=ResourceQuota, description="资源配额")
    # 在真实项目中,凭证不应该直接在请求体中传递,这里为了简化示例
    credentials: ModelCredentials | None = Field(None, description="访问私有模型仓库的凭证")

# 在真实项目中,租户信息和配额应该从数据库或配置中心加载
TENANT_CONFIG = {
    "team-alpha": {"max_cpu": 4.0, "max_mem_mb": 8192},
    "team-beta": {"max_cpu": 2.0, "max_mem_mb": 4096},
}

def check_tenant_quota(tenant_id: str, request: DeploymentRequest):
    """一个简化的租户配额检查函数"""
    if tenant_id not in TENANT_CONFIG:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid tenant ID.")
    
    # 这里的坑在于:这只是检查单次部署,没有检查租户已部署服务的总资源
    # 一个完整的实现需要查询该租户所有正在运行的服务,并累加它们的资源预留
    tenant_quota = TENANT_CONFIG[tenant_id]
    requested_cpu = request.resources.cpu_reservation * request.replicas
    requested_mem = request.resources.mem_reservation_mb * request.replicas

    if requested_cpu > tenant_quota["max_cpu"] or requested_mem > tenant_quota["max_mem_mb"]:
         raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Resource request exceeds tenant quota. Max CPU: {tenant_quota['max_cpu']}, Max Mem: {tenant_quota['max_mem_mb']}MB"
        )

2. 核心部署逻辑

deploy_model 端点是所有魔法发生的地方。它遵循一个严格的顺序:验证 -> 创建隔离资源 -> 部署服务。这里的错误处理至关重要,任何一步失败都必须尝试回滚已创建的资源,以避免产生孤儿资源。

# control_plane/main.py (续)

def cleanup_resources(network_id: str | None, secret_id: str | None):
    """
    在部署失败时尝试清理已创建的资源。
    这是一个尽力而为的操作,需要记录失败日志。
    """
    if secret_id:
        try:
            secret = client.secrets.get(secret_id)
            secret.remove()
            logging.info(f"Cleaned up secret: {secret_id}")
        except docker.errors.NotFound:
            pass # 已经不存在,忽略
        except Exception as e:
            logging.error(f"Failed to clean up secret {secret_id}: {e}")

    if network_id:
        try:
            network = client.networks.get(network_id)
            network.remove()
            logging.info(f"Cleaned up network: {network_id}")
        except docker.errors.NotFound:
            pass
        except Exception as e:
            logging.error(f"Failed to clean up network {network_id}: {e}")


@app.post("/deploy", status_code=status.HTTP_201_CREATED)
def deploy_model(request: DeploymentRequest):
    """
    接收部署请求,创建隔离环境并部署TensorFlow Serving服务。
    """
    # 步骤0: 验证租户配额
    check_tenant_quota(request.tenant_id, request)

    # 生成唯一的资源标识符,防止冲突
    deployment_id = str(uuid.uuid4())[:8]
    service_name = f"{request.tenant_id}-{request.model_name}-{deployment_id}"
    network_name = f"{service_name}-net"
    secret_name = f"{service_name}-creds"

    network_id, secret_id = None, None
    try:
        # 步骤1: 为本次部署创建隔离的Overlay网络
        logging.info(f"Creating isolated network '{network_name}' for service '{service_name}'")
        network = client.networks.create(
            name=network_name,
            driver="overlay",
            scope="swarm",
            attachable=True, # 允许调试时手动连接容器
            labels={"owner": request.tenant_id, "deployment_id": deployment_id}
        )
        network_id = network.id

        # 步骤2: 如果提供了凭证,创建Swarm Secret
        secrets = []
        if request.credentials:
            logging.info(f"Creating secret '{secret_name}' for service '{service_name}'")
            # Secret的数据必须是bytes
            secret_data = f'{{"username":"{request.credentials.username}","password":"{request.credentials.password}"}}'.encode('utf-8')
            secret = client.secrets.create(
                name=secret_name,
                data=secret_data,
                labels={"owner": request.tenant_id, "deployment_id": deployment_id}
            )
            secret_id = secret.id
            secrets.append(docker.types.SecretReference(secret_id=secret.id, secret_name=secret.name))
        
        # 步骤3: 定义服务参数
        # 资源限制单位是nanocpu和bytes
        resources = docker.types.Resources(
            cpu_limit=int(request.resources.cpu_limit * 1e9),
            mem_limit=request.resources.mem_limit_mb * 1024 * 1024,
            cpu_reservation=int(request.resources.cpu_reservation * 1e9),
            mem_reservation=request.resources.mem_reservation_mb * 1024 * 1024
        )
        
        # 定义服务重启策略,避免失败的服务无限重启耗尽资源
        restart_policy = docker.types.RestartPolicy(
            condition="on-failure",
            delay=5,
            max_attempts=3,
            window=120
        )

        # 这里的关键点:将服务连接到刚刚创建的专属网络
        endpoint_spec = docker.types.EndpointSpec(mode="vip")

        logging.info(f"Creating service '{service_name}'...")
        service = client.services.create(
            image=request.image,
            name=service_name,
            env=[f"MODEL_BASE_PATH={request.model_base_path}", f"MODEL_NAME={request.model_name}"],
            networks=[network_name],
            secrets=secrets,
            endpoint_spec=endpoint_spec,
            resources=resources,
            replicas=request.replicas,
            restart_policy=restart_policy,
            labels={"owner": request.tenant_id, "model": request.model_name, "deployment_id": deployment_id}
        )

        logging.info(f"Service '{service.name}' created successfully with ID: {service.id}")
        return {
            "message": "Deployment initiated successfully.",
            "service_id": service.id,
            "service_name": service.name,
            "network_id": network_id,
            "secret_id": secret_id
        }

    except docker.errors.APIError as e:
        logging.error(f"Docker API error during deployment for '{service_name}': {e}")
        # 部署失败,触发资源清理
        cleanup_resources(network_id, secret_id)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to deploy service: {e}")
    except Exception as e:
        logging.error(f"An unexpected error occurred during deployment for '{service_name}': {e}")
        cleanup_resources(network_id, secret_id)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred.")


@app.delete("/undeploy/{service_name}", status_code=status.HTTP_200_OK)
def undeploy_model(service_name: str):
    """
    根据服务名称卸载服务并清理相关网络和密钥。
    在真实项目中,需要添加租户权限校验,确保租户只能删除自己的服务。
    """
    try:
        service = client.services.get(service_name)
        service_labels = service.attrs['Spec']['Labels']
        owner = service_labels.get("owner")
        deployment_id = service_labels.get("deployment_id")
        
        # 这是一个关键的安全检查,防止跨租户删除
        # if owner != requesting_tenant_id:
        #     raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Permission denied.")

        # 开始清理
        service.remove()
        logging.info(f"Service '{service_name}' removed.")
        
        # 基于标签查找和清理关联资源
        if deployment_id:
            networks = client.networks.list(filters={"label": f"deployment_id={deployment_id}"})
            for net in networks:
                net.remove()
                logging.info(f"Removed associated network: {net.name}")
            
            secrets = client.secrets.list(filters={"label": f"deployment_id={deployment_id}"})
            for sec in secrets:
                sec.remove()
                logging.info(f"Removed associated secret: {sec.name}")

        return {"message": f"Service '{service_name}' and its resources have been undeployed."}

    except docker.errors.NotFound:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Service '{service_name}' not found.")
    except Exception as e:
        logging.error(f"Error during undeployment of '{service_name}': {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to undeploy service.")

部署与测试

将上述Python代码打包成一个Docker镜像,并作为服务部署在Swarm的Manager节点上。

control-plane.Dockerfile:

FROM python:3.11-slim

WORKDIR /app

RUN pip install --no-cache-dir fastapi "uvicorn[standard]" docker

COPY ./control_plane/ /app/

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

部署控制平面服务:
为了让控制平面能与Swarm Manager的Docker socket通信,我们需要将socket挂载到容器内。这是一个高权限操作,必须确保控制平面服务本身是安全的。

# 在 Swarm Manager 节点上执行
docker service create \
  --name control-plane \
  --publish published=8000,target=8000 \
  --mount type=bind,source=/var/run/docker.sock,target=/var/run/docker.sock \
  --constraint 'node.role == manager' \
  your-registry/control-plane:latest

模拟租户部署请求:

现在,我们可以模拟team-alpha团队发起一个部署请求。

curl -X POST "http://<MANAGER_IP>:8000/deploy" \
-H "Content-Type: application/json" \
-d '{
  "tenant_id": "team-alpha",
  "model_name": "resnet",
  "model_base_path": "/models/resnet",
  "image": "tensorflow/serving:latest",
  "replicas": 2,
  "resources": {
    "cpu_limit": 1.0,
    "cpu_reservation": 0.5,
    "mem_limit_mb": 2048,
    "mem_reservation_mb": 1024
  }
}'

验证隔离效果:

部署成功后,我们可以在Manager节点上验证资源是否被正确创建和隔离。

  1. 检查服务、网络和标签:

    # 列出所有服务,可以看到我们刚刚创建的服务,名字包含租户ID和UUID
    docker service ls | grep "team-alpha-resnet"
    
    # 检查其中一个服务的详细信息,关注网络和资源限制
    SERVICE_ID=$(docker service ls -q -f "name=team-alpha-resnet" | head -n 1)
    docker service inspect $SERVICE_ID --pretty
    
    # 你会看到类似这样的输出:
    # ...
    # Networks: team-alpha-resnet-xxxxxxxx-net
    # Resources:
    #  Reservations:
    #   NanoCPUs: 500000000 (0.5)
    #   MemoryBytes: 1073741824 (1 GiB)
    #  Limits:
    #   NanoCPUs: 1000000000 (1.0)
    #   MemoryBytes: 2147483648 (2 GiB)
    # ...
  2. 检查网络隔离:

    # 查看为部署创建的独立网络
    docker network ls | grep "team-alpha-resnet"
    
    # 启动一个临时容器并连接到该网络,尝试访问其他网络的服务(会失败)
    docker run -it --rm --network team-alpha-resnet-xxxxxxxx-net nicolaka/netshoot
    # 在netshoot容器内,你无法ping通其他租户的服务

局限性与未来迭代方向

这个架构虽然解决了核心的隔离和自动化部署问题,但在生产环境中,它仍有几个明显的局限性。

  1. GPU资源隔离: Docker Swarm对GPU的调度和隔离支持远不如Kubernetes成熟。虽然可以通过--generic-resource标志和节点标签实现初步的GPU分配,但要做到精细化的隔离(如MIG)和监控,会非常复杂。此方案目前更适用于CPU推理负载。

  2. 服务发现与流量入口: 方案解决了服务的部署和隔离,但没有解决外部流量如何安全、高效地访问到这些动态创建的服务。一个可行的演进方向是引入一个支持Docker作为Provider的API网关或反向代理(如Traefik),让它自动侦测带有特定标签的服务,并为它们创建路由规则。这样,租户就可以通过一个统一的入口,凭API Key或JWT访问自己的模型端点。

  3. 持久化状态与配额管理: 当前的控制平面是无状态的,配额检查逻辑也非常初级。一个生产级的系统需要将租户、部署历史、资源使用情况等信息持久化到数据库中。配额检查逻辑也需要重构,以实时计算每个租户当前已占用的总资源,而不仅仅是检查单次请求。

  4. 模型的热更新: TensorFlow Serving本身支持无需重启服务即可加载新版本的模型,但这要求模型文件存储在共享卷或对象存储上。当前的架构并未涉及模型文件的管理。后续可以集成一个存储解决方案(如NFS、S3),并将存储卷的挂载权限也作为隔离资源的一部分进行动态配置。


  目录