Rust gRPC server stream minimal example

This simple Rust example implements a gRPC server that provides a server-side streaming RPC. The server streams a sine wave signal to the client.

A client is provided for testing purposes. It connects to the server and prints the received sine wave values continously

src/main.rs

use std::time::{Duration, SystemTime, UNIX_EPOCH};

use async_stream::try_stream;
use tonic::{Request, Response, Status};
use tokio::time::interval;

pub mod sine {
    pub mod v1 {
        tonic::include_proto!("sine.v1");
    }
}

use sine::v1::{sine_wave_server::{SineWave, SineWaveServer}, Sample, SubscribeRequest};

#[derive(Default)]
struct SineSvc;

use std::pin::Pin;
use futures_core::Stream;

#[tonic::async_trait]
impl SineWave for SineSvc {
    type SubscribeStream = Pin<Box<dyn Stream<Item = Result<Sample, Status>> + Send + 'static>>;

    async fn subscribe(
        &self,
        _request: Request<SubscribeRequest>,
    ) -> Result<Response<Self::SubscribeStream>, Status> {
        // Signal params
        const F_HZ: f64 = 5.0;         // 5 Hz
        const FS_HZ: f64 = 1000.0;     // 1 kHz -> 1 ms per sample
        const DT: Duration = Duration::from_millis(1);

        let mut n: i64 = 0;
        let mut ticker = interval(DT);

        let out = try_stream! {
            loop {
                ticker.tick().await;

                let t = n as f64 / FS_HZ; // seconds
                let val = (2.0_f64 * std::f64::consts::PI * F_HZ * t).sin();

                let now = SystemTime::now().duration_since(UNIX_EPOCH)
                    .map_err(|_| Status::internal("clock went backwards"))?;
                let sample = Sample {
                    value: val,
                    index: n,
                    timestamp_micros: now.as_micros() as i64,
                };

                n = n.saturating_add(1);
                yield sample;
            }
        };

        Ok(Response::new(Box::pin(out) as Self::SubscribeStream))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "0.0.0.0:50051".parse()?;
    let svc = SineSvc::default();

    println!("SineWave gRPC server streaming on {}", addr);
    tonic::transport::Server::builder()
        .add_service(SineWaveServer::new(svc))
        .serve(addr)
        .await?;

    Ok(())
}

examples/client.rs

use tonic::Request;

pub mod sine { pub mod v1 { tonic::include_proto!("sine.v1"); } }
use sine::v1::{sine_wave_client::SineWaveClient, SubscribeRequest};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = SineWaveClient::connect("http://127.0.0.1:50051").await?;
    let mut stream = client.subscribe(Request::new(SubscribeRequest{})).await?.into_inner();

    let mut count = 0usize;
    while let Some(sample) = stream.message().await? {
        println!("n={} value={:.6} t_us={}", sample.index, sample.value, sample.timestamp_micros);
        count += 1;
    }
    Ok(())
}

proto/sine.proto

syntax = "proto3";

package sine.v1;

message SubscribeRequest {
  // You could put options here later (e.g., frequency, fs). Left empty for now.
}

message Sample {
  double value = 1;          // The sine value at this sample
  int64  index = 2;          // Sample index n (0,1,2,...)
  int64  timestamp_micros = 3; // Server timestamp when produced
}

service SineWave {
  // Server-streaming: client calls once, server streams samples forever.
  rpc Subscribe(SubscribeRequest) returns (stream Sample);
}

Cargo.toml

[package]
name = "sine-grpc"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { version = "1.39", features = ["rt-multi-thread", "macros", "time"] }
tonic = { version = "0.12", features = ["transport"] }
prost = "0.13"
async-stream = "0.3"
futures-core = "0.3.31"

[build-dependencies]
tonic-build = "0.12"
prost-build = "0.13"

How to use

Server

To run the server:

cargo run

Example output

SineWave gRPC server streaming on 0.0.0.0:50051

Client

To run the example client:

cargo run --example client

Example output

n=0 value=0.000000 t_us=1757287065032360
n=1 value=0.031411 t_us=1757287065033631
n=2 value=0.062791 t_us=1757287065033730
n=3 value=0.094108 t_us=1757287065035088
n=4 value=0.125333 t_us=1757287065036387
n=5 value=0.156434 t_us=1757287065037730
n=6 value=0.187381 t_us=1757287065037736
n=7 value=0.218143 t_us=1757287065039026
n=8 value=0.248690 t_us=1757287065040332
n=9 value=0.278991 t_us=1757287065041552
n=10 value=0.309017 t_us=1757287065041749
n=11 value=0.338738 t_us=1757287065042964
n=12 value=0.368125 t_us=1757287065044132
n=13 value=0.397148 t_us=1757287065045363
n=14 value=0.425779 t_us=1757287065045722
n=15 value=0.453990 t_us=1757287065046977
n=16 value=0.481754 t_us=1757287065048292
n=17 value=0.509041 t_us=1757287065049629
n=18 value=0.535827 t_us=1757287065049802
n=19 value=0.562083 t_us=1757287065051120
n=20 value=0.587785 t_us=1757287065052458
n=21 value=0.612907 t_us=1757287065053733
n=22 value=0.637424 t_us=1757287065053738
n=23 value=0.661312 t_us=1757287065054902
n=24 value=0.684547 t_us=1757287065056231
n=25 value=0.707107 t_us=1757287065057441
n=26 value=0.728969 t_us=1757287065058684
n=27 value=0.750111 t_us=1757287065058833
n=28 value=0.770513 t_us=1757287065059946
n=29 value=0.790155 t_us=1757287065061166
n=30 value=0.809017 t_us=1757287065062399
n=31 value=0.827081 t_us=1757287065063649
n=32 value=0.844328 t_us=1757287065063854
n=33 value=0.860742 t_us=1757287065065065
n=34 value=0.876307 t_us=1757287065066258
n=35 value=0.891007 t_us=1757287065067465
n=36 value=0.904827 t_us=1757287065068682
n=37 value=0.917755 t_us=1757287065068776
n=38 value=0.929776 t_us=1757287065069970
n=39 value=0.940881 t_us=1757287065071149
n=40 value=0.951057 t_us=1757287065072325
n=41 value=0.960294 t_us=1757287065073503
n=42 value=0.968583 t_us=1757287065074746
n=43 value=0.975917 t_us=1757287065074752
n=44 value=0.982287 t_us=1757287065075958
n=45 value=0.987688 t_us=1757287065077126
n=46 value=0.992115 t_us=1757287065078362
n=47 value=0.995562 t_us=1757287065079626
n=48 value=0.998027 t_us=1757287065079777
n=49 value=0.999507 t_us=1757287065081107
n=50 value=1.000000 t_us=1757287065082357
[...]