Elixir Phoenix Channels 驱动的 WebRTC SFU 信令服务器与 Caddy TURN 服务集成实践


构建一个简单的双人 WebRTC 视频通话应用并不复杂。浏览器提供了 RTCPeerConnection API,似乎一切都水到渠成。然而,当需求从两人 P2P 扩展到多人会议,并且需要确保连接能在各种复杂的网络环境(特别是对称 NAT)下稳定建立时,真正的挑战才开始浮现。一个纯粹的 P2P 网状(Mesh)拓扑结构,每个参与者都需要与其他所有人建立连接,连接数会以 O(n²) 的速度增长,这会迅速耗尽客户端的上行带宽和 CPU 资源。

解决方案是引入一个中心化的媒体服务器,通常是 SFU (Selective Forwarding Unit)。SFU 接收每个参与者的媒体流,然后根据需要将其转发给房间内的其他参与者。客户端只需维持与 SFU 的一个上行连接和一个下行连接,极大地降低了客户端的负担。

但 SFU 只解决了媒体流转发的问题,还有两个核心问题需要处理:

  1. 信令 (Signaling): 参与者之间如何交换 SDP (Session Description Protocol) 和 ICE (Interactive Connectivity Establishment) 候选地址?这需要一个低延迟、高并发的消息通道。
  2. NAT 穿透 (NAT Traversal): 当参与者位于严格的防火墙或 NAT 之后时,P2P 连接可能无法直接建立。需要 STUN 服务器来发现公网地址,更重要的是,需要 TURN 服务器作为媒体流的中继。

本次构建日志的目标,就是从零开始,搭建一个生产级的、可扩展的 WebRTC 多人通信基础设施。技术选型的核心考量是稳定性、可维护性和极致的并发性能。

  • 信令服务器: Elixir + Phoenix Framework。BEAM 虚拟机的并发模型(轻量级进程、抢占式调度)和 Phoenix Channels 提供的 WebSocket 抽象,是构建高并发、有状态信令服务器的理想选择。相比 Node.js 的单线程事件循环,BEAM 在处理成千上万个并发 WebSocket 连接时,CPU 和内存的利用更加均衡,且容错性(Supervision Trees)是其与生俱来的优势。
  • 客户端: Next.js。一个成熟的 React 框架,足以应对 WebRTC 客户端复杂的 UI 和状态管理。
  • Web 服务器 & NAT 穿透: Caddy。这并非传统的 Nginx/Apache 选项。选择 Caddy 的原因有三:首先,其自动化的 HTTPS 功能对于 WebRTC 是硬性要求;其次,它拥有极为出色的反向代理和 WebSocket 支持;最后,也是最关键的一点,Caddy 可以通过标准模块或插件直接作为 TURN 服务器运行,将 Web 网关和 NAT 穿透中继服务整合在一起,极大地简化了部署和运维。

我们的架构图如下:

graph TD
    subgraph Browser Clients
        ClientA[Next.js Client A]
        ClientB[Next.js Client B]
        ClientC[Next.js Client C]
    end

    subgraph Infrastructure[Caddy Server]
        Caddy_HTTPS[HTTPS Termination]
        Caddy_Proxy[WebSocket Reverse Proxy]
        Caddy_TURN[TURN Relay]
        Caddy_Static[Static File Serving]
    end

    subgraph Backend
        Phoenix_App[Elixir/Phoenix SFU Signaling Server]
    end

    ClientA -- HTTPS/WSS --> Caddy_HTTPS
    ClientB -- HTTPS/WSS --> Caddy_HTTPS
    ClientC -- HTTPS/WSS --> Caddy_HTTPS

    Caddy_HTTPS -- Serves Next.js build --> Caddy_Static
    Caddy_Static --> ClientA
    Caddy_Static --> ClientB
    Caddy_Static --> ClientC

    Caddy_HTTPS -- Proxies /socket --> Caddy_Proxy
    Caddy_Proxy -- WebSocket --> Phoenix_App

    ClientA -- TURN (UDP/TCP) --> Caddy_TURN
    ClientB -- TURN (UDP/TCP) --> Caddy_TURN
    ClientC -- TURN (UDP/TCP) --> Caddy_TURN
    Caddy_TURN -- Relays Media --> Caddy_TURN

第一步:构建 Elixir 信令核心

我们的 Elixir 应用不处理媒体流,只负责信令交换。它需要管理房间、参与者以及在他们之间广播 WebRTC 信令消息。

首先,创建一个新的 Phoenix 项目:

mix phx.new webrtc_sfu --no-ecto --no-mailer --no-dashboard
cd webrtc_sfu

我们需要一个 RoomChannel 来处理所有与特定房间相关的 WebSocket 通信。

lib/webrtc_sfu_web/channels/room_channel.ex:

defmodule WebrtcSfuWeb.RoomChannel do
  use WebrtcSfuWeb, :channel
  require Logger

  # 客户端成功加入房间时调用
  def join("room:" <> room_id, payload, socket) do
    # 在真实项目中,这里应该有认证和授权逻辑
    Logger.info("Peer #{inspect(socket.assigns.peer_id)} joining room #{room_id} with payload #{inspect(payload)}")

    # 将新加入的 peer ID 广播给房间内的其他人
    broadcast_from!(socket, "peer:joined", %{peer_id: socket.assigns.peer_id})

    # 向当前 socket 发送房间内已存在的 peer 列表
    # 这里我们使用一个简单的 Agent 来跟踪房间状态
    # 在生产环境中,更健壮的选择是 GenServer 或分布式注册表
    peers_in_room = RoomState.get_peers(room_id)
    send(self(), {:after_join, peers_in_room})

    # 将当前 peer 添加到房间状态中
    RoomState.add_peer(room_id, socket.assigns.peer_id)

    {:ok, %{peers: peers_in_room}, assign(socket, :room_id, room_id)}
  end

  # 处理客户端发送的信令消息
  def handle_in("signal:" <> event, payload, socket) do
    # 这里的 event 会是 "offer", "answer", "candidate" 等
    # 我们需要将这个消息转发给 payload 中指定的目标 peer
    target_peer_id = payload["to"]
    room_id = socket.assigns.room_id

    Logger.debug(
      "Relaying signal '#{event}' from #{socket.assigns.peer_id} to #{target_peer_id} in room #{room_id}"
    )

    # 将消息直接推送到目标 peer 的 channel 进程
    # `broadcast/3` 是一个方便的工具,可以向特定 topic 发送消息
    # 我们为每个 peer 创建一个唯一的 topic
    WebrtcSfuWeb.Endpoint.broadcast(
      "peer:#{target_peer_id}",
      "signal:#{event}",
      %{
        from: socket.assigns.peer_id,
        # 消息体原样转发
        body: payload["body"]
      }
    )

    {:noreply, socket}
  end

  # 当客户端断开连接时,通知房间里的其他人
  def terminate(reason, socket) do
    Logger.warn("Peer #{socket.assigns.peer_id} leaving room. Reason: #{inspect(reason)}")
    room_id = socket.assigns.room_id
    peer_id = socket.assigns.peer_id

    if room_id do
      RoomState.remove_peer(room_id, peer_id)
      broadcast_from!(socket, "peer:left", %{peer_id: peer_id})
    end

    :ok
  end

  # 在 join 之后异步发送房间成员列表
  def handle_info({:after_join, peers}, socket) do
    push(socket, "room:peers", %{peers: peers})
    {:noreply, socket}
  end
end

为了管理每个房间的参与者列表,我们使用一个简单的 AgentAgent 是 Elixir 中用于维护状态的简单进程抽象。

lib/webrtc_sfu/room_state.ex:

defmodule WebrtcSfu.RoomState do
  use Agent

  def start_link(_opts) do
    # 初始状态是一个空的 map,key 是 room_id, value 是 peer_id 的 Set
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  # 向房间添加一个 peer
  def add_peer(room_id, peer_id) do
    Agent.update(__MODULE__, fn state ->
      # Map.update/4 提供了原子性的更新操作
      Map.update(state, room_id, MapSet.new([peer_id]), &MapSet.put(&1, peer_id))
    end)
  end

  # 从房间移除一个 peer
  def remove_peer(room_id, peer_id) do
    Agent.get_and_update(__MODULE__, fn state ->
      case Map.get(state, room_id) do
        nil ->
          {nil, state} # 房间不存在,什么都不做
        peers ->
          new_peers = MapSet.delete(peers, peer_id)
          # 如果房间空了,就从 state map 中移除
          new_state =
            if MapSet.size(new_peers) == 0 do
              Map.delete(state, room_id)
            else
              Map.put(state, room_id, new_peers)
            end
          {peer_id, new_state}
      end
    end)
  end

  # 获取房间的所有 peer
  def get_peers(room_id) do
    Agent.get(__MODULE__, fn state ->
      # 如果房间不存在,返回空列表
      Map.get(state, room_id, MapSet.new()) |> MapSet.to_list()
    end)
  end
end

别忘了在 application.ex 中启动 RoomState Agent:

# lib/webrtc_sfu/application.ex
def start(_type, _args) do
  children = [
    # ... Phoenix default children
    WebrtcSfu.RoomState
  ]
# ...

最后,在 user_socket.ex 中配置 channel 路由和连接时的 peer ID 分配。

lib/webrtc_sfu_web/channels/user_socket.ex:

defmodule WebrtcSfuWeb.UserSocket do
  use Phoenix.Socket

  # 房间 channel
  channel "room:*", WebrtcSfuWeb.RoomChannel
  # 每个 peer 监听的私有 channel,用于接收定向消息
  channel "peer:*", WebrtcSfuWeb.PeerChannel

  # 连接时进行身份验证
  def connect(_params, socket, _connect_info) do
    # 在生产环境中,这里应该从 token 中解析 user_id
    # 为简单起见,我们生成一个唯一的 peer_id
    peer_id = "peer_" <> Ecto.UUID.generate()
    {:ok, assign(socket, :peer_id, peer_id)}
  end

  # Socket ID 用于标识连接
  def id(socket), do: "users_socket:#{socket.assigns.peer_id}"
end

# 我们还需要一个非常简单的 PeerChannel 来让客户端可以监听自己的 topic
defmodule WebrtcSfuWeb.PeerChannel do
  use WebrtcSfuWeb, :channel

  # 验证客户端是否只能加入自己的 topic
  def join("peer:" <> requested_peer_id, _payload, socket) do
    if requested_peer_id == socket.assigns.peer_id do
      {:ok, socket}
    else
      {:error, %{reason: "unauthorized"}}
    end
  end
end

至此,我们的 Elixir 信令服务器已经准备就绪。它能够处理房间加入、离开,并在参与者之间中继信令消息。

第二步:实现 Next.js 客户端

客户端的逻辑相对复杂,需要管理多个 RTCPeerConnection 实例,处理与信令服务器的交互,以及渲染视频元素。我们将核心逻辑封装在一个 React Hook (useWebRTC) 中。

首先,安装 Phoenix.js 客户端库:

npm install phoenix

创建一个 hooks/useWebRTC.js 文件。这个 hook 会非常庞大,我们分块来看。

Hook 签名和状态管理:

// hooks/useWebRTC.js
import { useState, useEffect, useRef, useCallback } from 'react';
import { Socket } from 'phoenix';

export function useWebRTC(roomId) {
  const [peerId, setPeerId] = useState(null);
  const [peers, setPeers] = useState({}); // key: peerId, value: { pc, stream, ... }
  const [localStream, setLocalStream] = useState(null);

  const socketRef = useRef(null);
  const roomChannelRef = useRef(null);
  const peerChannelRef = useRef(null);
  const peerConnectionsRef = useRef({}); // 存储 RTCPeerConnection 实例

  // ... 后面将填充 effect 和回调函数
}

连接到 Phoenix Channel:

// ... inside useWebRTC hook
useEffect(() => {
  const initializeSocket = async () => {
    // 启动本地媒体流
    try {
      const stream = await navigator.mediaDevices.getUserMedia({
        video: true,
        audio: true,
      });
      setLocalStream(stream);
    } catch (error) {
      console.error("Error accessing media devices.", error);
      // 处理用户拒绝授权或没有设备的情况
      return;
    }

    // 创建并连接 Socket
    const socket = new Socket("/socket", { params: {} });
    socket.connect();
    socketRef.current = socket;

    socket.onOpen(() => {
      const assignedPeerId = socket.connection.transport.transport.conn.socket.assigns.peer_id;
      setPeerId(assignedPeerId);

      // 加入房间 Channel
      const roomChannel = socket.channel(`room:${roomId}`, {});
      roomChannelRef.current = roomChannel;

      roomChannel.join()
        .receive("ok", ({ peers: existingPeers }) => {
          console.log(`Joined room ${roomId} successfully. Existing peers:`, existingPeers);
          // 为已存在的 peer 创建连接
          existingPeers.forEach(pId => createPeerConnection(pId, true, stream));
        })
        .receive("error", resp => { console.error("Unable to join room", resp); });

      // 加入私有 Peer Channel
      const peerChannel = socket.channel(`peer:${assignedPeerId}`, {});
      peerChannelRef.current = peerChannel;
      peerChannel.join();
      
      // 设置信令消息监听
      setupSignalingListeners(peerChannel, stream);
      // 设置房间事件监听
      setupRoomEventListeners(roomChannel, stream);
    });

    socket.onError(() => console.error("Socket error"));
  };

  initializeSocket();

  return () => {
    // 清理逻辑
    socketRef.current?.disconnect();
    Object.values(peerConnectionsRef.current).forEach(pc => pc.close());
  };
}, [roomId]);

核心 WebRTC 逻辑 - 创建连接和信令处理:

// ... inside useWebRTC hook, this function is crucial
const createPeerConnection = useCallback((targetPeerId, isInitiator, stream) => {
  if (targetPeerId === peerId) return; // 不和自己建连
  console.log(`Creating PeerConnection to ${targetPeerId}, initiator: ${isInitiator}`);

  // 这里的 iceServers 配置是关键,指向我们的 Caddy TURN 服务器
  const configuration = {
    iceServers: [
      { urls: 'stun:stun.l.google.com:19302' }, // 公共 STUN
      {
        urls: 'turn:your-domain.com:3478', // 替换为你的域名
        username: 'your-turn-username',
        credential: 'your-turn-password',
      },
    ],
  };

  const pc = new RTCPeerConnection(configuration);
  peerConnectionsRef.current[targetPeerId] = pc;

  // 将本地媒体轨道添加到连接
  stream.getTracks().forEach(track => pc.addTrack(track, stream));

  // 监听远端媒体流
  pc.ontrack = (event) => {
    console.log(`Received remote track from ${targetPeerId}`);
    setPeers(prev => ({
      ...prev,
      [targetPeerId]: { ...prev[targetPeerId], stream: event.streams[0] }
    }));
  };

  // 监听 ICE candidate
  pc.onicecandidate = (event) => {
    if (event.candidate) {
      // 发送 ICE candidate 给对方
      roomChannelRef.current.push(`signal:candidate`, {
        to: targetPeerId,
        body: { candidate: event.candidate },
      });
    }
  };

  // 如果是发起方,创建 offer
  if (isInitiator) {
    pc.createOffer()
      .then(offer => pc.setLocalDescription(offer))
      .then(() => {
        roomChannelRef.current.push(`signal:offer`, {
          to: targetPeerId,
          body: { sdp: pc.localDescription },
        });
      })
      .catch(e => console.error("Error creating offer", e));
  }

  setPeers(prev => ({ ...prev, [targetPeerId]: { pc }}));

}, [peerId]);

const setupSignalingListeners = useCallback((channel, stream) => {
  // 监听定向的信令消息
  channel.on("signal:offer", ({ from, body }) => {
    console.log(`Received offer from ${from}`);
    const pc = createPeerConnection(from, false, stream);
    pc.setRemoteDescription(new RTCSessionDescription(body.sdp))
      .then(() => pc.createAnswer())
      .then(answer => pc.setLocalDescription(answer))
      .then(() => {
        roomChannelRef.current.push(`signal:answer`, {
          to: from,
          body: { sdp: pc.localDescription },
        });
      })
      .catch(e => console.error("Error handling offer", e));
  });

  channel.on("signal:answer", ({ from, body }) => {
    console.log(`Received answer from ${from}`);
    const pc = peerConnectionsRef.current[from];
    if (pc) {
      pc.setRemoteDescription(new RTCSessionDescription(body.sdp))
       .catch(e => console.error("Error setting remote description", e));
    }
  });

  channel.on("signal:candidate", ({ from, body }) => {
    const pc = peerConnectionsRef.current[from];
    if (pc) {
      pc.addIceCandidate(new RTCIceCandidate(body.candidate))
       .catch(e => console.error("Error adding ICE candidate", e));
    }
  });
}, [createPeerConnection]);

const setupRoomEventListeners = useCallback((channel, stream) => {
  // 监听房间事件
  channel.on("peer:joined", ({ peer_id: newPeerId }) => {
    console.log(`Peer ${newPeerId} joined the room.`);
    // 新 peer 加入,我们作为发起方创建连接
    createPeerConnection(newPeerId, true, stream);
  });

  channel.on("peer:left", ({ peer_id: leavingPeerId }) => {
    console.log(`Peer ${leavingPeerId} left the room.`);
    peerConnectionsRef.current[leavingPeerId]?.close();
    delete peerConnectionsRef.current[leavingPeerId];
    setPeers(prev => {
      const newPeers = { ...prev };
      delete newPeers[leavingPeerId];
      return newPeers;
    });
  });
}, [createPeerConnection]);

// ... 返回状态和函数
return { peerId, localStream, peers };

这个 Hook 已经相当完整。在 Next.js 页面组件中使用它就变得非常简单:

// pages/room/[roomId].js
import { useRouter } from 'next/router';
import { useWebRTC } from '../../hooks/useWebRTC';
import { useEffect, useRef } from 'react';

const Video = ({ stream }) => {
  const ref = useRef();
  useEffect(() => {
    if (stream) ref.current.srcObject = stream;
  }, [stream]);
  return <video ref={ref} autoPlay muted playsInline style={{ width: 300 }} />;
};

export default function Room() {
  const router = useRouter();
  const { roomId } = router.query;
  const { localStream, peers } = useWebRTC(roomId);

  if (!roomId) return <div>Loading room...</div>;

  return (
    <div>
      <h1>Room: {roomId}</h1>
      <div style={{ display: 'flex', flexWrap: 'wrap' }}>
        <div style={{ margin: 10 }}>
          <h3>You</h3>
          {localStream && <Video stream={localStream} />}
        </div>
        {Object.entries(peers).map(([peerId, peer]) => (
          peer.stream && (
            <div key={peerId} style={{ margin: 10 }}>
              <h3>{peerId}</h3>
              <Video stream={peer.stream} />
            </div>
          )
        ))}
      </div>
    </div>
  );
}

第三步:配置 Caddy 作为粘合层

现在,我们需要配置 Caddy 来将前端、后端和 TURN 服务串联起来。这就是 Caddy 优雅之处的体现。一个 Caddyfile 就能搞定一切。

假设我们的 Next.js 应用已经通过 next buildnext export 导出了静态文件到 out 目录,Elixir Phoenix 应用监听在 localhost:4000

Caddyfile:

# 你的域名,Caddy 会自动为其申请和续签 Let's Encrypt 证书
your-domain.com {
    # 启用 TURN 服务
    # 这是一个标准模块,无需额外插件
    turn {
        # 监听 UDP 端口 3478
        # Caddy 会自动处理 TCP 监听
        udp_port 3478
        
        # 定义 TURN 服务的 realm
        realm your-domain.com
        
        # 定义静态用户凭证
        # 在生产环境中,应该使用 user_auth_api 配置一个 API 端点来动态验证用户
        # 这可以与你的应用后端集成,实现临时凭证
        users your-turn-username=your-turn-password
    }

    # 根目录指向 Next.js 的静态导出目录
    root * /path/to/your/nextjs/project/out

    # 反向代理 WebSocket 连接到 Phoenix 服务器
    # 这是一个长连接,需要设置健康检查参数
    reverse_proxy /socket/* localhost:4000 {
        # 明确这是一个 WebSocket 代理
        header_up Host {host}
        header_up Upgrade {header.Upgrade}
        header_up Connection {header.Connection}
        
        # 对 WebSocket 连接进行健康检查
        health_uri      ws://localhost:4000/socket
        health_interval 30s
        health_timeout  5s
    }

    # 处理 Next.js 的客户端路由,所有未匹配到文件的请求都返回 index.html
    # Caddy v2.7+ 使用 try_files
    @not_file {
        not path /api/* /_next/*
        not file
    }
    rewrite @not_file /{path} /

    # 提供静态文件服务
    file_server
    
    # 日志配置
    log {
        output file /var/log/caddy/access.log {
            roll_size 10mb
            roll_keep 5
        }
        format json
    }
}

这份配置做了几件关键的事情:

  1. 自动 HTTPS: 只需要提供域名,Caddy 负责处理 TLS 证书,这是 WebRTC 的前提。
  2. TURN 服务: turn 指令块用几行代码就启动了一个功能完备的 TURN 服务器,解决了最棘手的 NAT 穿透问题。我们使用了静态凭证,这在生产环境中需要替换为动态验证方案。
  3. WebSocket 代理: reverse_proxy /socket/* 将所有信令流量安全地代理到后端的 Phoenix 应用。
  4. 静态文件服务: 为 Next.js 的前端应用提供服务,并正确处理了客户端路由。

局限性与未来迭代方向

我们已经构建了一个功能性的、架构上合理的 WebRTC SFU 基础设施。然而,在投入生产之前,还有几个方面需要深化:

  1. 媒体流处理: 当前的 Elixir 应用只是一个“信令 SFU”。它不转发媒体流。一个真正的 SFU 需要接收 WebRTC 媒体流,并将其转发给其他人。在 Elixir 生态中,Membrane Framework 是实现这一目标的强大工具,它提供了处理音视频流的底层能力,但集成它会带来新的复杂性。

  2. TURN 服务的安全性和可扩展性: Caddy 内置的 TURN 服务器对于中小型应用来说非常方便。但它使用的是静态凭证,安全性不足。生产级方案需要实现 STUN/TURN REST API,让 Phoenix 应用可以为每个会话动态生成有时效性的 TURN 凭证。此外,当流量巨大时,可能需要部署一个专用的、可水平扩展的 TURN 服务器集群(如 coturn)。

  3. 信令服务器的健壮性: 虽然 Agent 简单易用,但在分布式 Elixir 环境下,房间状态的管理需要更复杂的解决方案,例如使用 HordePhoenix.PubSub 提供的分布式注册表,以确保节点故障时房间状态不会完全丢失。

  4. 客户端体验优化: 客户端代码可以进一步优化,例如实现“Simulcast”(联播)或 SVC(可伸缩视频编码),允许 SFU 根据接收端的网络状况转发不同质量的视频流。同时,需要更精细的错误处理、连接状态监控和重连逻辑。

这个项目展示了如何将 Caddy、Elixir 和 Next.js 这三个现代技术栈的优势结合起来,解决一个复杂且有价值的工程问题。每项技术都在其最擅长的领域发挥作用,最终形成一个内聚、高效且易于维护的系统。


  目录