Rust gRPC server stream minimal example
This simple Rust example implements a gRPC server that provides a server-side streaming RPC. The server streams a sine wave signal to the client.
A client is provided for testing purposes. It connects to the server and prints the received sine wave values continously
src/main.rs
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use async_stream::try_stream;
use tonic::{Request, Response, Status};
use tokio::time::interval;
pub mod sine {
pub mod v1 {
tonic::include_proto!("sine.v1");
}
}
use sine::v1::{sine_wave_server::{SineWave, SineWaveServer}, Sample, SubscribeRequest};
#[derive(Default)]
struct SineSvc;
use std::pin::Pin;
use futures_core::Stream;
#[tonic::async_trait]
impl SineWave for SineSvc {
type SubscribeStream = Pin<Box<dyn Stream<Item = Result<Sample, Status>> + Send + 'static>>;
async fn subscribe(
&self,
_request: Request<SubscribeRequest>,
) -> Result<Response<Self::SubscribeStream>, Status> {
// Signal params
const F_HZ: f64 = 5.0; // 5 Hz
const FS_HZ: f64 = 1000.0; // 1 kHz -> 1 ms per sample
const DT: Duration = Duration::from_millis(1);
let mut n: i64 = 0;
let mut ticker = interval(DT);
let out = try_stream! {
loop {
ticker.tick().await;
let t = n as f64 / FS_HZ; // seconds
let val = (2.0_f64 * std::f64::consts::PI * F_HZ * t).sin();
let now = SystemTime::now().duration_since(UNIX_EPOCH)
.map_err(|_| Status::internal("clock went backwards"))?;
let sample = Sample {
value: val,
index: n,
timestamp_micros: now.as_micros() as i64,
};
n = n.saturating_add(1);
yield sample;
}
};
Ok(Response::new(Box::pin(out) as Self::SubscribeStream))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "0.0.0.0:50051".parse()?;
let svc = SineSvc::default();
println!("SineWave gRPC server streaming on {}", addr);
tonic::transport::Server::builder()
.add_service(SineWaveServer::new(svc))
.serve(addr)
.await?;
Ok(())
}
examples/client.rs
use tonic::Request;
pub mod sine { pub mod v1 { tonic::include_proto!("sine.v1"); } }
use sine::v1::{sine_wave_client::SineWaveClient, SubscribeRequest};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = SineWaveClient::connect("http://127.0.0.1:50051").await?;
let mut stream = client.subscribe(Request::new(SubscribeRequest{})).await?.into_inner();
let mut count = 0usize;
while let Some(sample) = stream.message().await? {
println!("n={} value={:.6} t_us={}", sample.index, sample.value, sample.timestamp_micros);
count += 1;
}
Ok(())
}
proto/sine.proto
syntax = "proto3";
package sine.v1;
message SubscribeRequest {
// You could put options here later (e.g., frequency, fs). Left empty for now.
}
message Sample {
double value = 1; // The sine value at this sample
int64 index = 2; // Sample index n (0,1,2,...)
int64 timestamp_micros = 3; // Server timestamp when produced
}
service SineWave {
// Server-streaming: client calls once, server streams samples forever.
rpc Subscribe(SubscribeRequest) returns (stream Sample);
}
Cargo.toml
[package]
name = "sine-grpc"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1.39", features = ["rt-multi-thread", "macros", "time"] }
tonic = { version = "0.12", features = ["transport"] }
prost = "0.13"
async-stream = "0.3"
futures-core = "0.3.31"
[build-dependencies]
tonic-build = "0.12"
prost-build = "0.13"
How to use
Server
To run the server:
cargo run
Example output
SineWave gRPC server streaming on 0.0.0.0:50051
Client
To run the example client:
cargo run --example client
Example output
n=0 value=0.000000 t_us=1757287065032360
n=1 value=0.031411 t_us=1757287065033631
n=2 value=0.062791 t_us=1757287065033730
n=3 value=0.094108 t_us=1757287065035088
n=4 value=0.125333 t_us=1757287065036387
n=5 value=0.156434 t_us=1757287065037730
n=6 value=0.187381 t_us=1757287065037736
n=7 value=0.218143 t_us=1757287065039026
n=8 value=0.248690 t_us=1757287065040332
n=9 value=0.278991 t_us=1757287065041552
n=10 value=0.309017 t_us=1757287065041749
n=11 value=0.338738 t_us=1757287065042964
n=12 value=0.368125 t_us=1757287065044132
n=13 value=0.397148 t_us=1757287065045363
n=14 value=0.425779 t_us=1757287065045722
n=15 value=0.453990 t_us=1757287065046977
n=16 value=0.481754 t_us=1757287065048292
n=17 value=0.509041 t_us=1757287065049629
n=18 value=0.535827 t_us=1757287065049802
n=19 value=0.562083 t_us=1757287065051120
n=20 value=0.587785 t_us=1757287065052458
n=21 value=0.612907 t_us=1757287065053733
n=22 value=0.637424 t_us=1757287065053738
n=23 value=0.661312 t_us=1757287065054902
n=24 value=0.684547 t_us=1757287065056231
n=25 value=0.707107 t_us=1757287065057441
n=26 value=0.728969 t_us=1757287065058684
n=27 value=0.750111 t_us=1757287065058833
n=28 value=0.770513 t_us=1757287065059946
n=29 value=0.790155 t_us=1757287065061166
n=30 value=0.809017 t_us=1757287065062399
n=31 value=0.827081 t_us=1757287065063649
n=32 value=0.844328 t_us=1757287065063854
n=33 value=0.860742 t_us=1757287065065065
n=34 value=0.876307 t_us=1757287065066258
n=35 value=0.891007 t_us=1757287065067465
n=36 value=0.904827 t_us=1757287065068682
n=37 value=0.917755 t_us=1757287065068776
n=38 value=0.929776 t_us=1757287065069970
n=39 value=0.940881 t_us=1757287065071149
n=40 value=0.951057 t_us=1757287065072325
n=41 value=0.960294 t_us=1757287065073503
n=42 value=0.968583 t_us=1757287065074746
n=43 value=0.975917 t_us=1757287065074752
n=44 value=0.982287 t_us=1757287065075958
n=45 value=0.987688 t_us=1757287065077126
n=46 value=0.992115 t_us=1757287065078362
n=47 value=0.995562 t_us=1757287065079626
n=48 value=0.998027 t_us=1757287065079777
n=49 value=0.999507 t_us=1757287065081107
n=50 value=1.000000 t_us=1757287065082357
[...]
If this post helped you, please consider buying me a coffee or donating via PayPal to support research & publishing of new posts on TechOverflow