Rust tracé en temps réel utilisant une source de flux gRPC : exemple minimal

Dans notre article précédent Rust serveur gRPC en flux : exemple minimal nous avons illustré comment implémenter un serveur gRPC en Rust qui diffuse un signal sinusoïdal vers un client.

Cet article s’appuie sur l’exemple de client en ligne de commande de Rust serveur gRPC en flux : exemple minimal en implémentant un tracé en temps réel se mettant à jour en direct des échantillons du signal sinusoïdal entrant à l’aide d’egui_plot.

Rust realtime sine plot

Voir Rust serveur gRPC en flux : exemple minimal pour tous les fichiers non montrés ici.

examples/plot.rs

live_plot.rs
use std::{collections::VecDeque, time::Duration};
use eframe::{self, egui};
use egui_plot::{Plot, Line, Legend, PlotPoints};
use std::sync::mpsc::Receiver;
use std::thread;

// Imports du client gRPC
use tonic::Request;

pub mod sine { pub mod v1 { tonic::include_proto!("sine.v1"); } }
use sine::v1::{sine_wave_client::SineWaveClient, SubscribeRequest, Sample as GrpcSample};

// Définir la structure ScopeApp
pub struct ScopeApp {
    pub rx: Receiver<Sample>,
    pub buffer: VecDeque<[f64; 2]>,
    pub max_points: usize,
    pub time_window: f64,
    pub last_prune: std::time::Instant,
}

// Structure Sample correspondant au Sample gRPC
#[derive(Debug, Clone)]
pub struct Sample {
    pub index: u64,
    pub value: f64,
    pub timestamp_micros: i64,
}
impl eframe::App for ScopeApp {
    fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) {
        // Récupérer tous les nouveaux échantillons disponibles
        while let Ok(sample) = self.rx.try_recv() {
            // Utiliser l'horodatage (en secondes) comme t, la valeur comme y
            let t = sample.timestamp_micros as f64 * 1e-6;
            let y = sample.value;
            self.buffer.push_back([t, y]);
            if self.buffer.len() > self.max_points {
                self.buffer.pop_front();
            }
        }

        // Élaguer périodiquement pour ne conserver que la fenêtre de temps la plus récente
        if self.last_prune.elapsed() > Duration::from_millis(200) {
            if let Some((&[t_latest, _], _)) = self.buffer.back().map(|x| (x, ())) {
                let cutoff = t_latest - self.time_window;
                while let Some(&[t, _]) = self.buffer.front() {
                    if t < cutoff {
                        self.buffer.pop_front();
                    } else {
                        break;
                    }
                }
            }
            self.last_prune = std::time::Instant::now();
        }

    // Construire les points du tracé
    let points: PlotPoints = self.buffer.iter().map(|&[t, y]| [t, y]).collect();
    let line = Line::new("sine", points);

    egui::TopBottomPanel::top("controls").show(ctx, |ui| {
            ui.heading("Sine Scope");
            ui.horizontal(|ui| {
                ui.label("Time window (s):");
                ui.add(egui::Slider::new(&mut self.time_window, 1.0..=60.0));
                ui.label("Points cap:");
                ui.add(egui::Slider::new(&mut self.max_points, 5_000..=200_000));
            });
        });

        egui::CentralPanel::default().show(ctx, |ui| {
            Plot::new("scope_plot")
                .legend(Legend::default())
                .allow_scroll(false)
                .allow_zoom(true)
                .allow_boxed_zoom(false)
                .show(ui, |plot_ui| {
                    plot_ui.line(line);
                });
        });

        // Repeindre continuellement pour que cela paraisse en temps réel
    ctx.request_repaint_after(Duration::from_millis(16));
    }
}

// Ajouter une fonction main pour lancer l'application et démarrer le client gRPC en arrière-plan
fn main() {
    let (tx, rx) = std::sync::mpsc::channel();

    // Lancer un thread d'arrière-plan pour exécuter le client gRPC asynchrone
    thread::spawn(move || {
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async move {
            let mut client = match SineWaveClient::connect("http://127.0.0.1:50051").await {
                Ok(c) => c,
                Err(e) => {
                    eprintln!("Failed to connect to gRPC server: {e}");
                    return;
                }
            };
            let mut stream = match client.subscribe(Request::new(SubscribeRequest{})).await {
                Ok(resp) => resp.into_inner(),
                Err(e) => {
                    eprintln!("Failed to subscribe: {e}");
                    return;
                }
            };
            while let Ok(Some(sample)) = stream.message().await {
                let sample = Sample {
                    index: sample.index as u64,
                    value: sample.value,
                    timestamp_micros: sample.timestamp_micros,
                };
                if tx.send(sample).is_err() {
                    break;
                }
            }
        });
    });

    let app = ScopeApp {
        rx,
        buffer: VecDeque::new(),
        max_points: 10_000,
        time_window: 10.0,
        last_prune: std::time::Instant::now(),
    };
    let native_options = eframe::NativeOptions::default();
    eframe::run_native(
        "Sine Scope",
        native_options,
        Box::new(|_cc| Ok(Box::new(app))),
    ).unwrap();
}

Cargo.toml

Cargo.toml
[package]
name = "sine_scope_client"
version = "0.1.0"
edition = "2021"

[dependencies]
# gRPC
tonic = { version = "0.12", features = ["tls"] }
prost = "0.13"
prost-types = "0.13"

# Runtime asynchrone
tokio = { version = "1.39", features = ["rt-multi-thread", "macros", "sync"] }

# GUI

# egui_plot est livré comme un module d'egui dans les versions récentes, mais nous utilisons le crate pour un versionnage explicite
# Pour les canaux entre la tâche asynchrone et l'UI
futures = "0.3"
eframe = "0.32.2"
egui = "0.32.2"
egui_plot = "0.33.0"

[build-dependencies]
tonic-build = "0.12"

Comment exécuter

run_live_plot.sh
cargo run --example plot

Check out similar posts by category: Rust, GRPC