Building a Testable Incremental Static Regeneration Engine with Elixir GenServer


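Full static site generation (SSG) is a common performance optimization for content-driven sites, but its main pain point is the delay before content updates go live. Even a minor text change can trigger a rebuild that takes minutes or even hours. Incremental Static Regeneration (ISR) addresses this by regenerating pages on demand, one at a time, as users request them. However, most ISR implementations are tightly coupled to a specific frontend framework ecosystem such as Next.js. If our stack is Elixir and we want this capability on the backend, how do we build a robust, testable ISR engine from scratch?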

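The core challenges are state management, concurrency control, and background task handling. When a request arrives, is the cached page fresh, stale, or missing altogether? If it is stale, the system must start a regeneration task in the background without blocking the current request, which immediately receives the stale content. And if more requests for the same page arrive while that regeneration is in progress, the system must keep exactly one regeneration task running. This is where Elixir/OTP shines.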
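The decision described above can be written as a small pure function, which makes it easy to reason about before any processes are involved. The following is only a sketch: the module name ISRSketch.Freshness, the map shape, and the millisecond timestamps are assumptions made for illustration and are not part of the engine built in this article.

```elixir
defmodule ISRSketch.Freshness do
  @moduledoc "Hypothetical helper: classifies a cache lookup for the ISR decision."

  @type entry :: %{content: binary(), inserted_at: integer(), regenerating?: boolean()}

  @spec classify(entry() | nil, integer(), non_neg_integer()) ::
          :miss | :fresh | :stale | :regenerating
  # No entry at all: the page has never been rendered.
  def classify(nil, _now_ms, _ttl_ms), do: :miss

  # A regeneration is already in flight, regardless of the entry's age.
  def classify(%{regenerating?: true}, _now_ms, _ttl_ms), do: :regenerating

  # Otherwise the age of the entry decides between fresh and stale.
  def classify(%{inserted_at: inserted_at}, now_ms, ttl_ms) do
    if now_ms - inserted_at > ttl_ms, do: :stale, else: :fresh
  end
end

entry = %{content: "<html></html>", inserted_at: 1_000, regenerating?: false}
IO.inspect(ISRSketch.Freshness.classify(entry, 1_200, 500), label: "young entry")
IO.inspect(ISRSketch.Freshness.classify(entry, 2_000, 500), label: "old entry")
```

Passing the current time in as an argument, rather than reading the clock inside the function, is what keeps this check testable without sleeping.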

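We will use a GenServer to build an in-memory cache that not only stores page content but also tracks each page's state (its freshness and whether it is being regenerated). We will use a DynamicSupervisor to create and supervise the worker processes that regenerate pages, which ties resource usage to actual demand and isolates failures.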

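Step 1: Define the core data structures and state

Before diving into the code, we need to be clear about the state each cache entry must maintain. A simple key-value pair (page_key => html_content) is not enough. We need a richer structure that captures the ISR lifecycle.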

# lib/isr/cache_entry.ex
defmodule ISR.CacheEntry do
  @moduledoc """
  Represents a single entry in the ISR cache.
  """

  @enforce_keys [:content, :inserted_at, :status]
  defstruct [
    :content,      # The cached page content (e.g., HTML string)
    :inserted_at,  # A naive (timezone-less) UTC datetime taken when the content was inserted
    :status        # :fresh | :stale | :regenerating
  ]

  @type t :: %__MODULE__{
          content: binary(),
          inserted_at: NaiveDateTime.t(),
          status: :fresh | :stale | :regenerating
        }

  @spec new(binary()) :: t()
  def new(content) do
    %__MODULE__{
      content: content,
      inserted_at: NaiveDateTime.utc_now(),
      status: :fresh
    }
  end
end

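The CacheEntry struct is the foundation of the whole system. The status field is the key to concurrency control: it records which state a page is currently in, and it is what prevents duplicate regeneration tasks.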
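The lifecycle implied by the status field can be spelled out as an explicit transition table. This is a sketch under assumptions: the module ISRSketch.Status and the event names are hypothetical, chosen to mirror the three statuses of the struct above.

```elixir
defmodule ISRSketch.Status do
  @moduledoc "Hypothetical transition table for the :fresh | :stale | :regenerating lifecycle."

  @type status :: :fresh | :stale | :regenerating
  @type event :: :regeneration_started | :regeneration_succeeded | :regeneration_failed

  @spec transition(status(), event()) :: {:ok, status()} | {:error, term()}
  # A regeneration may only start when none is in flight.
  def transition(status, :regeneration_started) when status in [:fresh, :stale],
    do: {:ok, :regenerating}

  # A successful render produces a brand new, fresh entry.
  def transition(:regenerating, :regeneration_succeeded), do: {:ok, :fresh}

  # A failed render releases the entry so that a later request can retry.
  def transition(:regenerating, :regeneration_failed), do: {:ok, :stale}

  # Everything else is rejected, e.g. starting a second regeneration.
  def transition(status, event), do: {:error, {:invalid_transition, status, event}}
end

IO.inspect(ISRSketch.Status.transition(:stale, :regeneration_started))
IO.inspect(ISRSketch.Status.transition(:regenerating, :regeneration_started))
```

The rejected second call is the property the rest of the design relies on: once an entry is :regenerating, nothing can start another regeneration for it.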

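Step 2: Build the core cache service as a GenServer

ISR.Cache is the hub of the system. It is a GenServer whose internal state is a map from page keys to ISR.CacheEntry structs. Every page request goes through this GenServer as a synchronous call.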

# lib/isr/cache.ex
defmodule ISR.Cache do
  use GenServer

  alias ISR.CacheEntry

  # How long a page is considered fresh, in seconds.
  @ttl 60

  # Client API
  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @doc """
  The main entry point for fetching a page.
  This function encapsulates the core ISR logic.
  """
  @spec get_page(String.t()) :: {:ok, binary()} | {:error, any()}
  def get_page(key) do
    GenServer.call(__MODULE__, {:get, key})
  end

  # Server Callbacks
  @impl true
  def init(state) do
    {:ok, state}
  end

  @impl true
  def handle_call({:get, key}, from, state) do
    case Map.get(state, key) do
      %CacheEntry{status: status, content: content} = entry
      when status in [:fresh, :stale] ->
        # A private function such as is_stale?/1 cannot be called in a guard,
        # so the freshness check is an ordinary expression in the clause body.
        if status == :fresh and not is_stale?(entry) do
          # Case 1: Cache hit, content is fresh
          {:reply, {:ok, content}, state}
        else
          # Case 2: Cache hit, content is stale but not regenerating.
          # Mark as regenerating to prevent concurrent regenerations.
          # Structs do not implement Access, so use struct update syntax
          # instead of put_in(state, [key, :status], ...).
          state = Map.put(state, key, %{entry | status: :regenerating})
          # Start the background regeneration task
          start_background_regeneration(key, from)
          # Immediately reply with stale content
          {:reply, {:ok, content}, state}
        end

      # Case 3: Cache hit, but content is already regenerating
      %CacheEntry{status: :regenerating, content: content} ->
        # Another process is already on it, just serve stale content
        {:reply, {:ok, content}, state}

      # Case 4: Cache miss
      nil ->
        # This is a blocking first-time generation: the render runs inside
        # the cache process, so other callers wait until it finishes.
        # The renderer is resolved at runtime so tests can substitute a mock.
        renderer = Application.get_env(:isr, :renderer_module, ISR.Renderer)

        case renderer.render(key) do
          {:ok, content} ->
            entry = CacheEntry.new(content)
            new_state = Map.put(state, key, entry)
            {:reply, {:ok, content}, new_state}

          {:error, reason} ->
            {:reply, {:error, reason}, state}
        end
    end
  end

  @impl true
  def handle_cast({:update_cache, key, content}, state) do
    # This is called by the background worker upon successful regeneration
    entry = CacheEntry.new(content)
    new_state = Map.put(state, key, entry)
    IO.puts("Cache updated for key: #{key}")
    {:noreply, new_state}
  end

  @impl true
  def handle_cast({:regeneration_failed, key}, state) do
    # If regeneration fails, we revert the status to :stale
    # to allow future requests to trigger it again.
    # In a real project, you might want a more sophisticated retry strategy.
    IO.puts("Regeneration failed for key: #{key}, marking as stale.")
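    state =
      case Map.get(state, key) do
        nil -> state
        # Struct update syntax; put_in/3 with a plain :status key would raise
        # because structs do not implement the Access behaviour.
        entry -> Map.put(state, key, %{entry | status: :stale})
      end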

    {:noreply, state}
  end

  # Private helpers
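  defp is_stale?(%CacheEntry{inserted_at: inserted_at}) do
    # @ttl (seconds) is the default; the :ttl_ms setting lets the test
    # environment use a much shorter, millisecond-level TTL.
    ttl_ms = Application.get_env(:isr, :ttl_ms, @ttl * 1000)
    NaiveDateTime.diff(NaiveDateTime.utc_now(), inserted_at, :millisecond) > ttl_ms
  end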

  defp start_background_regeneration(key, from) do
    # We pass the GenServer's `from` tag to the worker.
    # This is not strictly necessary for this implementation but shows
    # how you could build more complex interactions if needed.
    ISR.WorkerSupervisor.start_child(%{key: key, caller: from})
  end
end

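This code is the heart of the ISR logic. The handle_call function handles every possible case. Note that when a page is found to be stale, we immediately set its status to :regenerating. Because a GenServer processes its messages one at a time, this check-and-set is atomic and acts as a per-page lock. Until the first regeneration finishes, every subsequent request for the same page falls into Case 3, which avoids the thundering herd problem.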
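The serialization argument can be checked in isolation with a stripped-down server that only tracks which keys have a regeneration in flight. This is a sketch, not the cache from this article: ISRSketch.SingleFlight and its reply atoms are hypothetical names used for the demonstration.

```elixir
defmodule ISRSketch.SingleFlight do
  use GenServer

  # Hypothetical demo server: remembers which keys are being regenerated
  # and counts how many regenerations were actually started.

  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, nil)
  def request(pid, key), do: GenServer.call(pid, {:request, key})
  def started(pid), do: GenServer.call(pid, :started)

  @impl true
  def init(nil), do: {:ok, %{in_flight: MapSet.new(), started: 0}}

  @impl true
  def handle_call({:request, key}, _from, state) do
    if MapSet.member?(state.in_flight, key) do
      # Someone else already claimed this key.
      {:reply, :already_regenerating, state}
    else
      # Check and set happen inside one message handler, so no other
      # request can interleave between them.
      state = %{state | in_flight: MapSet.put(state.in_flight, key), started: state.started + 1}
      {:reply, :started_regeneration, state}
    end
  end

  def handle_call(:started, _from, state), do: {:reply, state.started, state}
end

{:ok, pid} = ISRSketch.SingleFlight.start_link()

results =
  1..20
  |> Enum.map(fn _ -> Task.async(fn -> ISRSketch.SingleFlight.request(pid, "page:1") end) end)
  |> Task.await_many()

IO.inspect(Enum.frequencies(results), label: "replies")
```

However the twenty tasks are scheduled, exactly one of them should receive :started_regeneration, because the server handles their calls strictly one after another.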

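Step 3: The page regeneration worker and dynamic supervision

We cannot regenerate a stale page inside the ISR.Cache process itself, because a slow render would block every other cache request (the blocking first-time render on a cache miss is the one deliberate exception). We need a separate, supervised process to do this work. DynamicSupervisor fits this scenario well: it lets us start temporary child processes on demand.

First, here is the Worker module that performs the actual rendering.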

# lib/isr/worker.ex
defmodule ISR.Worker do
  use GenServer

  def start_link(args) do
    GenServer.start_link(__MODULE__, args)
  end

  @impl true
  def init(%{key: key}) do
    # Start the rendering task immediately upon process start
    {:ok, %{key: key}, {:continue, :render}}
  end

  @impl true
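  def handle_continue(:render, state) do
    key = state.key
    IO.puts("Worker #{inspect(self())} starting regeneration for: #{key}")

    # Rendering latency comes from the renderer itself; an artificial sleep
    # here would only delay every regeneration, including those in tests.
    # The renderer is resolved at runtime so tests can substitute a mock
    # through the :renderer_module setting; ISR.Renderer is the default.
    renderer = Application.get_env(:isr, :renderer_module, ISR.Renderer)

    case renderer.render(key) do
      {:ok, content} ->
        # On success, cast to the cache to update the content
        GenServer.cast(ISR.Cache, {:update_cache, key, content})

      {:error, _reason} ->
        # On failure, notify the cache to revert the status
        GenServer.cast(ISR.Cache, {:regeneration_failed, key})
    end

    # The worker has done its job and can terminate
    {:stop, :normal, state}
  end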
end

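This Worker is very simple: right after starting, it performs the render in its handle_continue callback, reports the result to ISR.Cache, and then terminates itself. This one-shot task pattern is a good fit for DynamicSupervisor.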
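For comparison, the same one-shot job can also be expressed as a supervised Task. This is a swapped-in alternative rather than the approach used in this article, and the names are assumptions: ISRSketch.TaskRegeneration is hypothetical, render_fun stands in for the renderer, and a plain send/2 replaces the GenServer.cast to ISR.Cache so that the sketch runs on its own.

```elixir
defmodule ISRSketch.TaskRegeneration do
  @moduledoc "Hypothetical alternative to a GenServer worker: a supervised one-shot Task."

  # `sup` is a running Task.Supervisor, `render_fun` stands in for the renderer,
  # and `reply_to` is the process that should learn about the outcome.
  def regenerate(sup, key, render_fun, reply_to) do
    Task.Supervisor.start_child(sup, fn ->
      case render_fun.(key) do
        {:ok, content} -> send(reply_to, {:update_cache, key, content})
        {:error, _reason} -> send(reply_to, {:regeneration_failed, key})
      end
    end)
  end
end

{:ok, sup} = Task.Supervisor.start_link()
render = fn key -> {:ok, "html for " <> key} end
{:ok, _pid} = ISRSketch.TaskRegeneration.regenerate(sup, "page:1", render, self())

receive do
  message -> IO.inspect(message, label: "worker reported")
after
  1_000 -> IO.puts("no report within one second")
end
```

Children started this way are temporary by default, so a crashed render is not retried automatically. The GenServer worker with restart: :transient trades a little more code for that automatic retry.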

Next comes the DynamicSupervisor definition.

# lib/isr/worker_supervisor.ex
defmodule ISR.WorkerSupervisor do
  use DynamicSupervisor

  def start_link(init_arg) do
    DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

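  def start_child(worker_args) do
    # The spec for a transient worker process.
    # DynamicSupervisor ignores the :id of its children, so a constant id is
    # enough. Building a new atom per worker would leak atoms, which are
    # never garbage collected.
    spec = %{
      id: ISR.Worker,
      start: {ISR.Worker, :start_link, [worker_args]},
      restart: :transient
    }

    DynamicSupervisor.start_child(__MODULE__, spec)
  end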

  @impl true
  def init(_init_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end

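The restart: :transient strategy means that if the Worker exits normally (our {:stop, :normal, state} return), the supervisor will not restart it. It is restarted only when it crashes abnormally, which matches our design intent.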
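The difference between a normal exit and a crash under restart: :transient can be observed with a small experiment. The sketch below is built on assumptions: ISRSketch.OneShot is a hypothetical job that counts its own runs in an Erlang :counters array and, when asked to, crashes only on its first attempt.

```elixir
defmodule ISRSketch.OneShot do
  @moduledoc "Hypothetical one-shot job used only to observe restart behaviour."

  def child_spec({counter, mode}) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [counter, mode]},
      restart: :transient
    }
  end

  def start_link(counter, mode) do
    Task.start_link(fn ->
      # Record that this attempt ran, then look up which attempt it was.
      :counters.add(counter, 1, 1)
      attempt = :counters.get(counter, 1)

      # Crash only on the first attempt when asked to; otherwise return normally.
      if mode == :crash_once and attempt == 1, do: exit(:boom)
    end)
  end
end

{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)
counter = :counters.new(1, [])
{:ok, _pid} = DynamicSupervisor.start_child(sup, {ISRSketch.OneShot, {counter, :crash_once}})

# Give the supervisor a moment to restart the crashed attempt.
Process.sleep(200)
IO.inspect(:counters.get(counter, 1), label: "attempts")
IO.inspect(DynamicSupervisor.count_children(sup), label: "children")
```

The crashing job should run twice (the crash plus one restart that finishes normally), while a job that exits normally on its first attempt runs once and is then removed from the supervisor.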

Finally, we need a stub rendering module to simulate real rendering logic.

# lib/isr/renderer.ex
defmodule ISR.Renderer do
  @moduledoc """
  A mock renderer. In a real application, this would render a template
  with data fetched from a database or a CMS.
  """
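  # Declaring the callback turns this module into a behaviour, which
  # Mox.defmock(MockRenderer, for: ISR.Renderer) requires.
  @callback render(key :: String.t()) :: {:ok, binary()} | {:error, term()}

  def render(key) do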
    # Simulate potential rendering failures
    if key == "page:fail" do
      {:error, :database_timeout}
    else
      content = "<html><body>Content for #{key}, rendered at #{NaiveDateTime.utc_now()}</body></html>"
      {:ok, content}
    end
  end
end

Step 4: Assemble the application and supervision tree

Now we wire all the components into the application's supervision tree.

# lib/isr/application.ex
defmodule ISR.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      ISR.Cache,
      ISR.WorkerSupervisor
    ]

    opts = [strategy: :one_for_one, name: ISR.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

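The supervision tree is straightforward: the top-level ISR.Supervisor manages two long-running processes, ISR.Cache and ISR.WorkerSupervisor. WorkerSupervisor in turn manages the transient ISR.Worker processes on demand.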
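For ISR.Application to be started at boot, the Mix project has to name it as the application callback module. The article does not show its mix.exs, so the following is a sketch under assumptions: the project module name, the version numbers, and the Elixir requirement are illustrative, while the app name :isr matches the key used in config/test.exs.

```elixir
# mix.exs (sketch; module name and version numbers are assumptions)
defmodule ISR.MixProject do
  use Mix.Project

  def project do
    [
      app: :isr,
      version: "0.1.0",
      elixir: "~> 1.14",
      deps: deps()
    ]
  end

  def application do
    [
      extra_applications: [:logger],
      # Calls ISR.Application.start/2, which boots the supervision tree.
      mod: {ISR.Application, []}
    ]
  end

  defp deps do
    [
      # Only needed for the mocks used in the test suite.
      {:mox, "~> 1.0", only: :test}
    ]
  end
end
```

With the mod entry in place, running the test suite also starts ISR.Cache and ISR.WorkerSupervisor automatically, so the tests do not have to start them by hand.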

To make the flow easier to follow, the Mermaid diagram below shows a typical request for a stale cache entry.

sequenceDiagram
    participant Client
    participant ISR
    participant Cache as ISR.Cache (GenServer)
    participant Supervisor as ISR.WorkerSupervisor
    participant Worker as ISR.Worker (GenServer)
    participant Renderer as ISR.Renderer

    Client->>+ISR: get_page("page:1")
    ISR->>+Cache: GenServer.call({:get, "page:1"})
    Note right of Cache: Hit! But content is stale.<br/>Status: :fresh -> :regenerating
    Cache->>+Supervisor: start_child(...)
    Supervisor->>+Worker: start_link(...)
    Worker-->>Supervisor: {:ok, pid}
    Supervisor-->>Cache: {:ok, pid}
    Cache-->>-ISR: {:ok, "stale content"}
    ISR-->>-Client: "stale content"

    Note over Worker: Worker starts rendering in background
    Worker->>+Renderer: render("page:1")
    Renderer-->>-Worker: {:ok, "new content"}
    Worker->>Cache: GenServer.cast({:update_cache, "page:1", "new content"})
    Note right of Cache: Cache entry updated.<br/>Status: :regenerating -> :fresh
    Note over Worker: Worker terminates normally.

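Step 5: Write trustworthy concurrency tests

Testing an asynchronous, concurrent system like this is the biggest challenge, and it is also where Elixir's testing tools show their strength. A common mistake is to assert directly in the test that the cache content has been updated, which makes the test flaky because of race conditions. We need to test the intent (that a regeneration was triggered) rather than only the final state.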

We will use Mox to mock the Renderer, so that we can precisely control whether rendering succeeds or fails and assert whether it was called.
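One pitfall is worth isolating before looking at the tests: a mocked render is executed by whichever process calls it (here the cache or a worker), not by the test process. A callback that wants to signal the test therefore has to capture the test's pid beforehand. The sketch below uses a hypothetical ISRSketch.Signal module and plain spawn/1 in place of Mox to show the effect.

```elixir
defmodule ISRSketch.Signal do
  @moduledoc "Hypothetical demo: which process does self() refer to inside a callback?"

  # Runs `callback` in a separate process, the way a worker would run a mocked render.
  def run_in_background(callback) do
    spawn(fn -> callback.() end)
  end
end

test_pid = self()

# The pid is captured outside the callback, so the message reaches this process.
# Calling send(self(), ...) inside the callback would address the background
# process instead, and the message would never arrive here.
ISRSketch.Signal.run_in_background(fn -> send(test_pid, {:done, self()}) end)

receive do
  {:done, worker_pid} -> IO.inspect(worker_pid != test_pid, label: "ran in another process")
after
  1_000 -> IO.puts("no message received")
end
```

The same applies to Mox callbacks: bind the test pid to a variable first and close over that variable in the function passed to expect or stub.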

First, configure Mox:

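# test/test_helper.exs
ExUnit.start()
# The mock can only be defined if ISR.Renderer is a behaviour,
# that is, if it declares a @callback for render/1.
Mox.defmock(MockRenderer, for: ISR.Renderer)

# config/test.exs
import Config

# renderer_module selects the mock; ttl_ms is the very short TTL that the
# tests assume. Both only take effect if the code reads them at runtime.
config :isr, renderer_module: MockRenderer, ttl_ms: 50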

Then comes the core test file.

# test/isr/cache_test.exs
defmodule ISR.CacheTest do
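  # The mock is called from the ISR.Cache and ISR.Worker processes, not from
  # the test process, so Mox has to run in global mode, which rules out
  # async tests.
  use ExUnit.Case, async: false
  import Mox

  setup :set_mox_global
  # Ensure all mocks are verified on exit
  setup :verify_on_exit!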

  describe "get_page/1" do
    test "cache miss scenario" do
      # Expect the renderer to be called once
      expect(MockRenderer, :render, fn "page:new" ->
        {:ok, "initial content"}
      end)

      # The first call should be blocking and return the fresh content
      assert {:ok, "initial content"} = ISR.Cache.get_page("page:new")
    end

    test "cache hit (fresh) scenario" do
      # Exactly one render is expected: the first call populates the cache.
      # A second render would raise inside the cache process and fail the call.
      expect(MockRenderer, :render, 1, fn "page:fresh" -> {:ok, "fresh content"} end)
      assert {:ok, "fresh content"} = ISR.Cache.get_page("page:fresh")

      # Subsequent call within the TTL should not trigger a render
      assert {:ok, "fresh content"} = ISR.Cache.get_page("page:fresh")
    end

    test "cache hit (stale) scenario triggers background regeneration" do
      # The mock runs inside the worker process, so capture the test pid here;
      # calling self() inside the callback would return the worker's pid.
      test_pid = self()

      # 1. Prime the cache with content that will become stale.
      stub(MockRenderer, :render, fn "page:stale" -> {:ok, "stale content"} end)
      assert {:ok, "stale content"} = ISR.Cache.get_page("page:stale")

      # 2. Mock the next render call, which will happen in the background.
      expect(MockRenderer, :render, fn "page:stale" ->
        # We use a message to signal that the background task has run.
        send(test_pid, {:rendered, "new content"})
        {:ok, "new content"}
      end)

      # 3. Let the entry age past the TTL. This assumes a very short TTL in
      # the test environment (e.g., 50ms); mocking the clock would avoid
      # the sleep altogether.
      Process.sleep(100)

      # 4. Request the stale page. It should immediately return stale content.
      assert {:ok, "stale content"} = ISR.Cache.get_page("page:stale")

      # 5. Assert that the background regeneration happened by checking for the message.
      assert_receive {:rendered, "new content"}, 3000

      # 6. Verify that the cache is eventually updated. The worker casts the
      # update right after rendering, so allow a brief moment for it to land
      # while staying inside the short test TTL.
      Process.sleep(10)
      assert {:ok, "new content"} = ISR.Cache.get_page("page:stale")
    end

    test "concurrent requests for a stale page trigger only one regeneration" do
      test_pid = self()

      # 1. Prime the cache. expect/3 is used instead of stub/3 so that no
      # fallback exists to silently absorb an unexpected extra render.
      expect(MockRenderer, :render, fn "page:concurrent" -> {:ok, "very stale content"} end)
      assert {:ok, "very stale content"} = ISR.Cache.get_page("page:concurrent")

      # 2. Set up the expectation for a SINGLE background render.
      # The render will be slow, giving us time to make concurrent requests.
      expect(MockRenderer, :render, 1, fn "page:concurrent" ->
        Process.sleep(200)
        send(test_pid, :rendered)
        {:ok, "concurrently rendered content"}
      end)

      # 3. Simulate time passing
      Process.sleep(100) # Again, assume a short test TTL

      # 4. Spawn multiple concurrent tasks requesting the same page
      tasks =
        Enum.map(1..5, fn _ ->
          Task.async(fn -> ISR.Cache.get_page("page:concurrent") end)
        end)

      # 5. All tasks should immediately receive the stale content.
      results = Task.await_many(tasks, 100)
      assert results == List.duplicate({:ok, "very stale content"}, 5)

      # While the slow render is running, exactly one worker should exist.
      assert %{active: 1} = DynamicSupervisor.count_children(ISR.WorkerSupervisor)

      # 6. Wait for the single regeneration to complete, then give the
      # worker's cast a brief moment to reach the cache.
      assert_receive :rendered, 1000
      Process.sleep(10)

      # 7. A final request should now receive the new content.
      assert {:ok, "concurrently rendered content"} = ISR.Cache.get_page("page:concurrent")
    end
  end
end

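This test suite covers the key paths: cache miss, cache hit (fresh), cache hit (stale), and, most importantly, concurrent requests. By combining Mox with assert_receive, we can test deterministically that the asynchronous background task was triggered, instead of relying on a fragile Process.sleep to check the final result.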
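Where a test still has to wait for a final state, such as the cache holding the new content, polling with a deadline is usually more robust than a fixed sleep. The helper below is a sketch: ISRSketch.Eventually is a hypothetical module, not part of ExUnit or Mox, and the default timings are arbitrary.

```elixir
defmodule ISRSketch.Eventually do
  @moduledoc "Hypothetical test helper: polls a function until it returns a truthy value."

  # Returns true as soon as `fun` returns a truthy value,
  # or false once `timeout_ms` has elapsed.
  def eventually(fun, timeout_ms \\ 1_000, interval_ms \\ 10) do
    deadline = System.monotonic_time(:millisecond) + timeout_ms
    poll(fun, deadline, interval_ms)
  end

  defp poll(fun, deadline, interval_ms) do
    cond do
      fun.() ->
        true

      System.monotonic_time(:millisecond) >= deadline ->
        false

      true ->
        Process.sleep(interval_ms)
        poll(fun, deadline, interval_ms)
    end
  end
end

# Simulate a value that is updated in the background after a short delay.
{:ok, agent} = Agent.start_link(fn -> "stale content" end)

spawn(fn ->
  Process.sleep(50)
  Agent.update(agent, fn _ -> "new content" end)
end)

updated? = ISRSketch.Eventually.eventually(fn -> Agent.get(agent, & &1) == "new content" end)
IO.inspect(updated?, label: "updated in time")
```

In a test this would be used as assert ISRSketch.Eventually.eventually(fn -> ... end), which returns as soon as the condition holds instead of always paying the full sleep.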

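Limitations and future iterations

The ISR engine we built is functionally complete and reflects OTP design principles, but several aspects need hardening before production use.

First, it is a purely in-memory, single-node cache. In a distributed environment this leads to inconsistent state across nodes. One viable path is to use a shared, persistent cache backend such as Redis or Mnesia, and to use Phoenix PubSub to broadcast cache invalidation and update events between nodes.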

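Second, regeneration is currently triggered only on access. A more complete system also needs on-demand triggering, for example an internal API endpoint that receives a webhook from the CMS and proactively invalidates and warms the cache for the affected pages when content is published.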
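The invalidation half of such a webhook could be a pure transformation of the cache state that marks the affected entries as stale, so that the next request triggers a regeneration. This is a sketch under assumptions: ISRSketch.Invalidation is a hypothetical module, and plain maps with a :status key stand in for the CacheEntry structs.

```elixir
defmodule ISRSketch.Invalidation do
  @moduledoc "Hypothetical helper: marks cache entries stale on demand."

  # Marks every fresh entry among `keys` as stale. Unknown keys and entries
  # that are already stale or regenerating are left untouched.
  def mark_stale(state, keys) do
    Enum.reduce(keys, state, fn key, acc ->
      case acc do
        %{^key => %{status: :fresh} = entry} ->
          Map.put(acc, key, %{entry | status: :stale})

        _ ->
          acc
      end
    end)
  end
end

state = %{
  "page:1" => %{content: "<html>1</html>", status: :fresh},
  "page:2" => %{content: "<html>2</html>", status: :regenerating}
}

IO.inspect(ISRSketch.Invalidation.mark_stale(state, ["page:1", "page:2", "page:3"]))
```

Inside the cache this would sit behind a new message, for instance a cast carrying the list of keys from the webhook. Leaving :regenerating entries alone keeps the single-regeneration guarantee intact.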

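Finally, error handling can be more fine-grained. When regeneration of a page fails repeatedly, the system should retry with exponential backoff and log detailed errors. For critical pages, consider raising an alert after several failures instead of indefinitely serving content that may be badly out of date.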
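The delay schedule for such retries is simple to compute. The sketch below assumes a hypothetical ISRSketch.Backoff module with a 500ms base delay and a 60s cap; both values are illustrative defaults, not recommendations from this article.

```elixir
defmodule ISRSketch.Backoff do
  @moduledoc "Hypothetical helper: capped exponential backoff for regeneration retries."

  # attempt 1 -> base_ms, attempt 2 -> 2 * base_ms, attempt 3 -> 4 * base_ms, ...
  # The result never exceeds cap_ms.
  def delay_ms(attempt, base_ms \\ 500, cap_ms \\ 60_000)
      when is_integer(attempt) and attempt >= 1 do
    min(cap_ms, base_ms * Integer.pow(2, attempt - 1))
  end
end

IO.inspect(Enum.map(1..8, &ISRSketch.Backoff.delay_ms/1), label: "delays in ms")
```

A cache that tracked a failure count per entry could pass that count to delay_ms/3 and schedule the retry with Process.send_after/3, and could raise an alert once the count passes a threshold.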

