Resonant Time Daemon main loop (Rust)

Purpose#

A minimal RTD that:

  • Reads dimensions.yml to determine cadence, harmonics, and phase offsets.
  • Emits WINDOW_TICK messages per dimension on schedule.
  • Supports “standalone mode” that also mirrors ticks directly to per‑dimension sockets for early testing, even without the orchestrator running.

Features#

  • Single clock: CLOCK_MONOTONIC_RAW as the timing source.
  • Harmonics: Base frequency f; harmonics define window sizes and phase offsets.
  • Emission: Sends JSON messages over Unix datagram sockets.
  • Config reload: SIGHUP to reload dimensions.yml..

Cargo skeleton#

# Cargo.toml
[package]
name = "rtd"
version = "0.1.0"
edition = "2021"
 
[dependencies]
serde = { version = "1", features = ["derive"] }
serde_yaml = "0.9"
serde_json = "1.0"
tokio = { version = "1", features = ["full"] }
nix = "0.28"

Data structures#

// src/main.rs (excerpt)
use serde::Deserialize;
 
#[derive(Debug, Deserialize, Clone)]
struct Dimension {
    id: String,                // "D1".."D9"
    phase_offset_deg: f64,     // 0, 40, 80, ...
    harmonic: u32,             // 1, 2, 3
    qos_weight: f64,           // not used by RTD, passed through
    numa_node: i32,
    cpu_set: String,
    fs_type: String,
    size_gb: u64,
}
 
#[derive(Debug, Deserialize)]
struct Config { dimensions: Vec<Dimension> }

Main loop#

// src/main.rs
use serde_json::json;
use std::{fs, time::{Duration, Instant}};
use tokio::time::sleep;
use tokio::signal::unix::{signal, SignalKind};
use std::os::unix::net::UnixDatagram;
 
const RT_SOCK: &str = "/run/vcg/rt.sock";
const STANDALONE: bool = true; // mirror ticks to /run/vcg/dX.sock for early testing
 
fn deg_to_frac(deg: f64) -> f64 { deg / 360.0 }
 
fn sock_connect(path: &str) -> std::io::Result<UnixDatagram> {
    let sock = UnixDatagram::unbound()?;
    sock.connect(path)?;
    Ok(sock)
}
 
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    fs::create_dir_all("/run/vcg")?;
    let mut cfg = load_cfg()?;
    let mut hup = signal(SignalKind::hangup())?;
 
    // base frequency (Hz) for Gen1 testing; adjust as needed
    let mut base_hz: f64 = 10.0;
 
    let start = Instant::now();
    let mut seq: u64 = 0;
 
    loop {
        tokio::select! {
            _ = hup.recv() => {
                cfg = load_cfg()?;
                eprintln!("[RTD] reloaded dimensions.yml");
            }
            _ = tick_once(&cfg, start, seq, base_hz) => {
                seq += 1;
            }
        }
    }
}
 
fn load_cfg() -> anyhow::Result<Config> {
    let txt = fs::read_to_string("/src/gen1/vcg/config/dimensions.yml")?;
    let cfg: Config = serde_yaml::from_str(&txt)?;
    Ok(cfg)
}
 
async fn tick_once(cfg: &Config, start: Instant, seq: u64, base_hz: f64) {
    let base_period = Duration::from_secs_f64(1.0 / base_hz);
    let now = Instant::now();
    let next_boundary = start + base_period * (seq as u32 + 1);
    let sleep_dur = next_boundary.saturating_duration_since(now);
    sleep(sleep_dur).await;
 
    for d in &cfg.dimensions {
        let period = base_period / (d.harmonic as u32);
        let phase = period.mul_f64(deg_to_frac(d.phase_offset_deg));
        let t_start = Instant::now() + phase;
        let t_end = t_start + period;
 
        let msg = json!({
            "type": "WINDOW_TICK",
            "dimension_id": d.id,
            "seq": seq,
            "t_start_ns": to_ns(t_start, start),
            "t_end_ns": to_ns(t_end, start),
            "phase_deg": d.phase_offset_deg,
            "cadence_hz": (1.0/period.as_secs_f64())
        }).to_string();
 
        // Publish to RT socket (orchestrator subscribes)
        if let Ok(sock) = sock_connect(RT_SOCK) {
            let _ = sock.send(msg.as_bytes());
        }
 
        // Standalone: mirror directly to per-dimension socket
        if STANDALONE {
            let dim_sock = format!("/run/vcg/{}.sock", d.id.to_lowercase());
            if let Ok(sock) = sock_connect(&dim_sock) {
                let _ = sock.send(msg.as_bytes());
            }
        }
    }
}
 
fn to_ns(t: Instant, epoch: Instant) -> u128 {
    let d = t.duration_since(epoch);
    (d.as_secs() as u128) * 1_000_000_000u128 + (d.subsec_nanos() as u128)
}

Run#

  • Build: cargo build --release
  • Ensure /run/vcg exists: sudo mkdir -p /run/vcg
  • Start RTD: sudo ./target/release/rtd
  • Use your example workloads; they’ll receive WINDOW_TICK on /run/vcg/dX.sock in standalone mode.