Skip to main content

harness_core/
audit.rs

1// SPDX-License-Identifier: Apache-2.0 OR MIT
2//! Append-only audit ledger.
3//!
4//! Day 1: in-memory `Vec` behind a `Mutex` so the binary can pass the smoke
5//! test without a SQLite dependency. The trait is shaped so Day 2 can drop in
6//! a `rusqlite` implementation with the same DDL + triggers from the Python
7//! reference (`LEDGER_DDL` in `harness.py`) without touching callers.
8
9use chrono::Utc;
10use rusqlite::{params, Connection};
11use serde::{Deserialize, Serialize};
12use sha2::{Digest, Sha256};
13use std::path::{Path, PathBuf};
14use std::sync::Mutex;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct AuditEntry {
18    pub ts: String,
19    pub agent_did: Option<String>,
20    pub action: Option<String>,
21    pub target: Option<String>,
22    pub decision: String,
23    pub deciding_pack: Option<String>,
24    pub deciding_rule: Option<String>,
25    pub deciding_layer: Option<String>,
26    pub deny_reason: Option<String>,
27    pub decision_ms: u64,
28    pub payload_sha256: String,
29    /// Upstream receipt id (e.g. Velo's `rcp_…`) when --upstream-protect is
30    /// configured and the upstream POST succeeds. NULL otherwise.
31    #[serde(default)]
32    pub receipt_id: Option<String>,
33    /// True when --upstream-protect was configured but the upstream POST
34    /// failed (network error, 5xx, malformed response). The daemon falls
35    /// back to its local decision in that case; this column lets auditors
36    /// see the degraded path.
37    #[serde(default)]
38    pub upstream_error: bool,
39}
40
41impl AuditEntry {
42    /// Build an entry from the evaluation result + payload, computing the
43    /// canonical payload_sha256 the same way the Python ref does:
44    ///   sha256(json.dumps(entry, sort_keys=True, separators=(",", ":")))
45    pub fn build(
46        agent_did: Option<String>,
47        action: Option<String>,
48        target: Option<String>,
49        decision: String,
50        deciding_pack: Option<String>,
51        deciding_rule: Option<String>,
52        deciding_layer: Option<String>,
53        deny_reason: Option<String>,
54        decision_ms: u64,
55    ) -> Self {
56        Self::build_with_receipt(
57            agent_did,
58            action,
59            target,
60            decision,
61            deciding_pack,
62            deciding_rule,
63            deciding_layer,
64            deny_reason,
65            decision_ms,
66            None,
67            false,
68        )
69    }
70
71    /// Same as `build`, plus the upstream `receipt_id` and `upstream_error`
72    /// flag. The payload_sha256 covers the receipt_id too — auditors can
73    /// detect tampering on the upstream link.
74    #[allow(clippy::too_many_arguments)]
75    pub fn build_with_receipt(
76        agent_did: Option<String>,
77        action: Option<String>,
78        target: Option<String>,
79        decision: String,
80        deciding_pack: Option<String>,
81        deciding_rule: Option<String>,
82        deciding_layer: Option<String>,
83        deny_reason: Option<String>,
84        decision_ms: u64,
85        receipt_id: Option<String>,
86        upstream_error: bool,
87    ) -> Self {
88        let ts = Utc::now().to_rfc3339();
89        let canonical = serde_json::json!({
90            "ts": ts,
91            "agent_did": agent_did,
92            "action": action,
93            "target": target,
94            "decision": decision,
95            "deciding_pack": deciding_pack,
96            "deciding_rule": deciding_rule,
97            "deciding_layer": deciding_layer,
98            "deny_reason": deny_reason,
99            "decision_ms": decision_ms,
100            "receipt_id": receipt_id,
101            "upstream_error": upstream_error,
102        });
103        let bytes = serde_json::to_vec(&canonical).unwrap_or_default();
104        let mut hasher = Sha256::new();
105        hasher.update(&bytes);
106        let payload_sha256 = hex(&hasher.finalize());
107        Self {
108            ts,
109            agent_did,
110            action,
111            target,
112            decision,
113            deciding_pack,
114            deciding_rule,
115            deciding_layer,
116            deny_reason,
117            decision_ms,
118            payload_sha256,
119            receipt_id,
120            upstream_error,
121        }
122    }
123}
124
125fn hex(bytes: &[u8]) -> String {
126    const HEX: &[u8] = b"0123456789abcdef";
127    let mut s = String::with_capacity(bytes.len() * 2);
128    for b in bytes {
129        s.push(HEX[(b >> 4) as usize] as char);
130        s.push(HEX[(b & 0x0f) as usize] as char);
131    }
132    s
133}
134
135/// Audit-log writer trait.
136///
137/// Two append paths:
138///   * `append` — fire-and-forget, kept for the in-memory sink and callers
139///     that explicitly accept silent loss (tests, dry-run probes).
140///   * `try_append` — returns an error string when the write FAILS at the
141///     storage layer (SQLite locked, file unlinked, trigger-bypass detected,
142///     schema-tamper detected). The HTTP handler MUST use `try_append` and
143///     fail the request closed (DENY 503) if the ledger refuses the row,
144///     because a caller who sees ALLOW with no ledger entry has an
145///     un-governable action. Audit Agent D 2026-05-25.
146pub trait AuditSink: Send + Sync {
147    fn append(&self, entry: AuditEntry);
148    /// Strict append. Default delegates to `append` and reports success;
149    /// real backends override to surface real failures.
150    fn try_append(&self, entry: AuditEntry) -> Result<(), String> {
151        self.append(entry);
152        Ok(())
153    }
154    fn len(&self) -> usize;
155    fn is_empty(&self) -> bool {
156        self.len() == 0
157    }
158}
159
160/// Day-1 in-memory implementation. Day-2: swap for SQLite-backed sink.
161pub struct AuditLog {
162    rows: Mutex<Vec<AuditEntry>>,
163}
164
165impl AuditLog {
166    pub fn new() -> Self {
167        Self {
168            rows: Mutex::new(Vec::new()),
169        }
170    }
171
172    /// Test-only snapshot of all rows.
173    pub fn snapshot(&self) -> Vec<AuditEntry> {
174        self.rows.lock().unwrap().clone()
175    }
176}
177
178impl Default for AuditLog {
179    fn default() -> Self {
180        Self::new()
181    }
182}
183
184impl AuditSink for AuditLog {
185    fn append(&self, entry: AuditEntry) {
186        self.rows.lock().unwrap().push(entry);
187    }
188    fn len(&self) -> usize {
189        self.rows.lock().unwrap().len()
190    }
191}
192
193/// SQLite-backed append-only audit ledger.
194///
195/// Direct port of `LEDGER_DDL` from `harness.py`: same table, same columns,
196/// same `BEFORE DELETE` and `BEFORE UPDATE` triggers that ABORT.
197/// Full DDL, kept as a single string for callers / tooling that want to
198/// inspect the canonical schema. Internally we split into two phases (see
199/// `LEDGER_DDL_CREATE` + `LEDGER_DDL_POST_MIGRATE`) so we can ALTER
200/// pre-v0.2 DBs in between.
201pub const LEDGER_DDL: &str = "\
202CREATE TABLE IF NOT EXISTS harness_audit_log (
203    seq             INTEGER PRIMARY KEY AUTOINCREMENT,
204    ts              TEXT NOT NULL,
205    agent_did       TEXT,
206    action          TEXT,
207    target          TEXT,
208    decision        TEXT NOT NULL,
209    deciding_pack   TEXT,
210    deciding_rule   TEXT,
211    deciding_layer  TEXT,
212    deny_reason     TEXT,
213    decision_ms     INTEGER,
214    payload_sha256  TEXT NOT NULL,
215    receipt_id      TEXT,
216    upstream_error  INTEGER NOT NULL DEFAULT 0
217);
218CREATE INDEX IF NOT EXISTS idx_audit_ts ON harness_audit_log(ts);
219CREATE INDEX IF NOT EXISTS idx_audit_agent ON harness_audit_log(agent_did);
220CREATE INDEX IF NOT EXISTS idx_audit_receipt ON harness_audit_log(receipt_id);
221
222CREATE TRIGGER IF NOT EXISTS trg_audit_no_delete
223BEFORE DELETE ON harness_audit_log
224BEGIN
225  SELECT RAISE(ABORT, 'harness_audit_log is append-only');
226END;
227
228CREATE TRIGGER IF NOT EXISTS trg_audit_no_update
229BEFORE UPDATE ON harness_audit_log
230BEGIN
231  SELECT RAISE(ABORT, 'harness_audit_log is append-only');
232END;
233";
234
235const LEDGER_DDL_CREATE: &str = "\
236CREATE TABLE IF NOT EXISTS harness_audit_log (
237    seq             INTEGER PRIMARY KEY AUTOINCREMENT,
238    ts              TEXT NOT NULL,
239    agent_did       TEXT,
240    action          TEXT,
241    target          TEXT,
242    decision        TEXT NOT NULL,
243    deciding_pack   TEXT,
244    deciding_rule   TEXT,
245    deciding_layer  TEXT,
246    deny_reason     TEXT,
247    decision_ms     INTEGER,
248    payload_sha256  TEXT NOT NULL,
249    receipt_id      TEXT,
250    upstream_error  INTEGER NOT NULL DEFAULT 0
251);
252CREATE INDEX IF NOT EXISTS idx_audit_ts ON harness_audit_log(ts);
253CREATE INDEX IF NOT EXISTS idx_audit_agent ON harness_audit_log(agent_did);
254";
255
256const LEDGER_DDL_POST_MIGRATE: &str = "\
257CREATE INDEX IF NOT EXISTS idx_audit_receipt ON harness_audit_log(receipt_id);
258
259CREATE TRIGGER IF NOT EXISTS trg_audit_no_delete
260BEFORE DELETE ON harness_audit_log
261BEGIN
262  SELECT RAISE(ABORT, 'harness_audit_log is append-only');
263END;
264
265CREATE TRIGGER IF NOT EXISTS trg_audit_no_update
266BEFORE UPDATE ON harness_audit_log
267BEGIN
268  SELECT RAISE(ABORT, 'harness_audit_log is append-only');
269END;
270";
271
272/// Additive schema migration for databases created by v0.1.0 (which
273/// lacked receipt_id / upstream_error columns). Safe-by-design: each
274/// `ALTER TABLE ADD COLUMN` is wrapped so missing-column reuses don't
275/// crash on already-migrated DBs. SQLite has no `ADD COLUMN IF NOT
276/// EXISTS`, so we probe via pragma.
277fn migrate_audit_log(conn: &Connection) -> Result<(), rusqlite::Error> {
278    let mut has_receipt = false;
279    let mut has_upstream_err = false;
280    {
281        let mut stmt = conn.prepare("PRAGMA table_info(harness_audit_log)")?;
282        let mut rows = stmt.query([])?;
283        while let Some(row) = rows.next()? {
284            let name: String = row.get(1)?;
285            if name == "receipt_id" {
286                has_receipt = true;
287            } else if name == "upstream_error" {
288                has_upstream_err = true;
289            }
290        }
291    }
292    if !has_receipt {
293        conn.execute("ALTER TABLE harness_audit_log ADD COLUMN receipt_id TEXT", [])?;
294    }
295    if !has_upstream_err {
296        conn.execute(
297            "ALTER TABLE harness_audit_log ADD COLUMN upstream_error INTEGER NOT NULL DEFAULT 0",
298            [],
299        )?;
300    }
301    conn.execute(
302        "CREATE INDEX IF NOT EXISTS idx_audit_receipt ON harness_audit_log(receipt_id)",
303        [],
304    )?;
305    Ok(())
306}
307
308pub struct SqliteAuditLog {
309    path: PathBuf,
310    conn: Mutex<Connection>,
311}
312
313impl SqliteAuditLog {
314    pub fn open(path: &Path) -> Result<Self, rusqlite::Error> {
315        let conn = Connection::open(path)?;
316        // Run DDL split in two phases so we can ALTER between them:
317        //   1. CREATE TABLE / CREATE INDEX (excluding receipt_id index).
318        //   2. Migrate (add receipt_id / upstream_error columns if missing).
319        //   3. CREATE INDEX on receipt_id + the triggers.
320        conn.execute_batch(LEDGER_DDL_CREATE)?;
321        migrate_audit_log(&conn)?;
322        conn.execute_batch(LEDGER_DDL_POST_MIGRATE)?;
323        Ok(Self {
324            path: path.to_path_buf(),
325            conn: Mutex::new(conn),
326        })
327    }
328
329    pub fn path(&self) -> &Path {
330        &self.path
331    }
332}
333
334#[cfg(test)]
335mod tests {
336    use super::*;
337    use std::env;
338
339    #[test]
340    fn sqlite_append_then_delete_aborts() {
341        let mut p = env::temp_dir();
342        p.push(format!(
343            "aiegis_test_audit_{}.db",
344            std::process::id()
345        ));
346        let _ = std::fs::remove_file(&p);
347        let sink = SqliteAuditLog::open(&p).expect("open");
348        sink.append(AuditEntry::build(
349            None,
350            Some("tool.read_file".into()),
351            None,
352            "ALLOW".into(),
353            None,
354            None,
355            None,
356            None,
357            1,
358        ));
359        assert_eq!(sink.len(), 1);
360        // DELETE must abort.
361        let conn = Connection::open(&p).unwrap();
362        let res = conn.execute("DELETE FROM harness_audit_log WHERE seq=1", []);
363        assert!(res.is_err(), "DELETE should have been rejected by trigger");
364        let _ = std::fs::remove_file(&p);
365    }
366}
367
368impl SqliteAuditLog {
369    /// Fail-closed integrity check. Verifies that:
370    ///   1. `harness_audit_log` exists (defends against `ALTER TABLE RENAME`).
371    ///   2. Both `BEFORE DELETE` + `BEFORE UPDATE` triggers are still present
372    ///      and target the canonical table (defends against `DROP TRIGGER`
373    ///      and the rename-then-recreate bypass found by Audit Agent D
374    ///      2026-05-25).
375    /// Cheap (two indexed sqlite_master lookups) — runs on every append.
376    fn schema_intact(conn: &Connection) -> Result<(), String> {
377        let table_count: i64 = conn
378            .query_row(
379                "SELECT COUNT(*) FROM sqlite_master \
380                 WHERE type='table' AND name='harness_audit_log'",
381                [],
382                |r| r.get(0),
383            )
384            .map_err(|e| format!("schema_probe_failed:{e}"))?;
385        if table_count != 1 {
386            return Err("schema_tamper:canonical_table_missing".to_string());
387        }
388        let trigger_count: i64 = conn
389            .query_row(
390                "SELECT COUNT(*) FROM sqlite_master \
391                 WHERE type='trigger' \
392                   AND tbl_name='harness_audit_log' \
393                   AND name IN ('trg_audit_no_delete','trg_audit_no_update')",
394                [],
395                |r| r.get(0),
396            )
397            .map_err(|e| format!("schema_probe_failed:{e}"))?;
398        if trigger_count != 2 {
399            return Err(format!(
400                "schema_tamper:append_only_triggers_missing:{trigger_count}/2"
401            ));
402        }
403        Ok(())
404    }
405}
406
407impl AuditSink for SqliteAuditLog {
408    fn append(&self, entry: AuditEntry) {
409        if let Err(e) = self.try_append(entry) {
410            tracing::error!("ledger_append_failed: {}", e);
411        }
412    }
413    fn try_append(&self, entry: AuditEntry) -> Result<(), String> {
414        let conn = self.conn.lock().unwrap();
415        // Detect schema-tamper (rename-then-recreate, dropped triggers).
416        SqliteAuditLog::schema_intact(&conn)?;
417        conn.execute(
418            "INSERT INTO harness_audit_log \
419             (ts, agent_did, action, target, decision, deciding_pack, \
420              deciding_rule, deciding_layer, deny_reason, decision_ms, \
421              payload_sha256, receipt_id, upstream_error) \
422             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
423            params![
424                entry.ts,
425                entry.agent_did,
426                entry.action,
427                entry.target,
428                entry.decision,
429                entry.deciding_pack,
430                entry.deciding_rule,
431                entry.deciding_layer,
432                entry.deny_reason,
433                entry.decision_ms as i64,
434                entry.payload_sha256,
435                entry.receipt_id,
436                if entry.upstream_error { 1i64 } else { 0i64 },
437            ],
438        )
439        .map(|_| ())
440        .map_err(|e| format!("insert_failed:{e}"))
441    }
442    fn len(&self) -> usize {
443        let conn = self.conn.lock().unwrap();
444        conn.query_row("SELECT COUNT(*) FROM harness_audit_log", [], |r| r.get::<_, i64>(0))
445            .map(|n| n as usize)
446            .unwrap_or(0)
447    }
448}