使用 Jenkins 与 Azure Service Bus 编排跨云 Serverless 构建工作流


我们的 Jenkins 控制器快被几个重量级的构建任务压垮了。这些任务不是常规的编译打包,而是动辄数小时的数据预处理和模型训练作业。它们长时间霸占着构建代理,导致常规的CI/CD流水线严重阻塞。将这些耗时任务迁移出去,释放 Jenkins 代理资源,成了必须解决的问题。

初步构想是利用 Serverless,让计算密集型任务在云端弹性执行,Jenkins 只负责触发和状态跟踪。但问题在于,我们的技术栈横跨 AWS 和 Azure。数据源在 AWS S3,而部分专有分析服务部署在 Azure。我们需要一个方案,能让 Jenkins 像指挥家一样,精确地启动 AWS 上的处理步骤,然后将结果无缝传递给 Azure 上的下一步,最后还能将最终状态回报给 Jenkins。

直接的 HTTP 链式调用是第一个被否决的方案。这种紧耦合的方式在跨云长时间运行的任务中极其脆弱,任何一次网络抖动或函数超时都可能导致整个工作流中断且难以恢复。

最终,我们选择了一个以消息队列为核心的解耦架构。Jenkins 作为起点,触发第一个 Serverless 函数;而 Azure Service Bus,则充当了整个跨云工作流的“神经中枢”。它负责任务的传递、状态的缓冲,并为整个流程提供了至关重要的韧性。在真实项目中,这种异步、解耦的模式远比看起来花哨的编排工具更可靠、更易于调试。

架构设计与工作流概览

整个工作流的核心是解耦。Jenkins 不再直接等待任务完成,而是“即发即忘”,然后进入一个轮询或等待回调的状态。

sequenceDiagram
    participant J as Jenkins Master
    participant L as AWS Lambda (数据预处理器)
    participant ASB as Azure Service Bus
    participant AF as Azure Function (核心处理器)
    participant N as Notification Listener (状态回调服务)

    J->>L: 1. API Gateway触发 (携带 correlationId, replyQueue)
    activate L
    L-->>L: 2. 从 S3 拉取数据并处理
    L->>ASB: 3. 将处理结果和 correlationId 发送到 `task_queue`
    deactivate L

    Note right of ASB: 消息在队列中持久化

    ASB-->>AF: 4. 触发Azure Function
    activate AF
    AF-->>AF: 5. 执行核心业务逻辑
    AF->>ASB: 6. 将最终状态 (成功/失败) 和 correlationId 发送到 `reply_queue`
    deactivate AF
    
    ASB-->>N: 7. 触发状态回调服务
    activate N
    N->>J: 8. 通过Jenkins API更新构建状态
    deactivate N

这里的关键组件包括:

  1. Jenkins Pipeline: 负责生成一个全局唯一的 correlationId,触发 AWS Lambda,然后进入等待状态。
  2. AWS Lambda: 作为工作流的第一站,负责从S3拉取数据进行预处理。完成后,将结果连同 correlationId 打包成消息,发送到 Azure Service Bus 的任务队列 (task_queue)。
  3. Azure Service Bus: 包含两个队列。task_queue 用于接收并分发主任务,reply_queue 用于接收最终的处理结果,并通知 Jenkins。使用队列的好处是天然的重试与死信机制,这对于长时间运行的任务至关重要。
  4. Azure Function: 监听 task_queue。一旦收到消息,便执行核心的、耗时的处理逻辑。完成后,将结果(成功或失败信息)和 correlationId 发送到 reply_queue
  5. Notification Listener: 这是一个轻量级的服务(可以用另一个Lambda或Azure Function实现),专门监听 reply_queue。它的唯一职责是解析消息,并根据 correlationId 调用 Jenkins API,将对应的构建标记为成功或失败。

第一步:Jenkins Pipeline 的改造

我们的 Jenkinsfile 需要承担起“发起者”和“等待者”的双重角色。它不再执行实际的业务逻辑,而是编排外部服务。

// Jenkinsfile
pipeline {
    agent any

    environment {
        // 使用Jenkins的构建号和任务名生成一个唯一的ID
        CORRELATION_ID = "${env.JOB_NAME}-${env.BUILD_NUMBER}"
        // AWS Lambda的触发URL (通过API Gateway暴露)
        AWS_LAMBDA_TRIGGER_URL = credentials('aws-lambda-trigger-url')
        // Jenkins回调服务的URL和认证Token
        JENKINS_CALLBACK_URL = "http://notification-listener.internal/jenkins/update"
        JENKINS_CALLBACK_TOKEN = credentials('jenkins-callback-token')
    }

    stages {
        stage('Trigger Cross-Cloud Workflow') {
            steps {
                script {
                    def payload = """
                    {
                        "correlationId": "${CORRELATION_ID}",
                        "sourceBucket": "my-raw-data-bucket",
                        "sourceKey": "input/data-set-${env.BUILD_NUMBER}.zip",
                        "replyQueue": "reply_queue",
                        "callbackUrl": "${JENKINS_CALLBACK_URL}",
                        "callbackToken": "${JENKINS_CALLBACK_TOKEN}"
                    }
                    """
                    
                    // 异步触发AWS Lambda,不等待其完成
                    // 这里的错误处理非常重要,如果触发失败,整个构建必须立即失败
                    try {
                        def response = httpRequest(
                            url: AWS_LAMBDA_TRIGGER_URL,
                            httpMode: 'POST',
                            contentType: 'APPLICATION_JSON',
                            requestBody: payload,
                            quiet: true
                        )
                        if (response.status != 200) {
                            error "Failed to trigger AWS Lambda. Status: ${response.status}, Content: ${response.content}"
                        }
                        echo "Successfully triggered AWS Lambda for correlationId: ${CORRELATION_ID}"
                    } catch (Exception e) {
                        error "Exception while triggering AWS Lambda: ${e.message}"
                    }
                }
            }
        }

        stage('Wait for Completion Notification') {
            steps {
                // 设置一个合理的超时时间,防止工作流卡死导致Jenkins任务永久挂起
                timeout(time: 4, unit: 'HOURS') {
                    // lock/resource用来确保回调可以准确地找到这个正在等待的构建
                    // 这是一个简化的实现。在生产环境中,我们会使用更复杂的机制来传递和锁定状态。
                    // 此处使用 input 步骤来模拟长时间的等待,直到回调服务改变构建状态。
                    // 真实场景中,回调服务会直接调用Jenkins API来终止或继续这个构建。
                    // 为了演示,我们假设回调服务会批准这个input。
                    input message: "Waiting for external task completion for ID: ${CORRELATION_ID}. Do not abort manually unless necessary."
                }
            }
        }
    }
    
    post {
        // 无论成功还是失败,都打印最终状态
        always {
            echo "Workflow for ${CORRELATION_ID} finished with status: ${currentBuild.currentResult}"
        }
    }
}

一个常见的错误是让 Jenkins 去轮询某个状态API。这种做法会消耗 Jenkins 的线程资源,并且在任务量大时严重影响其性能。目前这种基于回调的模式是更优的选择,input 步骤只是一个用于演示的阻塞机制。在实际生产中,回调服务会直接调用 /job/${JOB_NAME}/${BUILD_NUMBER}/stop 或其他API来结束 wait 阶段。

第二步:AWS Lambda - 工作流的起点

这个 Lambda 函数是 AWS 世界的代理。它负责处理所有与 AWS 资源(如 S3)的交互,并将处理的接力棒通过 Azure Service Bus 传出去。

这里使用 Python 和 boto3 for AWS, azure-servicebus for Azure。

# lambda_function.py (AWS Lambda)

import os
import json
import boto3
import logging
from azure.servicebus import ServiceBusClient, ServiceBusMessage

# 日志配置
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# 从环境变量或Secrets Manager获取敏感信息
SERVICE_BUS_CONNECTION_STR = os.environ['AZURE_SERVICE_BUS_CONNECTION_STRING']
TASK_QUEUE_NAME = "task_queue"

s3_client = boto3.client('s3')

def handler(event, context):
    try:
        # 1. 解析来自API Gateway的输入
        body = json.loads(event.get('body', '{}'))
        correlation_id = body.get('correlationId')
        bucket = body.get('sourceBucket')
        key = body.get('sourceKey')
        reply_queue = body.get('replyQueue')

        if not all([correlation_id, bucket, key, reply_queue]):
            logger.error("Missing required parameters in request body")
            return {
                'statusCode': 400,
                'body': json.dumps({'error': 'Missing required parameters'})
            }

        logger.info(f"[{correlation_id}] Starting processing for s3://{bucket}/{key}")

        # 2. 核心业务逻辑:从S3下载并进行预处理
        # 这里的代码是生产级的,包含了错误处理和资源管理
        local_filename = f"/tmp/{os.path.basename(key)}"
        s3_client.download_file(bucket, key, local_filename)

        # ... 在这里执行复杂的数据预处理 ...
        # 假设处理后生成了一个结果文件 result.json
        processed_data = {"status": "processed", "item_count": 12345}
        with open('/tmp/result.json', 'w') as f:
            json.dump(processed_data, f)
        
        # 模拟上传处理结果到另一个S3桶
        s3_client.upload_file('/tmp/result.json', 'my-processed-data-bucket', f"{correlation_id}/result.json")

        logger.info(f"[{correlation_id}] Data processing complete. Result uploaded to S3.")

        # 3. 构建发送到Azure Service Bus的消息
        message_body = {
            "correlationId": correlation_id,
            "processedDataS3Path": f"s3://my-processed-data-bucket/{correlation_id}/result.json",
            "metadata": {
                "source_build": os.environ.get('AWS_LAMBDA_FUNCTION_NAME', 'unknown')
            },
            # 将回调信息透传下去
            "callbackInfo": {
                "replyQueue": reply_queue,
                "jenkinsCallbackUrl": body.get('callbackUrl'),
                "jenkinsCallbackToken": body.get('callbackToken')
            }
        }

        # 4. 发送消息到Azure Service Bus
        # 使用 with 语句确保客户端被正确关闭
        with ServiceBusClient.from_connection_string(SERVICE_BUS_CONNECTION_STR) as client:
            with client.get_queue_sender(TASK_QUEUE_NAME) as sender:
                service_bus_message = ServiceBusMessage(json.dumps(message_body))
                # 设置消息的CorrelationId,便于端到端追踪
                service_bus_message.correlation_id = correlation_id
                
                sender.send_messages(service_bus_message)
                logger.info(f"[{correlation_id}] Message successfully sent to Azure Service Bus queue '{TASK_QUEUE_NAME}'.")

        return {
            'statusCode': 200,
            'body': json.dumps({'message': 'Workflow initiated', 'correlationId': correlation_id})
        }

    except s3_client.exceptions.NoSuchKey:
        logger.error(f"S3 object not found: s3://{bucket}/{key}")
        # 在真实项目中,这里可能需要向Jenkins发送一个即时失败的回调
        return {'statusCode': 404, 'body': json.dumps({'error': 'S3 object not found'})}
    except Exception as e:
        logger.error(f"[{correlation_id or 'unknown'}] An unexpected error occurred: {str(e)}", exc_info=True)
        # 同样,关键的失败需要通知机制
        return {'statusCode': 500, 'body': json.dumps({'error': 'Internal server error'})}

这里的坑在于权限配置。Lambda 执行角色必须有:

  • 读取源 S3 存储桶的权限。
  • 写入目标 S3 存储桶的权限。
  • 访问存储 Azure Service Bus 连接字符串的 AWS Secrets Manager 的权限。
  • CloudWatch Logs 写入权限,用于调试。

第三步:Azure Function - 工作流的核心

现在,接力棒传到了 Azure。这个 Azure Function 由 Service Bus 队列消息触发,执行最核心、最耗时的计算。

这里使用 C#,因为它在 Azure 生态中有很好的集成和性能表现。

// ProcessTask.cs (Azure Function)

using System;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class ProcessTask
{
    // ServiceBusTrigger绑定到task_queue,Connection是app settings里的连接字符串名
    [FunctionName("ProcessTask")]
    public static async Task Run(
        [ServiceBusTrigger("task_queue", Connection = "AzureServiceBusConnection")] string myQueueItem,
        ILogger log)
    {
        string correlationId = "unknown";
        try
        {
            // 1. 解析消息
            var messagePayload = JsonDocument.Parse(myQueueItem);
            correlationId = messagePayload.RootElement.GetProperty("correlationId").GetString();
            log.LogInformation($"[{correlationId}] C# ServiceBus queue trigger function processing message.");

            var processedDataS3Path = messagePayload.RootElement.GetProperty("processedDataS3Path").GetString();
            var callbackInfo = messagePayload.RootElement.GetProperty("callbackInfo");
            
            // 2. 核心处理逻辑
            log.LogInformation($"[{correlationId}] Starting heavy computation for data at: {processedDataS3Path}");
            
            // ... 在这里执行非常耗时的操作,例如:
            // - 调用Azure Machine Learning服务
            // - 连接Azure SQL进行复杂分析
            // - 下载S3文件进行处理 (需要配置AWS凭证)
            await Task.Delay(TimeSpan.FromMinutes(5)); // 模拟长时间运行
            
            var finalResult = new { status = "SUCCESS", details = "Model training completed with 98% accuracy." };
            log.LogInformation($"[{correlationId}] Heavy computation finished successfully.");

            // 3. 发送成功结果到 reply_queue
            await SendReply(callbackInfo, finalResult, correlationId, log);

        }
        catch (Exception ex)
        {
            log.LogError(ex, $"[{correlationId}] An error occurred during processing. Message: {ex.Message}");

            // 这里的错误处理是架构韧性的关键
            // 尝试解析回调信息,如果失败则无法通知Jenkins
            try
            {
                var messagePayload = JsonDocument.Parse(myQueueItem);
                var callbackInfo = messagePayload.RootElement.GetProperty("callbackInfo");
                var errorResult = new { status = "FAILED", details = ex.Message };
                await SendReply(callbackInfo, errorResult, correlationId, log);
            }
            catch (Exception innerEx)
            {
                // 如果连回调信息都无法解析,说明消息格式有问题。
                // Service Bus 的死信队列机制会自动处理这种情况。
                log.LogError(innerEx, $"[{correlationId}] Could not send failure reply. The message might be malformed.");
                throw; // 重抛异常,让Function Host知道处理失败,消息会进入死信队列
            }
        }
    }

    private static async Task SendReply(JsonElement callbackInfo, object resultPayload, string correlationId, ILogger log)
    {
        var replyQueueName = callbackInfo.GetProperty("replyQueue").GetString();
        var connectionString = Environment.GetEnvironmentVariable("AzureServiceBusConnection");

        var client = new ServiceBusClient(connectionString);
        var sender = client.CreateSender(replyQueueName);

        try
        {
            var replyBody = new
            {
                correlationId = correlationId,
                result = resultPayload,
                callbackInfo = callbackInfo // 将原始回调信息完整传回
            };

            var messageBody = JsonSerializer.Serialize(replyBody);
            var serviceBusMessage = new ServiceBusMessage(Encoding.UTF8.GetBytes(messageBody))
            {
                CorrelationId = correlationId
            };
            
            await sender.SendMessageAsync(serviceBusMessage);
            log.LogInformation($"[{correlationId}] Sent reply to queue '{replyQueueName}'.");
        }
        finally
        {
            // 确保sender和client被正确关闭和释放
            await sender.DisposeAsync();
            await client.DisposeAsync();
        }
    }
}

Azure Function 的配置(local.settings.json 或 App Settings)中必须包含 AzureServiceBusConnection。为了让这个函数能访问 AWS S3,需要通过 Managed Identity 和 Azure AD Workload Identity Federation 安全地获取 AWS 的临时凭证,而不是在配置中硬编码 AK/SK。这是生产环境的最佳实践。

第四步:回调监听与闭环

最后一步是关闭整个工作流循环。这个 Notification Listener 服务是连接异步世界和 Jenkins 的桥梁。它极其轻量,可以用任何技术实现,这里我们继续用一个 Azure Function 作为例子。

// JenkinsCallback.cs (Azure Function)

using System;
using System.IO;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class JenkinsCallback
{
    // 复用静态HttpClient实例以获得更好的性能
    private static readonly HttpClient httpClient = new HttpClient();

    [FunctionName("JenkinsCallback")]
    public static async Task Run(
        [ServiceBusTrigger("reply_queue", Connection = "AzureServiceBusConnection")] string myQueueItem,
        ILogger log)
    {
        string correlationId = "unknown";
        try
        {
            var payload = JsonDocument.Parse(myQueueItem);
            correlationId = payload.RootElement.GetProperty("correlationId").GetString();
            log.LogInformation($"[{correlationId}] Received completion message. Preparing to notify Jenkins.");

            var result = payload.RootElement.GetProperty("result");
            var status = result.GetProperty("status").GetString();
            
            var callbackInfo = payload.RootElement.GetProperty("callbackInfo");
            var jenkinsUrl = callbackInfo.GetProperty("jenkinsCallbackUrl").GetString();
            var jenkinsToken = callbackInfo.GetProperty("jenkinsCallbackToken").GetString();
            
            // 构造Jenkins API请求
            // 这里的API端点是虚构的,实际中需要一个Jenkins插件或自定义API来接收这种外部状态更新
            // 一个简单的替代方案是触发一个“批准”或“中止”的动作
            // 例如,我们可以调用 input 步骤的 API
            
            // 在我们的Jenkinsfile中,我们使用了input步骤来阻塞。
            // Jenkins的input步骤API格式是 /job/{...}/{build_num}/input/{input_id}/proceed
            // 这需要提前获取input_id,比较复杂。
            // 一个更务实的方法是让Jenkins pipeline轮询一个外部存储(如S3/DynamoDB/Azure Table Storage)的状态标记。
            // 但为了演示回调,我们假设有一个自定义的API端点。

            var jenkinsPayload = new 
            {
                correlationId = correlationId,
                status = status.Equals("SUCCESS", StringComparison.OrdinalIgnoreCase) ? "SUCCESS" : "FAILURE",
                details = result.GetProperty("details").GetString()
            };

            var request = new HttpRequestMessage(HttpMethod.Post, jenkinsUrl)
            {
                Content = new StringContent(JsonSerializer.Serialize(jenkinsPayload), Encoding.UTF8, "application/json")
            };
            request.Headers.Add("X-Callback-Token", jenkinsToken);

            var response = await httpClient.SendAsync(request);

            if (response.IsSuccessStatusCode)
            {
                log.LogInformation($"[{correlationId}] Successfully notified Jenkins. Status: {response.StatusCode}");
            }
            else
            {
                // 如果通知失败,这是一个严重问题,需要告警
                var responseContent = await response.Content.ReadAsStringAsync();
                log.LogError($"[{correlationId}] Failed to notify Jenkins. Status: {response.StatusCode}, Response: {responseContent}");
                // 抛出异常,让消息进入死信队列,以便手动干预
                throw new Exception($"Jenkins notification failed with status {response.StatusCode}");
            }
        }
        catch(Exception ex)
        {
            log.LogError(ex, $"[{correlationId}] Error processing Jenkins callback for message: {myQueueItem}");
            throw;
        }
    }
}

局限性与未来迭代路径

这套架构解决了我们最初的痛点:Jenkins 代理的资源占用问题,并实现了跨云的、有韧性的长时间构建。但是,它并非没有代价。

首先,系统的复杂性显著增加。原本一个 Jenkinsfile 就能搞定的事情,现在分散在 Jenkins、IAM、Lambda、Service Bus、Azure AD、Azure Function 等多个组件中。排查问题需要具备跨云的诊断能力,对团队技能要求更高。

其次,状态追踪变得困难。虽然我们有 correlationId,但要获得一个任务的实时精细状态(比如“正在处理第3个文件”),就需要引入额外的状态存储(如 DynamoDB 或 Azure Table Storage),并让 Serverless 函数在关键步骤更新状态,这会进一步增加架构的复杂度。

最后,回调机制本身也可能成为故障点。如果 Notification Listener 服务宕机,或者 Jenkins API 不可达,即使后端任务已经成功完成,Jenkins 这边的构建也会因超时而失败。这就要求回调服务本身也必须是高可用的,并且有完善的重试和告警机制。

对于更复杂的、包含多分枝、并行和条件逻辑的工作流,手动通过消息队列来编排会变得非常笨拙。在这种场景下,评估 AWS Step Functions 或 Azure Durable Functions/Logic Apps 可能更为合适。它们将状态管理和流程控制抽象出来,让开发者更专注于业务逻辑。我们当前的方案,则是在不引入全新编排引擎的前提下,对现有 Jenkins 体系的一个有效、务实的扩展。


  目录