Rust 实时绘图:使用 gRPC 流源的最小示例
在我们之前的文章 Rust gRPC 服务端流最小示例 中,我们演示了如何在 Rust 中实现一个 gRPC 服务端,向客户端流式传输正弦波信号。
本文基于 Rust gRPC 服务端流最小示例 中的命令行客户端示例,使用 egui_plot 实现对传入正弦波采样数据的实时更新实时绘图。

此处未展示的所有文件请参见 Rust gRPC 服务端流最小示例。
examples/plot.rs
live_plot.rs
use std::{collections::VecDeque, time::Duration};
use eframe::{self, egui};
use egui_plot::{Plot, Line, Legend, PlotPoints};
use std::sync::mpsc::Receiver;
use std::thread;
// gRPC 客户端导入
use tonic::Request;
pub mod sine { pub mod v1 { tonic::include_proto!("sine.v1"); } }
use sine::v1::{sine_wave_client::SineWaveClient, SubscribeRequest, Sample as GrpcSample};
// 定义 ScopeApp 结构体
pub struct ScopeApp {
pub rx: Receiver<Sample>,
pub buffer: VecDeque<[f64; 2]>,
pub max_points: usize,
pub time_window: f64,
pub last_prune: std::time::Instant,
}
// 与 gRPC Sample 匹配的 Sample 结构体
#[derive(Debug, Clone)]
pub struct Sample {
pub index: u64,
pub value: f64,
pub timestamp_micros: i64,
}
impl eframe::App for ScopeApp {
fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) {
// 拉取所有可用的新采样
while let Ok(sample) = self.rx.try_recv() {
// 使用时间戳(秒)作为 t,值作为 y
let t = sample.timestamp_micros as f64 * 1e-6;
let y = sample.value;
self.buffer.push_back([t, y]);
if self.buffer.len() > self.max_points {
self.buffer.pop_front();
}
}
// 定期修剪,仅保留最近的时间窗口
if self.last_prune.elapsed() > Duration::from_millis(200) {
if let Some((&[t_latest, _], _)) = self.buffer.back().map(|x| (x, ())) {
let cutoff = t_latest - self.time_window;
while let Some(&[t, _]) = self.buffer.front() {
if t < cutoff {
self.buffer.pop_front();
} else {
break;
}
}
}
self.last_prune = std::time::Instant::now();
}
// 构建绘图点
let points: PlotPoints = self.buffer.iter().map(|&[t, y]| [t, y]).collect();
let line = Line::new("sine", points);
egui::TopBottomPanel::top("controls").show(ctx, |ui| {
ui.heading("Sine Scope");
ui.horizontal(|ui| {
ui.label("Time window (s):");
ui.add(egui::Slider::new(&mut self.time_window, 1.0..=60.0));
ui.label("Points cap:");
ui.add(egui::Slider::new(&mut self.max_points, 5_000..=200_000));
});
});
egui::CentralPanel::default().show(ctx, |ui| {
Plot::new("scope_plot")
.legend(Legend::default())
.allow_scroll(false)
.allow_zoom(true)
.allow_boxed_zoom(false)
.show(ui, |plot_ui| {
plot_ui.line(line);
});
});
// 持续重绘以获得实时感
ctx.request_repaint_after(Duration::from_millis(16));
}
}
// 添加 main 函数以启动应用并在后台启动 gRPC 客户端
fn main() {
let (tx, rx) = std::sync::mpsc::channel();
// 生成后台线程运行异步 gRPC 客户端
thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let mut client = match SineWaveClient::connect("http://127.0.0.1:50051").await {
Ok(c) => c,
Err(e) => {
eprintln!("Failed to connect to gRPC server: {e}");
return;
}
};
let mut stream = match client.subscribe(Request::new(SubscribeRequest{})).await {
Ok(resp) => resp.into_inner(),
Err(e) => {
eprintln!("Failed to subscribe: {e}");
return;
}
};
while let Ok(Some(sample)) = stream.message().await {
let sample = Sample {
index: sample.index as u64,
value: sample.value,
timestamp_micros: sample.timestamp_micros,
};
if tx.send(sample).is_err() {
break;
}
}
});
});
let app = ScopeApp {
rx,
buffer: VecDeque::new(),
max_points: 10_000,
time_window: 10.0,
last_prune: std::time::Instant::now(),
};
let native_options = eframe::NativeOptions::default();
eframe::run_native(
"Sine Scope",
native_options,
Box::new(|_cc| Ok(Box::new(app))),
).unwrap();
}Cargo.toml
Cargo.toml
[package]
name = "sine_scope_client"
version = "0.1.0"
edition = "2021"
[dependencies]
# gRPC
tonic = { version = "0.12", features = ["tls"] }
prost = "0.13"
prost-types = "0.13"
# 异步运行时
tokio = { version = "1.39", features = ["rt-multi-thread", "macros", "sync"] }
# GUI
# egui_plot 在近期版本中作为 egui 的模块发布,但我们使用独立 crate 以便明确版本控制
# 用于异步任务与 UI 之间的通道
futures = "0.3"
eframe = "0.32.2"
egui = "0.32.2"
egui_plot = "0.33.0"
[build-dependencies]
tonic-build = "0.12"如何运行
run_live_plot.sh
cargo run --example plotIf this post helped you, please consider buying me a coffee or donating via PayPal to support research & publishing of new posts on TechOverflow