团队里的数据分析师和业务人员对数据湖的访问需求越来越频繁,但他们面临的现实困境是,每次查询都需要在本地配置复杂的 Spark 或 Trino 客户端,或者在功能受限的 BI 工具和过于灵活但对非技术人员不友好的原生 Jupyter Notebook 之间挣扎。这直接导致了数据驱动决策的效率瓶颈。我们需要的是一个统一、稳定且对用户友好的 Web 查询界面,一个数据湖的“云端 IDE”。
初步构想是构建一个基于 React 的单页应用(SPA),UI 层面,Material-UI (MUI) 的组件丰富度和设计系统能保证我们快速构建出专业级的界面。真正的挑战在于后端:如何为每个用户提供一个隔离的、有状态的、支持长时间运行查询的执行环境?
一个简单的无状态 REST API 显然行不通。每次请求都需要重新初始化 Spark Session,这对于交互式探索是灾难性的。我们需要的是会话(Session)级别的状态保持。自然而然,我们想到了 Jupyter。但直接上一个完整的 JupyterHub 体系又过于笨重,我们并不需要 Notebook 的文件管理和完整UI,我们只需要它最核心的能力——可被远程调用的、语言无关的计算内核(Kernel)。
Jupyter Kernel Gateway 成了最终的技术选型。它将 Jupyter Kernel 的能力通过 REST 和 WebSocket API 暴露出来,完美契合我们的需求:轻量、专注、易于集成。前端通过 WebSocket 与后端唯一的 Kernel Gateway 建立长连接,为每个用户会话(Web App Session)启动一个专用的 Kernel 进程(例如 PySpark Kernel)。所有代码执行、状态更新、结果返回都在这条通道上完成。
sequenceDiagram
participant User as 用户 (浏览器)
participant MUI_Frontend as MUI 前端 (React App)
participant Kernel_Gateway as Jupyter Kernel Gateway
participant Spark_Kernel as PySpark Kernel 进程
participant Data_Lake as 数据湖 (S3/HDFS)
User->>MUI_Frontend: 打开查询控制台
MUI_Frontend->>Kernel_Gateway: HTTP POST /api/kernels (请求启动新内核)
Kernel_Gateway->>Spark_Kernel: fork() 新的内核进程
Kernel_Gateway-->>MUI_Frontend: 返回 Kernel ID 和 WebSocket URL
MUI_Frontend->>Kernel_Gateway: 建立 WebSocket 连接
User->>MUI_Frontend: 在编辑器输入 PySpark 代码并点击执行
MUI_Frontend->>Kernel_Gateway: WebSocket 发送 execute_request 消息
Kernel_Gateway->>Spark_Kernel: 通过 ZMQ 转发执行请求
Spark_Kernel->>Data_Lake: 执行 Spark SQL 查询
Data_Lake-->>Spark_Kernel: 返回查询数据
Spark_Kernel-->>Kernel_Gateway: 通过 ZMQ 返回 execute_result/stream 消息
Kernel_Gateway-->>MUI_Frontend: WebSocket 推送结果/状态消息
MUI_Frontend->>User: 渲染数据表格或状态更新
这套架构既能利用 MUI 构建出现代化的交互界面,又能复用 Jupyter 成熟的内核生态系统来执行针对数据湖的复杂查询,形成了一个高效的闭环。
第一步:部署可依赖的 Kernel Gateway
在真实项目中,稳定性压倒一切。Kernel Gateway 不能是临时启动的脚本,它必须是一个可配置、可监控的长期运行服务。我们选择用 Docker 将其容器化。
首先,我们需要一个包含特定内核的自定义镜像。这里我们选择 spylon-kernel,它能很好地将 Spark 和 Python 环境结合起来。
Dockerfile
# 使用官方 jupyter/kernel-gateway 镜像作为基础
FROM jupyter/kernel-gateway:2.6.0
USER root
# 安装 Java,因为 Spark 需要它
RUN apt-get update && \
apt-get install -y --no-install-recommends openjdk-11-jre-headless && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
USER ${NB_USER}
# 安装 spylon-kernel 和 pyspark
# 在生产环境中,应固定版本号
RUN pip install --no-cache-dir \
spylon-kernel==0.4.1 \
pyspark==3.3.0
# 创建内核规范文件,让 Gateway 知道这个内核的存在
RUN python -m spylon_kernel.install
# 暴露 Gateway 端口
EXPOSE 8888
# 默认启动命令
CMD ["jupyter", "kernelgateway", "--KernelGatewayApp.ip=0.0.0.0", "--KernelGatewayApp.port=8888"]
为了便于本地开发和线上部署,使用 docker-compose.yml 管理配置。这里的坑在于跨域(CORS)和认证。前端应用和 Kernel Gateway 通常不在同一个域,必须显式配置 CORS。同时,为了最基本的安全,启用 Token 认证。
docker-compose.yml
version: '3.8'
services:
kernel-gateway:
build:
context: .
dockerfile: Dockerfile
container_name: interactive-query-gateway
ports:
- "8888:8888"
environment:
# 设置一个固定的认证 Token,生产环境应使用更安全的动态 Token 机制
- KG_AUTH_TOKEN=my-secret-production-token
# 配置 Spark Driver 的内存等参数
- SPARK_OPTS=--driver-java-options="-Xms1024M -Xmx2048M"
command: >
jupyter kernelgateway
--KernelGatewayApp.ip=0.0.0.0
--KernelGatewayApp.port=8888
--KernelGatewayApp.allow_origin='http://localhost:3000' # 允许前端开发服务器访问
--KernelGatewayApp.allow_credentials='true'
--KernelGatewayApp.allow_methods='GET,POST,PUT,DELETE,OPTIONS'
--KernelGatewayApp.allow_headers='Content-Type,Authorization,X-XSRFToken'
--KernelGatewayApp.auth_token_env_var='KG_AUTH_TOKEN'
volumes:
# 挂载数据湖的配置文件,如 core-site.xml, hdfs-site.xml
# - ./conf/hadoop:/etc/hadoop/conf
# 挂载 Spark 配置文件
# - ./conf/spark:/usr/local/spark/conf
restart: always
这个配置确保了 Gateway 服务的健壮性。运行 docker-compose up --build -d,一个可靠的后端执行环境就绪了。
第二步:构建前端骨架与内核通信服务
前端的核心是与 Kernel Gateway 的 WebSocket 通信。这是一个复杂的过程,涉及到会话管理、消息序列化与反序列化,必须将其封装成一个独立、可复用的服务。我们不造轮子,直接使用 Jupyter 官方提供的 @jupyterlab/services 库,它已经处理好了底层的复杂性。
src/services/JupyterKernelService.ts
import {
KernelManager,
KernelMessage,
SessionManager,
ServerConnection,
} from '@jupyterlab/services';
import { IKernelConnection } from '@jupyterlab/services/lib/kernel/kernel';
import { ISessionConnection } from '@jupyterlab/services/lib/session/session';
import { v4 as uuidv4 } from 'uuid';
// 定义配置常量
const BASE_URL = process.env.REACT_APP_KERNEL_GATEWAY_URL || 'http://localhost:8888';
const KERNEL_NAME = 'spylon'; // 与 Dockerfile 中安装的内核对应
const AUTH_TOKEN = process.env.REACT_APP_KERNEL_GATEWAY_TOKEN || 'my-secret-production-token';
/**
* 一个常见的错误是直接在组件中处理 WebSocket 连接和消息。
* 这会导致状态管理混乱和代码重复。必须将所有与 Jupyter Kernel 的交互
* 封装在一个专用的服务类中。
*/
export class JupyterKernelService {
private serverSettings: ServerConnection.ISettings;
private kernelManager: KernelManager;
private sessionManager: SessionManager;
private kernelConnection: IKernelConnection | null = null;
private sessionConnection: ISessionConnection | null = null;
constructor() {
this.serverSettings = ServerConnection.makeSettings({
baseUrl: BASE_URL,
token: AUTH_TOKEN,
wsUrl: BASE_URL.replace(/^http/, 'ws'),
});
this.kernelManager = new KernelManager({ serverSettings: this.serverSettings });
this.sessionManager = new SessionManager({
serverSettings: this.serverSettings,
kernelManager: this.kernelManager,
});
// 优雅地处理页面关闭,销毁内核会话,避免资源泄露
window.addEventListener('beforeunload', async () => {
await this.shutdown();
});
}
/**
* 初始化服务,启动一个新的内核会话。
* 这是所有操作的入口点。
*/
public async initialize(): Promise<void> {
if (this.sessionConnection && !this.sessionConnection.isDisposed) {
console.log('Session already initialized.');
return;
}
try {
console.log('Starting new Jupyter session...');
this.sessionConnection = await this.sessionManager.startNew({
path: `session-${uuidv4()}`,
type: 'notebook', // 'console' or 'notebook'
name: `web-query-session`,
kernel: {
name: KERNEL_NAME,
},
});
this.kernelConnection = this.sessionConnection.kernel;
console.log(`Session started with kernel ID: ${this.kernelConnection?.id}`);
} catch (error) {
console.error('Failed to initialize Jupyter kernel session:', error);
// 在真实项目中,这里应该触发一个全局错误状态,通知 UI
throw new Error('Kernel initialization failed.');
}
}
/**
* 执行代码并处理返回的消息流。
* 这是整个系统的核心交互逻辑。
* @param code 要执行的代码字符串
* @param onMessage 回调函数,用于处理内核返回的每条消息
* @returns a Future object that can be used to control execution
*/
public execute(
code: string,
onMessage: (msg: KernelMessage.IMessage) => void
): KernelMessage.IExecuteRequestMsg | null {
if (!this.kernelConnection) {
console.error('Kernel not connected. Cannot execute code.');
throw new Error('Kernel not connected.');
}
const future = this.kernelConnection.requestExecute({
code,
stop_on_error: true,
store_history: true,
});
future.onIOPub = (msg) => {
// 这里的坑在于,内核会返回多种类型的消息,必须对 msg_type 进行判断
// 并进行相应的处理,否则 UI 将无法正确展示结果、错误或日志。
console.debug('Received IOPub message:', msg);
onMessage(msg);
};
future.onReply = (msg) => {
console.debug('Received ExecuteReply message:', msg);
onMessage(msg);
};
// 等待执行完成
future.done.then(reply => {
console.log('Execution finished with status:', reply.content.status);
}).catch(e => {
console.error('Execution future failed:', e);
});
return future.msg;
}
/**
* 关闭并销毁内核会话,释放服务器资源。
*/
public async shutdown(): Promise<void> {
if (this.sessionConnection && !this.sessionConnection.isDisposed) {
try {
await this.sessionConnection.shutdown();
console.log('Jupyter session shut down successfully.');
} catch (error) {
console.error('Failed to shut down Jupyter session:', error);
} finally {
this.sessionConnection = null;
this.kernelConnection = null;
}
}
}
public get isReady(): boolean {
return !!this.kernelConnection && !this.kernelConnection.isDisposed;
}
}
// 创建一个单例,供整个应用使用
export const kernelService = new JupyterKernelService();
第三步:用MUI构建IDE式布局与核心组件
一个好的查询工具,其布局至关重要。我们采用经典的 IDE 三栏布局:左侧是数据湖的模式(Schema)浏览器,中间是查询编辑器,下方是结果展示区和日志。使用 react-resizable-panels 库可以轻松实现可拖拽调整大小的面板。
src/components/QueryConsole.tsx
import React, { useState, useEffect, useRef } from 'react';
import { Box, Paper, Typography, CircularProgress } from '@mui/material';
import { Panel, PanelGroup, PanelResizeHandle } from 'react-resizable-panels';
import { kernelService } from '../services/JupyterKernelService';
import { MonacoEditor } from './MonacoEditor'; // Monaco 编辑器组件(封装)
import { ResultDataGrid } from './ResultDataGrid'; // MUI DataGrid 结果展示组件
import { StatusBar } from './StatusBar'; // 状态栏组件
import { KernelMessage } from '@jupyterlab/services';
type KernelStatus = 'initializing' | 'idle' | 'busy' | 'error';
export const QueryConsole: React.FC = () => {
const [status, setStatus] = useState<KernelStatus>('initializing');
const [results, setResults] = useState<Record<string, any>[]>([]);
const [columns, setColumns] = useState<any[]>([]);
const [error, setError] = useState<string | null>(null);
useEffect(() => {
const initKernel = async () => {
try {
await kernelService.initialize();
setStatus('idle');
} catch (e) {
setStatus('error');
setError('Failed to connect to the execution kernel.');
}
};
initKernel();
return () => {
kernelService.shutdown();
};
}, []);
const handleExecute = (code: string) => {
if (!kernelService.isReady) {
setError('Kernel is not ready. Please refresh the page.');
return;
}
// 清空上次执行结果
setResults([]);
setColumns([]);
setError(null);
setStatus('busy');
// 这里的实现是关键。我们需要一个消息处理器来解析 Jupyter 的协议。
const handleMessage = (msg: KernelMessage.IMessage) => {
const msgType = msg.header.msg_type;
switch (msgType) {
case 'status':
const newStatus = (msg.content as any).execution_state;
if (newStatus === 'idle') {
setStatus('idle');
}
break;
case 'execute_result':
// 最重要的消息类型:包含执行结果
const data = (msg.content as any).data['application/json'];
if (data && Array.isArray(data.data) && data.data.length > 0) {
const newColumns = data.schema.fields.map((field: any) => ({
field: field.name,
headerName: field.name,
flex: 1,
}));
const newRows = data.data.map((row: any[], index: number) => {
const rowObj: Record<string, any> = { id: index }; // DataGrid 需要一个 id
data.schema.fields.forEach((field: any, i: number) => {
rowObj[field.name] = row[i];
});
return rowObj;
});
setColumns(newColumns);
setResults(newRows);
} else {
// 处理空结果集
setColumns([]);
setResults([]);
}
break;
case 'stream':
// 处理 stdout/stderr 输出
const streamContent = (msg.content as any).text;
console.log('Kernel Stream:', streamContent); // 可以显示在日志面板
break;
case 'error':
// 处理执行时错误
const { ename, evalue, traceback } = msg.content as any;
setError(`${ename}: ${evalue}\n\nTraceback:\n${traceback.join('\n')}`);
setStatus('idle'); // 错误后也应变为空闲
break;
default:
break;
}
};
// PySpark DataFrame.show() 默认只打印到 stdout。
// 为了获得结构化的 JSON 结果,我们必须让用户执行能返回数据的代码。
// 在真实项目中,我们会封装一个函数,比如 display(df),它会自动将 DataFrame 转为 JSON。
// 这里为了演示,我们假设用户的代码最后一行是一个返回 JSON 字符串的表达式。
const wrappedCode = `
import json
from pyspark.sql import DataFrame
# 假设用户代码在 'user_code' 变量中
user_code = """
${code}
"""
# 找到最后一行表达式并尝试将其转换为 DataFrame to JSON
# 这是一个简化的实现,生产环境需要更健壮的 AST 解析或约定
lines = [line for line in user_code.strip().split('\\n') if line.strip()]
if lines:
last_line = lines[-1]
# 尝试将最后一行作为 DataFrame 执行并转换为 JSON
final_code = f"""
{user_code}
if isinstance({last_line}, DataFrame):
# .limit(1000) 是一个保护机制,防止返回过大的结果集导致前端崩溃
data_list = [row.asDict() for row in {last_line}.limit(1000).collect()]
schema_json = {last_line}.schema.jsonValue()
print(json.dumps({{'schema': schema_json, 'data': data_list}}))
"""
else:
final_code = user_code
kernelService.execute(final_code, handleMessage);
`;
kernelService.execute(wrappedCode, handleMessage);
};
if (status === 'initializing') {
return <Box sx={{ display: 'flex', justifyContent: 'center', alignItems: 'center', height: '100vh' }}><CircularProgress /> <Typography sx={{ml: 2}}>Connecting to Kernel...</Typography></Box>;
}
return (
<Box sx={{ height: '100vh', display: 'flex', flexDirection: 'column' }}>
<PanelGroup direction="vertical">
<Panel defaultSize={60}>
<MonacoEditor onExecute={handleExecute} />
</Panel>
<PanelResizeHandle style={{ height: '8px', background: '#eee' }} />
<Panel>
<Paper sx={{ height: '100%', overflow: 'auto' }}>
{error && <pre style={{ color: 'red', padding: '10px', whiteSpace: 'pre-wrap' }}>{error}</pre>}
{status === 'busy' && <CircularProgress sx={{margin: 2}}/>}
{columns.length > 0 && <ResultDataGrid columns={columns} rows={results} />}
</Paper>
</Panel>
</PanelGroup>
<StatusBar status={status} />
</Box>
);
};
第四步:优化大数据量下的结果展示
一个常见的坑是,当查询返回上万行数据时,直接渲染一个巨大的 HTML <table> 会导致浏览器卡死甚至崩溃。MUI 的 DataGrid 组件原生支持虚拟化(Virtualization),这是解决此问题的关键。它只渲染可视区域内的行,当用户滚动时,动态替换行,从而能以极高的性能展示海量数据。
src/components/ResultDataGrid.tsx
import React from 'react';
import { Box } from '@mui/material';
import { DataGrid, GridColDef, GridRowsProp } from '@mui/x-data-grid';
interface ResultDataGridProps {
columns: GridColDef[];
rows: GridRowsProp;
}
export const ResultDataGrid: React.FC<ResultDataGridProps> = ({ columns, rows }) => {
if (rows.length === 0) {
return <Box sx={{ p: 2, color: 'text.secondary' }}>No results to display.</Box>;
}
// DataGrid 的性能在于它默认启用了行和列的虚拟化。
// 我们需要做的就是正确地提供 columns 和 rows prop。
// 关键是确保每一行数据 (row) 都有一个唯一的 `id` 字段。
// 在上一步的 handleMessage 中我们已经为每行生成了 id。
return (
<Box sx={{ height: '100%', width: '100%' }}>
<DataGrid
rows={rows}
columns={columns}
density="compact"
// 开启分页,进一步控制 DOM 元素数量
pagination
initialState={{
pagination: {
paginationModel: {
pageSize: 100,
},
},
}}
pageSizeOptions={[100, 500, 1000]}
// 禁用行选择,除非有此需求
disableRowSelectionOnClick
// 加载状态
loading={!rows.length && columns.length > 0}
// 提供自定义的无数据覆盖层
// slots={{ noRowsOverlay: CustomNoRowsOverlay }}
sx={{
// 移除边框,使其更像嵌入式组件
border: 'none',
'& .MuiDataGrid-cell': {
borderBottom: '1px solid #eee',
},
}}
/>
</Box>
);
};
通过这种方式,我们构建了一个功能完备、体验流畅的交互式数据湖查询控制台。它解决了核心痛点,为数据分析师提供了强大的工具,同时其技术架构也具备良好的扩展性。
局限与未来迭代
当前这套方案作为一个内部工具的原型是成功的,但在投入更大规模生产前,仍有几个关键点需要解决。
首先,内核管理过于简单。当前的 Kernel Gateway 是单点的,所有用户的内核都在同一个进程中启动,这在多租户场景下存在资源抢占和安全风险。真正的生产级系统需要引入 Jupyter Enterprise Gateway 或自研一套基于 Kubernetes 的 Kernel Manager,为每个用户在隔离的 Pod 中动态启动内核,实现资源的精细化管理和隔离。
其次,安全性需要加固。目前仅依赖固定的 Token 认证,这远远不够。需要与企业内部的身份认证系统(如 OAuth2/OIDC)集成,实现单点登录。此外,内核中执行的代码权限过高,必须通过容器的 cgroups、seccomp 等机制进行严格的沙箱化,限制其文件系统访问、网络请求和资源使用,防止恶意代码对数据湖或基础设施造成破坏。
最后,用户体验可以进一步提升。例如,实现查询历史记录、结果集导出(CSV/Excel)、基于返回数据的简单图表可视化、以及通过解析内核返回的元数据实现代码自动补全。这些功能的迭代,将使这个内部平台从一个“可用的工具”演变为一个“高效的生产力平台”。