Skip to main content

aiegis_harness/
main.rs

1// SPDX-License-Identifier: Apache-2.0 OR MIT
2//! AiEGIS Harness reference daemon — CLI + HTTP server.
3//!
4//! Ports the HTTP surface of `harness.py` (`_HarnessHandler` + `main`) onto
5//! axum + tokio. Endpoints + response shapes are byte-compatible with the
6//! Python reference; an agent integrating against either binary can swap
7//! `--harness-url` without code change.
8
9mod pack_fetcher;
10mod upstream;
11
12use axum::{
13    extract::State,
14    http::{HeaderMap, StatusCode},
15    response::{IntoResponse, Response},
16    routing::{get, post},
17    Json, Router,
18};
19use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
20use clap::Parser;
21use harness_core::{
22    audit::{AuditEntry, AuditSink, SqliteAuditLog},
23    eval::evaluate_packs,
24    load_pack, PolicyPack, DAEMON_VERSION, DEFAULT_PACK_ISSUER_PUBKEY_HEX, SPEC_VERSION,
25};
26use pack_fetcher::RemotePack;
27use serde_json::{json, Value};
28use std::collections::{HashMap, VecDeque};
29use std::path::PathBuf;
30use std::sync::{Arc, Mutex};
31use std::time::{Instant, SystemTime, UNIX_EPOCH};
32use upstream::UpstreamClient;
33
34/// CLI arguments. Mirror of the Python ref's `argparse` setup.
35#[derive(Parser, Debug)]
36#[command(
37    name = "aiegis-harness",
38    about = "AiEGIS Harness reference daemon (L2 / proxy mode).",
39    version
40)]
41struct Args {
42    /// Local HTTP port to listen on (default 8080).
43    #[arg(long, default_value_t = 8080)]
44    port: u16,
45
46    /// Path to a policy-pack JSON. Pass multiple times for multiple packs.
47    #[arg(long = "policy-pack", value_name = "PATH")]
48    policy_pack: Vec<PathBuf>,
49
50    /// SQLite file for the append-only audit log.
51    /// Day 1: accepted for CLI compatibility; the in-memory sink is used until
52    /// the rusqlite backend lands in Day 2.
53    #[arg(long, default_value = "harness_audit.db")]
54    audit_db: PathBuf,
55
56    /// (Reserved for v0.2) per-agent actions/minute cap.
57    #[arg(long, default_value_t = 60)]
58    rate_cap: u32,
59
60    /// Verbose logging (DEBUG level).
61    #[arg(long, short)]
62    verbose: bool,
63
64    /// Remote policy-pack source (directory URL containing index.json).
65    /// When set, the daemon fetches Nel-style signed packs from
66    /// `<URL>/index.json` on startup, verifies the Ed25519 signature over
67    /// sha256(tarball) against `--issuer-pubkey-hex`, and lists them in
68    /// /health. Loaded packs are INVENTORY-ONLY (visible + sig-verified
69    /// + cached) — they are NOT executed by the daemon's evaluator
70    /// because Nel publishes full OPA Rego v1, which the reference
71    /// subset evaluator does not interpret. See pack_fetcher.rs for the
72    /// needs-iteration note. Local --policy-pack JSON files still drive
73    /// the evaluator.
74    #[arg(long, value_name = "URL")]
75    pack_source: Option<String>,
76
77    /// Issuer Ed25519 public key (hex-encoded 32-byte raw) used to verify
78    /// remotely-fetched packs. Defaults to the live aiegis.ie harness
79    /// issuer key empirically captured on 2026-05-25.
80    #[arg(long, default_value = DEFAULT_PACK_ISSUER_PUBKEY_HEX)]
81    issuer_pubkey_hex: String,
82
83    /// Upstream `/v1/harness/receipt` URL (Velo-style). When set + the
84    /// local decision is ALLOW/WARN, the daemon POSTs the same payload
85    /// upstream, captures the returned `rid`, and surfaces it in both
86    /// the response body and the audit ledger. If upstream is
87    /// unreachable / 5xx, the daemon honestly falls back to its local
88    /// decision and flags `upstream_error: true` in the response.
89    #[arg(long, value_name = "URL")]
90    upstream_protect: Option<String>,
91}
92
/// Per-agent sliding-window action timestamps for the rate-limit pack.
///
/// Keys are the caller's `agent_did`, which `protect` lifts from an
/// unverified JWT `sub` claim (or `"anonymous"`), so the key space is
/// caller-controlled. To keep the map from growing without bound, buckets
/// whose newest timestamp has left the window are swept out whenever the map
/// reaches a high-water mark: at least `MIN_SWEEP_AT` entries, then twice
/// the post-sweep size. Doubling keeps the sweep cost amortised O(1) per
/// call.
#[derive(Default)]
struct RateState {
    buckets: Mutex<RateBuckets>,
}

/// Lock-protected contents of [`RateState`].
#[derive(Default)]
struct RateBuckets {
    /// Action timestamps (seconds since the Unix epoch) per agent, oldest
    /// first.
    by_agent: HashMap<String, VecDeque<f64>>,
    /// Map size at which the next stale-bucket sweep runs. Starts at 0,
    /// meaning `RateState::MIN_SWEEP_AT` applies.
    sweep_at: usize,
}

impl RateState {
    /// Length of the sliding window, in seconds.
    const WINDOW_SECS: f64 = 60.0;
    /// Smallest map size at which a stale-bucket sweep is attempted.
    const MIN_SWEEP_AT: usize = 1024;

    /// Record one action for `agent_did` at the current wall-clock time.
    ///
    /// Returns how many actions that agent has taken in the last 60
    /// seconds, including this one.
    fn observe(&self, agent_did: &str) -> usize {
        // A clock reading before the Unix epoch collapses to 0.0.
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs_f64())
            .unwrap_or(0.0);
        self.observe_at(agent_did, now)
    }

    /// Same as [`observe`](Self::observe), but with an explicit timestamp
    /// (seconds since the Unix epoch), so the window logic is testable.
    fn observe_at(&self, agent_did: &str, now: f64) -> usize {
        let cutoff = now - Self::WINDOW_SECS;
        // Every mutation under this lock is a single collection operation,
        // so a poisoned lock still guards a usable map. Recover it instead
        // of turning every later request into a panic.
        let mut guard = self
            .buckets
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let state = &mut *guard;

        if state.by_agent.len() >= state.sweep_at.max(Self::MIN_SWEEP_AT) {
            // A bucket whose newest entry is older than the cutoff would be
            // emptied on its next observe anyway, so dropping it changes no
            // count. It only reclaims memory.
            state
                .by_agent
                .retain(|_, stamps| matches!(stamps.back(), Some(&t) if t >= cutoff));
            state.sweep_at = (state.by_agent.len() * 2).max(Self::MIN_SWEEP_AT);
        }

        let bucket = state.by_agent.entry(agent_did.to_string()).or_default();
        // Evict timestamps strictly older than the window.
        while let Some(&oldest) = bucket.front() {
            if oldest < cutoff {
                bucket.pop_front();
            } else {
                break;
            }
        }
        bucket.push_back(now);
        bucket.len()
    }
}
118
119#[derive(Clone)]
120struct AppState {
121    packs: Arc<Vec<PolicyPack>>,
122    remote_packs: Arc<Vec<RemotePack>>,
123    pack_source_fingerprint: Arc<String>,
124    audit: Arc<dyn AuditSink>,
125    rate: Arc<RateState>,
126    upstream: Option<UpstreamClient>,
127    receipt_counter: Arc<std::sync::atomic::AtomicU64>,
128}
129
/// Daemon entry point.
///
/// Startup sequence:
/// 1. Parse the CLI args and install the tracing subscriber.
/// 2. Fetch the optional remote pack inventory.
/// 3. Load the local `--policy-pack` files, refusing duplicate `pack_id`s.
/// 4. Open the SQLite audit ledger.
/// 5. Build the optional upstream client.
/// 6. Serve HTTP on `127.0.0.1:<port>` until Ctrl-C.
///
/// Exit codes:
/// - 2 for any startup failure: remote pack fetch, local pack load,
///   duplicate pack_id, ledger open, or socket bind.
/// - 1 if the server itself returns an error.
/// - 0 after a clean shutdown.
#[tokio::main]
async fn main() -> std::process::ExitCode {
    let args = Args::parse();

    // `--verbose` only chooses the fallback level; a valid RUST_LOG filter in
    // the environment takes precedence.
    let level = if args.verbose { "debug" } else { "info" };
    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(level)),
        )
        .with_target(false)
        .init();

    // With no packs of either kind the evaluator has nothing to deny on;
    // warn but keep running.
    if args.policy_pack.is_empty() && args.pack_source.is_none() {
        tracing::warn!("No --policy-pack or --pack-source supplied; daemon will ALLOW everything.");
    }

    // Remote pack fetch (inventory-only — see pack_fetcher module docs).
    // A configured-but-failing source aborts startup rather than silently
    // running with an empty inventory.
    let remote_packs: Vec<RemotePack> = if let Some(base) = args.pack_source.as_deref() {
        match pack_fetcher::fetch_all(base, &args.issuer_pubkey_hex).await {
            Ok(v) => {
                tracing::info!(
                    "pack_source={} verified_packs={} (inventory-only; reference daemon does not execute OPA Rego v1)",
                    base,
                    v.len()
                );
                v
            }
            Err(e) => {
                tracing::error!("pack_source_failed url={} err={}", base, e);
                return std::process::ExitCode::from(2);
            }
        }
    } else {
        Vec::new()
    };
    // Computed even when no source is configured (over an empty list); it is
    // reported verbatim by /health.
    let pack_source_fingerprint = pack_fetcher::fingerprint(&remote_packs);

    // PACK-ID DEDUP (Audit-B SEV-LOW closure 2026-05-25): refuse to load two
    // packs with the same pack_id. Spec's rollback-protection design depends
    // on uniqueness; without dedup, second-loaded-of-same-id silently joins
    // the eval set and pack_ids returned by /health are ambiguous.
    let mut packs: Vec<PolicyPack> = Vec::new();
    let mut seen_pack_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
    for path in &args.policy_pack {
        match load_pack(path) {
            Ok(pack) => {
                // `insert` returns false when the id was already present.
                if !seen_pack_ids.insert(pack.pack_id.clone()) {
                    tracing::error!(
                        "pack_id_collision: refusing to load duplicate pack_id={} from {} (already loaded earlier)",
                        pack.pack_id,
                        path.display()
                    );
                    return std::process::ExitCode::from(2);
                }
                // A pack carrying the placeholder signature is still loaded;
                // this only emits a warning.
                if pack
                    .signature
                    .value
                    .as_deref()
                    .map(|v| v == "REFERENCE_PACK_NOT_SIGNED_LOCAL_DEV_ONLY")
                    .unwrap_or(false)
                {
                    tracing::warn!(
                        "pack={} ships UNSIGNED — local dev only; prod must sign + verify against issuer JWKS",
                        pack.pack_id
                    );
                }
                tracing::info!(
                    "loaded pack={} rules={}",
                    pack.pack_id,
                    pack.rules.len()
                );
                packs.push(pack);
            }
            Err(e) => {
                tracing::error!("pack_load_failed path={} err={}", path.display(), e);
                return std::process::ExitCode::from(2);
            }
        }
    }

    // SQLite-backed audit ledger with append-only triggers (BEFORE DELETE +
    // BEFORE UPDATE both RAISE(ABORT)). Same DDL as the Python reference.
    let audit: Arc<dyn AuditSink> = match SqliteAuditLog::open(&args.audit_db) {
        Ok(s) => {
            tracing::info!("audit ledger ready at {}", args.audit_db.display());
            Arc::new(s)
        }
        Err(e) => {
            tracing::error!(
                "audit_db_open_failed path={} err={}",
                args.audit_db.display(),
                e
            );
            return std::process::ExitCode::from(2);
        }
    };

    // Upstream receipt minting is opt-in via --upstream-protect.
    let upstream = args.upstream_protect.as_ref().map(|u| {
        tracing::info!("upstream_protect_url={}", u);
        UpstreamClient::new(u.clone())
    });

    let state = AppState {
        packs: Arc::new(packs),
        remote_packs: Arc::new(remote_packs),
        pack_source_fingerprint: Arc::new(pack_source_fingerprint),
        audit,
        rate: Arc::new(RateState::default()),
        upstream,
        receipt_counter: Arc::new(std::sync::atomic::AtomicU64::new(0)),
    };

    // /health is served under two paths; any unmatched route gets the JSON
    // 404 from `not_found`.
    let app = Router::new()
        .route("/health", get(health))
        .route("/api/protect/health", get(health))
        .route("/api/protect", post(protect))
        .fallback(not_found)
        .with_state(state);

    // Loopback only: the daemon is never reachable from other hosts.
    let addr = format!("127.0.0.1:{}", args.port);
    let listener = match tokio::net::TcpListener::bind(&addr).await {
        Ok(l) => l,
        Err(e) => {
            tracing::error!("bind failed on {}: {}", addr, e);
            return std::process::ExitCode::from(2);
        }
    };
    tracing::info!(
        "AiEGIS Harness daemon listening on http://{}  (POST /api/protect)",
        addr
    );

    // Ctrl-C support so the smoke test's kill is clean.
    let shutdown = async {
        let _ = tokio::signal::ctrl_c().await;
        tracing::info!("shutting down");
    };

    if let Err(e) = axum::serve(listener, app).with_graceful_shutdown(shutdown).await {
        tracing::error!("server error: {}", e);
        return std::process::ExitCode::from(1);
    }
    std::process::ExitCode::SUCCESS
}
275
276async fn health(State(state): State<AppState>) -> impl IntoResponse {
277    let receipt_count = state
278        .receipt_counter
279        .load(std::sync::atomic::Ordering::Relaxed);
280    let body = json!({
281        "status": "ready",
282        "version": DAEMON_VERSION,
283        "spec_version": SPEC_VERSION,
284        "packs_loaded": state.packs.len(),
285        "pack_ids": state.packs.iter().map(|p| &p.pack_id).collect::<Vec<_>>(),
286        "remote_packs_loaded": state.remote_packs.len(),
287        "remote_pack_ids": state.remote_packs.iter()
288            .map(|p| format!("{}@{}", p.name, p.version)).collect::<Vec<_>>(),
289        "pack_source_fingerprint": state.pack_source_fingerprint.as_str(),
290        "remote_packs_inventory_only": true,
291        "upstream_protect_configured": state.upstream.is_some(),
292        "receipts_minted": receipt_count,
293    });
294    (StatusCode::OK, json_with_version_header(body))
295}
296
297async fn not_found() -> impl IntoResponse {
298    (
299        StatusCode::NOT_FOUND,
300        json_with_version_header(json!({"error": "not_found"})),
301    )
302}
303
/// `POST /api/protect` — evaluate one agent action against the loaded packs.
///
/// Flow:
/// 1. Parse the body as JSON; an empty body is treated as `{}`.
/// 2. Lift `agent_did` from the `X-AEGIS-Tag` header.
/// 3. Bump that agent's rate window.
/// 4. Run `evaluate_packs`.
/// 5. Optionally mint an upstream receipt.
/// 6. Append an audit row.
/// 7. Render the decision.
///
/// Status codes produced here:
/// - 400 with `reason: "malformed_json"` when the body is not valid JSON.
///   This path returns before evaluation: no rate bump and no audit row.
/// - 503 with `reason: "ledger_unavailable"` when the audit append fails
///   (fail-closed: the decision is forced to DENY).
/// - 200 when the evaluated decision is `"ALLOW"`.
/// - 401 for any other decision, including `"WARN"`.
///   NOTE(review): WARN is treated as permitted for the upstream mint below
///   yet answered with 401 — confirm this matches the Python reference.
async fn protect(
    State(state): State<AppState>,
    headers: HeaderMap,
    body: axum::body::Bytes,
) -> Response {
    // Start of the `decision_ms` measurement (it therefore includes any
    // upstream round trip below).
    let t0 = Instant::now();

    let payload: Value = if body.is_empty() {
        json!({})
    } else {
        match serde_json::from_slice::<Value>(&body) {
            Ok(v) => v,
            Err(_) => {
                let err_body = json!({
                    "decision": "DENY",
                    "reason": "malformed_json",
                    "layer": "L?",
                    "decision_ms": 0,
                });
                return (StatusCode::BAD_REQUEST, json_with_version_header(err_body))
                    .into_response();
            }
        }
    };

    // A missing or non-visible-ASCII header value degrades to "" (and thus
    // to no agent_did).
    let agent_tag = headers
        .get("X-AEGIS-Tag")
        .and_then(|h| h.to_str().ok())
        .unwrap_or("")
        .to_string();

    // Lift agent_did from JWT-sub if a 3-part dot token is present, without
    // verifying the signature (matches the Python reference's behaviour).
    let agent_did = extract_agent_did(&agent_tag);

    // Sliding-window counter for rate-limit pack. Every parsed request is
    // counted, including ones that end up denied; callers without a DID
    // share the "anonymous" bucket.
    let actions = state.rate.observe(agent_did.as_deref().unwrap_or("anonymous"));
    let daemon_state = json!({"agent_actions_last_minute": actions});

    let result = evaluate_packs(&state.packs, &payload, &daemon_state);

    // Upstream receipt mint — only when --upstream-protect is configured
    // AND the local decision is non-DENY (we never invite an upstream to
    // override our DENY; per spec, DENY is terminal at the daemon).
    // An upstream failure only sets `upstream_error`; the local decision stands.
    let (receipt_id, upstream_error) = if let Some(client) = &state.upstream {
        if result.decision != "DENY" {
            match client.post(&payload, &agent_tag).await {
                Ok(rcp) => {
                    state
                        .receipt_counter
                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                    (Some(rcp.rid), false)
                }
                Err(e) => {
                    tracing::warn!("upstream_post_failed err={}", e);
                    (None, true)
                }
            }
        } else {
            (None, false)
        }
    } else {
        (None, false)
    };

    let decision_ms = t0.elapsed().as_millis() as u64;

    // Only string-valued `action` / `target` fields are recorded; anything
    // else is stored as absent.
    let entry = AuditEntry::build_with_receipt(
        agent_did.clone(),
        payload.get("action").and_then(|v| v.as_str()).map(String::from),
        payload.get("target").and_then(|v| v.as_str()).map(String::from),
        result.decision.to_string(),
        result.deciding_pack.clone(),
        result.deciding_rule.clone(),
        result.deciding_layer.clone(),
        result.reason.clone(),
        decision_ms,
        receipt_id.clone(),
        upstream_error,
    );
    // Fail-closed on ledger write failure: a caller seeing ALLOW with no
    // ledger row is un-governable. Switch to DENY 503 if the ledger refuses
    // the row (schema-tamper, file unlinked, disk full, trigger-bypass).
    // Audit Agent D 2026-05-25.
    // NOTE(review): by this point an upstream receipt may already have been
    // minted (and `receipt_counter` bumped); this response reports
    // `receipt_id: null`, so that receipt is not surfaced anywhere.
    if let Err(ledger_err) = state.audit.try_append(entry) {
        tracing::error!("ledger_append_failed_failing_closed: {}", ledger_err);
        let resp = json!({
            "decision": "DENY",
            "reason": "ledger_unavailable",
            "layer": "L0-AuditLedger",
            "deciding_pack": null,
            "deciding_rule": null,
            "decision_ms": decision_ms,
            "agent_did": agent_did,
            "receipt_id": null,
            "upstream_error": upstream_error,
            "ledger_error": ledger_err,
        });
        return (StatusCode::SERVICE_UNAVAILABLE, json_with_version_header(resp))
            .into_response();
    }

    // Only an exact "ALLOW" maps to 200; everything else is 401.
    let status = if result.decision == "ALLOW" {
        StatusCode::OK
    } else {
        StatusCode::UNAUTHORIZED
    };
    let resp = json!({
        "decision": result.decision,
        "reason": result.reason,
        "layer": result.deciding_layer,
        "deciding_pack": result.deciding_pack,
        "deciding_rule": result.deciding_rule,
        "decision_ms": decision_ms,
        "agent_did": agent_did,
        "receipt_id": receipt_id,
        "upstream_error": upstream_error,
    });
    (status, json_with_version_header(resp)).into_response()
}
424
425fn extract_agent_did(tag: &str) -> Option<String> {
426    // RAV audit 2026-05-25: bound the inputs we touch from an untrusted
427    // X-AEGIS-Tag header.
428    //   - reject obviously-absurd-length tags (default JWTs are <2 KB; we
429    //     allow 8 KB headroom). This prevents an attacker from spamming
430    //     multi-MB X-AEGIS-Tag values that allocate a base64 buffer per
431    //     request.
432    //   - cap the returned `agent_did` (sub claim) at 256 chars. The
433    //     value is used as a HashMap key in `RateState::buckets` AND
434    //     persisted to the audit DB; unbounded values let an
435    //     unauthenticated attacker grow daemon RSS by spamming requests
436    //     with distinct large `sub` values.
437    const MAX_TAG_BYTES: usize = 8 * 1024;
438    const MAX_DID_CHARS: usize = 256;
439    if tag.len() > MAX_TAG_BYTES {
440        return None;
441    }
442    if tag.matches('.').count() != 2 {
443        return None;
444    }
445    let parts: Vec<&str> = tag.split('.').collect();
446    let mut payload_seg = parts[1].to_string();
447    while payload_seg.len() % 4 != 0 {
448        payload_seg.push('=');
449    }
450    // URL-safe base64, accept both padded and unpadded forms.
451    let decoded = URL_SAFE_NO_PAD
452        .decode(payload_seg.trim_end_matches('='))
453        .ok()?;
454    let claims: Value = serde_json::from_slice(&decoded).ok()?;
455    let sub = claims.get("sub").and_then(|v| v.as_str())?;
456    if sub.chars().count() > MAX_DID_CHARS {
457        return None;
458    }
459    Some(sub.to_string())
460}
461
462/// Helper: render JSON with the standard X-AEGIS-Harness-Version header.
463fn json_with_version_header(body: Value) -> Response {
464    let mut resp = Json(body).into_response();
465    if let Ok(val) = axum::http::HeaderValue::from_str(DAEMON_VERSION) {
466        resp.headers_mut().insert("X-AEGIS-Harness-Version", val);
467    }
468    resp
469}