基于Elixir与ClickHouse构建CQRS模式的高吞吐量实时指标管道


系统监控面板上的 P99 延迟曲线开始变得陡峭,告警信息不断涌入。最初用于支撑业务运营的单体 PostgreSQL 数据库,在每秒数千个用户行为事件的写入压力下,已经不堪重负。更糟糕的是,运营团队需要近乎实时的复杂分析查询,例如“过去5分钟内,完成购买路径A但未点击推广B的iOS新用户群体画像”,这些查询在主库上执行,直接导致了整个系统的雪崩。问题已经非常明确:写入负载与分析负载相互干扰,传统架构无法同时满足高吞uto吞吐量写入和低延迟的复杂查询。

初步的构想是必须将命令(写入)与查询(读取)的路径彻底分离,这自然而然地引向了 CQRS (Command Query Responsibility Segregation) 架构模式。我们的目标是构建一个系统,它能:

  1. 无压力地处理每秒至少 10,000 个事件的写入。
  2. 在数十亿条记录的数据集上,对复杂聚合查询提供秒级响应。
  3. 整个数据管道具备高可用性和容错能力。
  4. 部署和变更流程必须完全自动化。

技术选型决策

选择合适的技术栈是成败的关键。我们没有选择主流的 Java/Spring 或 Go,而是进行了一系列更具针对性的评估。

  • 命令处理层 (Command Side): Elixir/OTP
    为什么是 Elixir?因为我们需要处理海量的并发连接和数据流,同时对容错性有极高的要求。Elixir 基于 BEAM 虚拟机,其轻量级进程(Actor模型)和内置的监督树(Supervision Tree)机制,是构建高并发、高韧性数据摄取服务的完美选择。一个事件处理失败,只会导致对应的进程重启,而不会影响到整个系统。这比在其他语言中手动管理线程池和异常恢复要优雅和可靠得多。

  • 查询模型存储 (Query Side): ClickHouse
    为什么是 ClickHouse?因为我们的查询场景是典型的 OLAP(在线分析处理)。我们需要对海量数据进行快速的聚合、筛选和统计。ClickHouse 作为列式存储数据库,其向量化查询引擎、数据压缩能力以及对时间序列数据的原生支持,使其在同类产品中脱颖而出。相比于 Elasticsearch(更侧重全文搜索)或传统数据仓库(延迟太高),ClickHouse 能在极大的数据量下提供我们所追求的亚秒级查询性能。

  • 命令模型持久化: PostgreSQL
    虽然事件是事实的来源,但我们依然需要一个地方存储聚合的当前状态,用于命令执行时的业务逻辑校验。PostgreSQL 在这里扮演一个稳定、可靠的事务性数据存储角色,它的任务很纯粹:只处理低并发的状态更新和校验读取,完全避免了分析类慢查询。

  • 自动化部署: Jenkins
    我们的系统是异构的:一个 Elixir 应用,一个 PostgreSQL 数据库,一个 ClickHouse 集群。管理这样一个系统的部署、数据库 schema 变更和版本迭代,必须自动化。Jenkins 及其 Pipeline as Code (Jenkinsfile) 功能提供了一个灵活且强大的平台,可以编排整个发布流程,从代码编译、测试到多阶段的数据库迁移和应用部署。

架构与数据流

核心的数据流设计如下,它清晰地体现了 CQRS 的分离思想。

graph TD
    subgraph "写入路径 (Command Path)"
        A[Client] -- HTTP POST /event --> B[Phoenix API Endpoint];
        B -- Async Dispatch --> C{Elixir Command Bus};
        C -- Command --> D[Aggregate GenServer];
        D -- Validate & Persist State --> E[PostgreSQL];
        D -- Emit Event --> F{Elixir Event Bus};
    end

    subgraph "数据投影 (Projection)"
        F -- Event --> G[ClickHouse Projector];
        G -- Batch Insert --> H[ClickHouse];
    end

    subgraph "查询路径 (Query Path)"
        I[Analytics Dashboard] -- HTTP GET /query --> J[Phoenix Query API];
        J -- SQL Query --> H;
        H -- Result Set --> J;
        J -- JSON Response --> I;
    end

步骤化实现:代码中的权衡与细节

1. Elixir 命令处理与事件生成

首先,我们定义命令和事件。在 Elixir 中,这通常是简单的 struct

# lib/metrics/tracking/commands.ex
defmodule Metrics.Tracking.Commands do
  defstruct [:track_event]
  defstruct type: nil, session_id: nil, user_id: nil, payload: %{}
end

# lib/metrics/tracking/events.ex
defmodule Metrics.Tracking.Events do
  defstruct EventTracked, session_id: nil, user_id: nil, event_type: nil, properties: %{}, timestamp: nil
end

接下来是核心的 Aggregate,我们使用 GenServer 来实现。每个 session_id 对应一个 GenServer 进程,这充分利用了 BEAM 的并发能力。

# lib/metrics/tracking/session_aggregate.ex
defmodule Metrics.Tracking.SessionAggregate do
  use GenServer
  alias Metrics.Tracking.Commands.TrackEvent
  alias Metrics.Tracking.Events.EventTracked
  alias Metrics.EventBus

  # Public API
  def start_link(session_id) do
    GenServer.start_link(__MODULE__, %{session_id: session_id}, name: via_tuple(session_id))
  end

  def track_event(session_id, command) do
    GenServer.cast(via_tuple(session_id), {:track_event, command})
  end

  # GenServer Callbacks
  @impl true
  def init(state) do
    # In a real project, we might load initial state from PostgreSQL here.
    # For this high-throughput scenario, we assume sessions are ephemeral.
    {:ok, state}
  end

  @impl true
  def handle_cast({:track_event, %TrackEvent{session_id: sid, user_id: uid, type: type, payload: payload}}, state) do
    # Here, we could have complex business logic.
    # For example, check against the current state from PostgreSQL
    # to prevent duplicate or out-of-order events.
    # For simplicity, we directly generate the event.
    
    event = %EventTracked{
      session_id: sid,
      user_id: uid,
      event_type: type,
      properties: payload,
      timestamp: DateTime.utc_now() |> DateTime.to_unix()
    }

    # Publish the event to our application-wide event bus
    # This is a critical decoupling point. The aggregate's job is done.
    EventBus.publish(:events_for_projection, event)
    
    # A common mistake is to do blocking IO here like writing to ClickHouse.
    # That would kill the aggregate's throughput. Always dispatch and move on.
    {:noreply, state}
  end

  defp via_tuple(session_id), do: {:via, Registry, {Metrics.SessionRegistry, session_id}}
end

这里的 EventBus 是一个简单的 Elixir Registry,用于解耦事件的生产者和消费者。

2. 数据投影:从 Elixir 到 ClickHouse

这是整个管道中最具挑战性的部分。我们需要一个健壮的消费者,能够批量、异步地将事件写入 ClickHouse,并处理可能的网络故障和数据库异常。

首先,ClickHouse 的表结构设计至关重要。一个糟糕的 schema 会让所有优化都付之东流。

-- ClickHouse DDL
CREATE TABLE metrics.events (
    `event_date` Date,
    `timestamp` DateTime,
    `event_type` LowCardinality(String),
    `session_id` UUID,
    `user_id` String,
    `properties` String -- Storing as JSON string
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_type, timestamp, user_id)
SETTINGS index_granularity = 8192;

关键设计点:

  • ENGINE = MergeTree(): 这是 ClickHouse 性能的核心。
  • PARTITION BY toYYYYMM(event_date): 按月分区,便于管理和删除旧数据。
  • ORDER BY (event_type, timestamp, user_id): 主键(排序键)的选择决定了查询性能。将低基数的 event_type 放在前面可以极大提升过滤性能。
  • LowCardinality(String): 对于基数不高的字符串(如事件类型),这个优化可以显著减少存储空间和提升查询速度。

现在是 Elixir 的 Projector 实现。我们使用 GenStage 来构建一个带背压处理能力的数据处理管道。

# lib/metrics/projection/clickhouse_projector.ex
defmodule Metrics.Projection.ClickhouseProjector do
  use GenStage
  require Logger

  # A batching and flushing mechanism is crucial for performance.
  # Writing events one-by-one to ClickHouse would be disastrous.
  @batch_size 5000
  @flush_interval_ms 2000 # 2 seconds

  def start_link(_) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    Logger.info("Starting ClickHouseProjector...")
    # Subscribe to the event bus
    {:producer_consumer, %{buffer: [], timer: nil}, subscribe_to: [{Metrics.EventBus, selector: :events_for_projection}]}
  end

  @impl true
  def handle_events(events, _from, state) do
    new_buffer = state.buffer ++ events
    
    if count(new_buffer) >= @batch_size do
      flush(new_buffer)
      # Reset timer after a flush
      cancel_timer(state.timer)
      new_timer = new_timer()
      {:noreply, [], %{buffer: [], timer: new_timer}}
    else
      # If buffer is not full, ensure a timer is running for periodic flush
      timer = state.timer || new_timer()
      {:noreply, [], %{state | buffer: new_buffer, timer: timer}}
    end
  end

  @impl true
  def handle_info(:flush, state) do
    flush(state.buffer)
    {:noreply, [], %{buffer: [], timer: new_timer()}}
  end

  defp flush([]), do: :ok
  defp flush(events) do
    Logger.debug("Flushing #{length(events)} events to ClickHouse.")
    # The 'clickhousex' library supports inserting a list of maps/tuples.
    # It handles serialization efficiently.
    data_to_insert = Enum.map(events, &event_to_tuple/1)
    
    query = "INSERT INTO metrics.events (event_date, timestamp, event_type, session_id, user_id, properties) VALUES"

    # In a real project, error handling here is critical.
    # What if ClickHouse is down? The GenStage's supervisor should have a
    # restart strategy, and we might need a dead-letter queue.
    case Clickhousex.query(:clickhouse_pool, query, data_to_insert) do
      {:ok, _} -> :ok
      {:error, reason} -> 
        Logger.error("Failed to flush events to ClickHouse: #{inspect(reason)}")
        # Simple retry or more complex backoff strategy should be implemented.
    end
  end

  defp event_to_tuple(event) do
    dt = DateTime.from_unix!(event.timestamp)
    date = Date.from_erl!(DateTime.to_erl(dt) |> elem(0))
    
    {
      date,
      dt,
      event.event_type,
      event.session_id,
      event.user_id,
      Jason.encode!(event.properties)
    }
  end

  defp new_timer, do: Process.send_after(self(), :flush, @flush_interval_ms)
  defp cancel_timer(nil), do: :ok
  defp cancel_timer(timer), do: Process.cancel_timer(timer)
end

这个 ClickhouseProjector 是生产级的核心。它解决了批量写入、定时刷新和基本的日志记录。在真实项目中,错误处理部分需要扩展,例如集成一个指数退避重试库。

3. 查询 API:释放 ClickHouse 的力量

查询 API 相对简单,其核心是构建正确的 SQL 查询并将其发送到 ClickHouse。

# lib/metrics_web/controllers/query_controller.ex
defmodule MetricsWeb.QueryController do
  use MetricsWeb, :controller

  def funnel_analysis(conn, %{"steps" => steps}) when is_list(steps) do
    # A common pitfall is SQL injection. Always sanitize and validate inputs.
    # Here we assume steps are controlled strings.
    sql = """
    SELECT 
        step, 
        count(user_id) as total_users
    FROM (
        SELECT 
            user_id,
            arrayJoin(
                arrayFilter(
                    (x, y) -> y > 0, 
                    [#{Enum.map(steps, &"'#{&1}'") |> Enum.join(", ")}],
                    sequenceMatch('(?1).*(?2).*')
                        (timestamp, event_type = ?1, event_type = ?2)
                )
            ) as step
        FROM metrics.events
        WHERE event_date >= today() - 7
        GROUP BY user_id
    )
    GROUP BY step
    ORDER BY arrayFirstIndex(x -> x = step, [#{Enum.map(steps, &"'#{&1}'") |> Enum.join(", ")}])
    """

    case Clickhousex.query(:clickhouse_pool, sql, []) do
      {:ok, %{rows: rows}} ->
        # Format rows into a more friendly JSON structure
        json(conn, %{data: format_funnel(rows)})
      {:error, reason} ->
        conn
        |> put_status(:internal_server_error)
        |> json(%{error: "Query failed: #{inspect(reason)}"})
    end
  end
end

这个漏斗分析查询展示了 ClickHouse 的强大函数,如 sequenceMatch,它可以在用户事件序列中查找特定模式,这是传统 RDBMS 极难高效实现的。

4. Jenkins 自动化流水线

最后,我们需要一个 Jenkinsfile 来把所有部分串联起来,实现一键部署。

// Jenkinsfile
pipeline {
    agent any

    environment {
        // Use credentials store for sensitive data
        DOCKER_REGISTRY_URL = "your.registry.com"
        APP_NAME = "metrics_pipeline"
    }

    stages {
        stage('Checkout') {
            steps {
                checkout scm
            }
        }

        stage('Build & Test') {
            steps {
                script {
                    // Use a Docker container for a consistent build environment
                    docker.image('elixir:1.14-alpine').inside {
                        sh 'mix local.rebar --force'
                        sh 'mix local.hex --force'
                        sh 'mix deps.get'
                        sh 'mix test'
                        sh 'MIX_ENV=prod mix release'
                    }
                }
            }
        }

        stage('Build Docker Image') {
            steps {
                script {
                    def releaseVersion = sh(script: 'git rev-parse --short HEAD', returnStdout: true).trim()
                    def imageName = "${DOCKER_REGISTRY_URL}/${APP_NAME}:${releaseVersion}"
                    
                    // The release is self-contained. The Docker image can be very minimal.
                    sh """
                    docker build -t ${imageName} .
                    """
                }
            }
        }

        stage('Push Docker Image') {
            steps {
                script {
                    def releaseVersion = sh(script: 'git rev-parse --short HEAD', returnStdout: true).trim()
                    def imageName = "${DOCKER_REGISTRY_URL}/${APP_NAME}:${releaseVersion}"
                    docker.withRegistry("https://#{DOCKER_REGISTRY_URL}", 'docker-registry-credentials') {
                        sh "docker push ${imageName}"
                    }
                }
            }
        }

        stage('Apply DB Schemas') {
            // This stage is critical and requires careful handling
            // to avoid breaking production.
            parallel {
                stage('Apply PostgreSQL Schema') {
                    steps {
                        // This step should run migration scripts for the command side DB.
                        // Ideally, use a dedicated migration tool.
                        // For simplicity, we assume a script exists.
                        sh './scripts/migrate_postgres.sh'
                    }
                }
                stage('Apply ClickHouse Schema') {
                    steps {
                        // ClickHouse schema changes are not as straightforward.
                        // They often involve creating a new table and swapping it.
                        // This script should contain the necessary DDL commands.
                        sh 'cat ./db/clickhouse_schema.sql | clickhouse-client --host <clickhouse_host> --query -'
                    }
                }
            }
        }
        
        stage('Deploy to Production') {
            steps {
                // This could be a shell script that SSHs into servers and runs `docker-compose up`
                // or a more sophisticated deployment using Kubernetes manifests.
                sh './scripts/deploy_production.sh'
            }
        }
    }

    post {
        always {
            // Clean up workspace
            cleanWs()
        }
        success {
            echo 'Deployment Successful'
            // Send notification to Slack/Teams
        }
        failure {
            echo 'Deployment Failed'
            // Send failure notification
        }
    }
}

方案的局限性与未来迭代

这套架构并非银弹。首先,它引入了最终一致性。从事件产生到它在 ClickHouse 中可供查询,存在数秒的延迟(取决于批处理大小和间隔)。对于需要强一致性的读后写场景,此方案不适用。运营团队必须理解并接受这种数据延迟。

其次,运维复杂性增加了。我们现在需要维护一个 Elixir 应用集群、一个 PostgreSQL 实例和一个 ClickHouse 集群。监控、日志和告警必须覆盖所有这些组件,这对 SRE 团队提出了更高的要求。

未来的迭代方向可以包括:

  1. 引入 Kafka 或 Pulsar: 用一个真正的分布式日志系统替换应用内的 EventBus,可以提供更强的持久性保证、事件重放能力,并进一步解耦服务。这是解决 projector 故障后状态恢复问题的根本方法。
  2. 服务拆分: 如果命令端的业务逻辑变得非常复杂,可以将 SessionAggregate 拆分为一个独立的微服务,使其可以独立扩展。
  3. ClickHouse 集群化与副本: 为了实现高可用和更高的查询并发,需要将单点 ClickHouse 扩展为带副本和分片的集群。
  4. 更精细化的部署: 在 Jenkins 流水线中引入蓝绿部署或金丝雀发布策略,以降低发布风险。同时,使用 Terraform 或 Ansible 管理基础设施,实现真正的基础设施即代码。

  目录