我们面临的挑战是为机器学习模型提供实时特征,这些特征必须在事件发生后的几百毫秒内可供查询。数据源是我们存储在对象存储(如 GCS 或 S3)中的数据湖,每天有数十亿的原始用户行为事件涌入。批处理 ETL 每天更新一次特征,但对于实时推荐、反欺诈等场景,这种延迟是不可接受的。我们需要一个管道,能够以极低的延迟处理事件流,计算特征,并将其存储在高性能数据库中,同时还要保证系统的成本效益和运维简易性。
架构决策的十字路口
在设计这个实时特征管道时,我们评估了两种主流的架构方案。
方案 A: 传统的流处理集群(Flink/Spark Streaming)
这是业界非常成熟的方案。其典型架构如下:
graph TD
A[Data Lake Event Source] --> B(Kafka/Pulsar);
B --> C{Flink/Spark Streaming Cluster};
C --> D[Feature Aggregation Logic];
D --> E(High-Throughput Key-Value Store);
subgraph "传统方案"
B; C; D; E;
end
优势:
- 功能强大: Flink 提供了丰富的状态管理、窗口操作和 Exactly-Once 语义保障,处理复杂的事件序列分析(如会话窗口、滑动窗口聚合)能力极强。
- 生态成熟: 拥有庞大的社区和丰富的连接器,与各种数据源和存储系统集成非常方便。
劣势:
- 运维成本高昂: 维护一个高可用的 Flink 或 Spark Streaming 集群需要专门的团队。资源规划、任务调优、故障恢复都相当复杂。
- 资源利用率问题: 流量波峰波谷明显时,为了应对峰值,集群必须预留大量资源。在流量低谷期,这些资源被闲置,造成了巨大的成本浪费。即便有自动伸缩,其反应速度和粒度也远不如 Serverless 架构。
- 延迟链路较长: 数据从事件源进入消息队列,再由流处理引擎消费、处理、最终写入数据库,整个链路的延迟很难优化到极致。
方案 B: 基于 Knative 的事件驱动 Serverless 架构
这个方案拥抱了云原生的理念,试图用更轻量级、更具弹性的方式解决问题。
graph TD
subgraph "入口防护"
WAF[Cloud WAF] --> Ingress;
end
subgraph "事件源与路由"
S3[Data Lake: S3/GCS Event Notification] --> Broker(Knative Eventing Broker);
end
subgraph "Serverless 处理单元"
Broker -- Trigger --> Service(Kotlin Knative Service);
end
subgraph "高性能存储"
Service --> ScyllaDB[(ScyllaDB Cluster)];
end
优势:
- 极致的成本效益: Knative Service 可以根据请求量自动从零扩展到数百个实例,并在没有请求时缩容到零。这意味着我们只为实际处理的事件付费,没有闲置资源。
- 运维简化: Knative 屏蔽了底层 Kubernetes 的复杂性,开发者只需关注业务逻辑(Kotlin 函数),无需管理服务器、集群伸缩或部署策略。
- 低延迟潜力: 事件直接触发函数执行,链路短。配合高性能语言(Kotlin on JVM/GraalVM)和低延迟数据库(ScyllaDB),端到端延迟可以控制在非常低的水平。
劣势:
- 冷启动问题: 函数从零启动时会有延迟。对于 P999 延迟要求极高的场景,这可能是一个问题。
- 复杂状态管理: Serverless 函数本质上是无状态的。实现像 Flink 那样复杂的窗口聚合逻辑需要借助外部存储(如 ScyllaDB 本身或 Redis),增加了应用层逻辑的复杂性。
- 生态相对年轻: 虽然基于 CloudEvents 标准,但在复杂工作流编排方面,生态工具不如传统方案成熟。
最终选择与理由
我们最终选择了方案 B。决策的核心驱动力是成本和运维效率。我们的业务流量具有明显的周期性,方案 A 的资源浪费问题非常突出。方案 B 的按需付费模式与我们的业务模式完美契合。
对于其劣势,我们进行了如下权衡:
- 冷启动: 我们选择使用 Quarkus 作为 Kotlin 的运行时框架。Quarkus 专为云原生和 Serverless 设计,启动速度极快。对于延迟最敏感的特征,我们可以在 Knative Service 中配置
minScale: 1来保留一个热实例,以可接受的微小成本换取稳定的低延迟。 - 状态管理: 我们分析了业务需求,发现 80% 的实时特征是简单的计数、累加或最新值更新,这些都可以在 ScyllaDB 中通过高效的 Read-Modify-Write 原子操作完成。对于少数复杂的窗口计算,我们决定将其拆分为另一个独立的、可以容忍更高延迟的批处理任务。这是典型的架构权衡,用 80/20 法则解决核心问题。
- 安全: 管道的入口点(接收数据湖事件通知的 webhook 或直接的事件上传 API)必须暴露在公网,因此使用 WAF 进行前置防护是架构中的一个强制安全要求,用于抵御常见的 Web 攻击和非预期的流量洪峰。
核心实现概览
以下是这个架构中关键组件的代码实现和配置细节。
1. ScyllaDB 表结构设计
ScyllaDB 的性能高度依赖于数据模型。我们的目标是支持对特定实体(如用户 user_id)的特征进行快速读写。
-- cqlsh
-- 用于存储用户实时行为特征的表
-- 每一个特征是一个 map,可以动态增删,有很好的扩展性
CREATE KEYSPACE IF NOT EXISTS feature_store WITH REPLICATION = {
'class': 'NetworkTopologyStrategy',
'replication_factor': 3
};
USE feature_store;
CREATE TABLE IF NOT EXISTS user_realtime_features (
user_id text, -- 用户ID,作为分区键,保证同一用户数据落在同一节点
last_updated timestamp, -- 最后更新时间
counters map<text, counter>, -- 计数类特征,如:'clicks_last_1h', 'views_last_10m'
-- 使用 counter 类型可以安全地并发增减,无需读后写
latest_values map<text, text>, -- 最新值类特征,如:'last_viewed_category'
PRIMARY KEY (user_id)
);
-- 一个常见的错误是为每个特征创建一个单独的列。
-- 在真实项目中,特征数量会快速增长到数百个,频繁 ALTER TABLE 是不可接受的。
-- 使用 map 可以在不修改表结构的情况下灵活地支持新特征。
2. Kotlin 特征处理服务 (基于 Quarkus)
我们使用 Quarkus 和 Kotlin 来构建这个服务,以获得快速的启动时间和出色的开发体验。
build.gradle.kts 依赖配置:
plugins {
id("org.jetbrains.kotlin.jvm") version "1.9.20"
id("io.quarkus")
}
repositories {
mavenCentral()
}
val quarkusVersion = "3.5.0"
dependencies {
implementation(enforcedPlatform("io.quarkus.platform:quarkus-bom:$quarkusVersion"))
implementation("io.quarkus:quarkus-kotlin")
implementation("io.quarkus:quarkus-resteasy-reactive-jackson") // 用于处理 HTTP 事件
implementation("io.quarkus:quarkus-cassandra-client") // ScyllaDB Java 驱动
implementation("io.cloudevents:cloudevents-quarkus:2.0.1") // CloudEvents SDK for Quarkus
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
testImplementation("io.quarkus:quarkus-junit5")
testImplementation("io.rest-assured:rest-assured")
}
application.properties 配置:
# Quarkus HTTP server configuration for Knative
quarkus.http.port=8080
# ScyllaDB/Cassandra driver configuration
quarkus.cassandra.contact-points=scylla-node1.namespace.svc:9042,scylla-node2.namespace.svc:9042
quarkus.cassandra.local-datacenter=dc1
quarkus.cassandra.keyspace=feature_store
# Logging configuration for production
quarkus.log.level=INFO
quarkus.log.console.format=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%c{3.}] (%t) %s%e%n
# 使用 JSON 格式的日志更利于集中式日志系统(如 ELK)进行解析
quarkus.log.console.json=true
quarkus.log.console.json.pretty-print=false
核心业务逻辑 FeatureProcessorService.kt:
这个服务接收一个 CloudEvent,解析其数据,并更新 ScyllaDB 中的特征。
package org.example.featurepipeline
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.cql.SimpleStatement
import io.cloudevents.CloudEvent
import io.quarkus.logging.Log
import jakarta.enterprise.context.ApplicationScoped
import jakarta.ws.rs.Consumes
import jakarta.ws.rs.POST
import jakarta.ws.rs.Path
import jakarta.ws.rs.core.Response
import java.net.URI
import java.time.Instant
// 定义事件负载结构
data class UserActionEvent(
val userId: String,
val eventType: String, // e.g., "view", "click", "purchase"
val category: String,
val timestamp: Instant
)
@Path("/")
@ApplicationScoped
class FeatureProcessorService(private val session: CqlSession) {
// 预编译的 CQL 语句是性能优化的关键
// 驱动只需解析和准备一次,后续调用非常高效
private val incrementCounterStmt = session.prepare(
"UPDATE feature_store.user_realtime_features SET counters = counters + ? WHERE user_id = ?"
)
private val updateLatestValueStmt = session.prepare(
"UPDATE feature_store.user_realtime_features SET latest_values = latest_values + ?, last_updated = ? WHERE user_id = ?"
)
@POST
@Consumes("application/cloudevents+json")
fun processEvent(event: CloudEvent): Response {
// 这里的坑在于:CloudEvent 的 data 字段可能不是你期望的类型,
// 必须进行健壮的解析和错误处理。
val eventData = try {
// 这里假设 data 是 JSON 格式的字节数组
// 生产代码中需要更复杂的反序列化逻辑
val data = event.data?.toBytes()?.toString(Charsets.UTF_8)
// A simple placeholder for a real JSON parser
parseJson(data)
} catch (e: Exception) {
Log.errorf(e, "Failed to parse CloudEvent data. Event ID: %s", event.id)
// 返回 400 Bad Request,Knative Eventing 的 Broker 可以根据配置将其发送到死信队列
return Response.status(Response.Status.BAD_REQUEST).entity("Invalid event data").build()
}
if (data == null) {
Log.warnf("Received event with empty data. Event ID: %s", event.id)
return Response.status(Response.Status.BAD_REQUEST).entity("Empty event data").build()
}
try {
// 执行数据库更新
updateFeatures(data)
} catch (e: Exception) {
// 这里的错误通常是数据库连接问题或超时
// Knative 会根据默认策略自动重试,这是 Serverless 的一个优势
Log.errorf(e, "Failed to update features for user %s. Event ID: %s", data.userId, event.id)
// 返回 500 Internal Server Error,触发重试
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Database update failed").build()
}
Log.infof("Successfully processed event %s for user %s", event.id, data.userId)
return Response.ok().build()
}
private fun updateFeatures(data: UserActionEvent) {
// 核心业务逻辑:根据事件类型更新不同的特征
// 这里的操作是幂等的,即使事件被重传,结果也是一致的
when (data.eventType) {
"click" -> {
// 增加点击计数器
val counterToIncrement = mapOf("clicks_last_1h" to 1L)
session.execute(incrementCounterStmt.bind(counterToIncrement, data.userId))
Log.debugf("Incremented click counter for user %s", data.userId)
}
"view" -> {
// 更新最新查看的品类
val valueToUpdate = mapOf("last_viewed_category" to data.category)
session.execute(updateLatestValueStmt.bind(valueToUpdate, data.timestamp, data.userId))
Log.debugf("Updated last viewed category for user %s to %s", data.userId, data.category)
}
// ... 其他事件类型
}
}
// 伪代码,实际应用中应使用 Jackson, Gson 或 kotlinx.serialization
private fun parseJson(jsonString: String?): UserActionEvent? {
if (jsonString.isNullOrBlank()) return null
// In a real app, use a proper JSON library for robust parsing and validation.
val userId = jsonString.substringAfter("\"userId\":\"").substringBefore("\"")
val eventType = jsonString.substringAfter("\"eventType\":\"").substringBefore("\"")
val category = jsonString.substringAfter("\"category\":\"").substringBefore("\"")
return UserActionEvent(userId, eventType, category, Instant.now())
}
}
3. Knative 部署配置
将上述 Kotlin 服务部署为 Knative Service,并创建一个 Trigger 来订阅事件。
service.yaml:
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: feature-processor-service
namespace: default
spec:
template:
metadata:
annotations:
# 关键性能调优参数:允许每个 Pod 并发处理 100 个请求
# 这对于 IO 密集型应用(如我们的服务)非常重要,可以显著提高吞吐量
autoscaling.knative.dev/target: "100"
spec:
containers:
- image: your-registry/feature-processor:latest # 指向你构建的 Docker 镜像
ports:
- containerPort: 8080
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "1000m"
env:
# 通过环境变量传递配置,而不是硬编码在代码或配置文件中
- name: QUARKUS_CASSANDRA_CONTACT_POINTS
value: "scylla.scylla.svc.cluster.local:9042"
- name: QUARKUS_CASSANDRA_LOCAL_DATACENTER
value: "dc1"
trigger.yaml:
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: feature-processor-trigger
namespace: default
spec:
broker: default # 订阅默认的 Broker
filter:
attributes:
# 只对特定类型的事件感兴趣,避免不必要的函数调用
# 这是一个常见的优化点,在 Broker 层面进行过滤
type: "com.example.useraction"
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: feature-processor-service
架构的扩展性与局限性
当前这套架构优雅地解决了我们 80% 的实时特征计算需求,但它并非万能。
局限性:
- 复杂状态聚合: 对于需要跨多个事件进行复杂状态计算的场景(例如,计算用户在30分钟会话窗口内的平均页面停留时间),纯粹的无状态函数和 K-V 存储组合会变得非常复杂且低效。这种场景下,Flink 仍然是更优的选择。
- 事件顺序: Knative Eventing 默认不保证事件的严格顺序。对于要求强顺序性的业务,需要在应用层面进行处理(例如,使用版本号或时间戳进行冲突检测),或者引入保证顺序的消息队列作为 Broker 的底层实现。
- 冷启动对 P999 延迟的影响: 尽管 Quarkus 很快,但对于极端流量突增,从零启动大量 Pod 仍可能导致部分请求的延迟尖峰。如果业务对 P999 延迟有金融级的严苛要求,需要通过
minScale预热实例或探索 GraalVM Native Image 编译来进一步优化。
未来的优化路径:
- 死信队列 (DLQ): 为 Trigger 配置
delivery.deadLetterSink,将处理失败的事件自动路由到另一个 Topic 或存储,以便进行离线分析和重放,确保数据不丢失。 - 多特征并行处理: 当前是一个服务处理所有特征。未来可以演变成多个独立的 Knative Service,每个服务负责计算一类特征,它们都订阅同一个 Broker。这种微服务化的方式使得团队可以独立开发、部署和扩展各自的特征工程逻辑,提高了整个系统的可维护性。
- GraalVM Native Image: 将 Kotlin/Quarkus 应用编译成原生可执行文件。这可以将启动时间从几秒缩短到几十毫秒,内存占用也显著降低,几乎可以完全消除冷启动问题,但代价是牺牲了 JVM 的一些动态优化能力和更长的编译时间。这是一个需要根据具体性能指标进行权衡的选择。