1use chrono::Utc;
10use rusqlite::{params, Connection};
11use serde::{Deserialize, Serialize};
12use sha2::{Digest, Sha256};
13use std::path::{Path, PathBuf};
14use std::sync::Mutex;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct AuditEntry {
18 pub ts: String,
19 pub agent_did: Option<String>,
20 pub action: Option<String>,
21 pub target: Option<String>,
22 pub decision: String,
23 pub deciding_pack: Option<String>,
24 pub deciding_rule: Option<String>,
25 pub deciding_layer: Option<String>,
26 pub deny_reason: Option<String>,
27 pub decision_ms: u64,
28 pub payload_sha256: String,
29 #[serde(default)]
32 pub receipt_id: Option<String>,
33 #[serde(default)]
38 pub upstream_error: bool,
39}
40
41impl AuditEntry {
42 pub fn build(
46 agent_did: Option<String>,
47 action: Option<String>,
48 target: Option<String>,
49 decision: String,
50 deciding_pack: Option<String>,
51 deciding_rule: Option<String>,
52 deciding_layer: Option<String>,
53 deny_reason: Option<String>,
54 decision_ms: u64,
55 ) -> Self {
56 Self::build_with_receipt(
57 agent_did,
58 action,
59 target,
60 decision,
61 deciding_pack,
62 deciding_rule,
63 deciding_layer,
64 deny_reason,
65 decision_ms,
66 None,
67 false,
68 )
69 }
70
71 #[allow(clippy::too_many_arguments)]
75 pub fn build_with_receipt(
76 agent_did: Option<String>,
77 action: Option<String>,
78 target: Option<String>,
79 decision: String,
80 deciding_pack: Option<String>,
81 deciding_rule: Option<String>,
82 deciding_layer: Option<String>,
83 deny_reason: Option<String>,
84 decision_ms: u64,
85 receipt_id: Option<String>,
86 upstream_error: bool,
87 ) -> Self {
88 let ts = Utc::now().to_rfc3339();
89 let canonical = serde_json::json!({
90 "ts": ts,
91 "agent_did": agent_did,
92 "action": action,
93 "target": target,
94 "decision": decision,
95 "deciding_pack": deciding_pack,
96 "deciding_rule": deciding_rule,
97 "deciding_layer": deciding_layer,
98 "deny_reason": deny_reason,
99 "decision_ms": decision_ms,
100 "receipt_id": receipt_id,
101 "upstream_error": upstream_error,
102 });
103 let bytes = serde_json::to_vec(&canonical).unwrap_or_default();
104 let mut hasher = Sha256::new();
105 hasher.update(&bytes);
106 let payload_sha256 = hex(&hasher.finalize());
107 Self {
108 ts,
109 agent_did,
110 action,
111 target,
112 decision,
113 deciding_pack,
114 deciding_rule,
115 deciding_layer,
116 deny_reason,
117 decision_ms,
118 payload_sha256,
119 receipt_id,
120 upstream_error,
121 }
122 }
123}
124
/// Encodes `bytes` as lowercase hexadecimal, two characters per input byte.
///
/// An empty slice yields an empty string.
fn hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    bytes
        .iter()
        .fold(String::with_capacity(bytes.len() * 2), |mut out, &byte| {
            // High nibble first, then low nibble.
            out.push(char::from(DIGITS[usize::from(byte >> 4)]));
            out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
            out
        })
}
134
135pub trait AuditSink: Send + Sync {
147 fn append(&self, entry: AuditEntry);
148 fn try_append(&self, entry: AuditEntry) -> Result<(), String> {
151 self.append(entry);
152 Ok(())
153 }
154 fn len(&self) -> usize;
155 fn is_empty(&self) -> bool {
156 self.len() == 0
157 }
158}
159
160pub struct AuditLog {
162 rows: Mutex<Vec<AuditEntry>>,
163}
164
165impl AuditLog {
166 pub fn new() -> Self {
167 Self {
168 rows: Mutex::new(Vec::new()),
169 }
170 }
171
172 pub fn snapshot(&self) -> Vec<AuditEntry> {
174 self.rows.lock().unwrap().clone()
175 }
176}
177
178impl Default for AuditLog {
179 fn default() -> Self {
180 Self::new()
181 }
182}
183
184impl AuditSink for AuditLog {
185 fn append(&self, entry: AuditEntry) {
186 self.rows.lock().unwrap().push(entry);
187 }
188 fn len(&self) -> usize {
189 self.rows.lock().unwrap().len()
190 }
191}
192
// The ledger schema is written once, as two fragments, and the public constant
// is assembled from them. Previously the same SQL was hand-copied into
// `LEDGER_DDL` and into the two fragments, so a schema edit in one copy could
// silently diverge from the other.

/// Table definition plus the indexes that only touch columns present in every
/// schema version.
macro_rules! ledger_ddl_create {
    () => {
        "\
CREATE TABLE IF NOT EXISTS harness_audit_log (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,
    ts TEXT NOT NULL,
    agent_did TEXT,
    action TEXT,
    target TEXT,
    decision TEXT NOT NULL,
    deciding_pack TEXT,
    deciding_rule TEXT,
    deciding_layer TEXT,
    deny_reason TEXT,
    decision_ms INTEGER,
    payload_sha256 TEXT NOT NULL,
    receipt_id TEXT,
    upstream_error INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_audit_ts ON harness_audit_log(ts);
CREATE INDEX IF NOT EXISTS idx_audit_agent ON harness_audit_log(agent_did);
"
    };
}

/// The `receipt_id` index and the append-only triggers.
macro_rules! ledger_ddl_post_migrate {
    () => {
        "\
CREATE INDEX IF NOT EXISTS idx_audit_receipt ON harness_audit_log(receipt_id);

CREATE TRIGGER IF NOT EXISTS trg_audit_no_delete
BEFORE DELETE ON harness_audit_log
BEGIN
    SELECT RAISE(ABORT, 'harness_audit_log is append-only');
END;

CREATE TRIGGER IF NOT EXISTS trg_audit_no_update
BEFORE UPDATE ON harness_audit_log
BEGIN
    SELECT RAISE(ABORT, 'harness_audit_log is append-only');
END;
"
    };
}

/// Complete ledger schema: the table, its three indexes, and the two triggers
/// that abort any `DELETE` or `UPDATE` on `harness_audit_log`.
///
/// Always equal to `LEDGER_DDL_CREATE` followed by `LEDGER_DDL_POST_MIGRATE`.
pub const LEDGER_DDL: &str = concat!(ledger_ddl_create!(), ledger_ddl_post_migrate!());

/// First half of the schema, executed by `SqliteAuditLog::open` before
/// `migrate_audit_log` runs.
const LEDGER_DDL_CREATE: &str = ledger_ddl_create!();

/// Second half of the schema, executed after `migrate_audit_log`.
///
/// The `receipt_id` index lives here rather than in the first half because on
/// a table created by an older schema that column only exists once the
/// migration has added it.
const LEDGER_DDL_POST_MIGRATE: &str = ledger_ddl_post_migrate!();
271
272fn migrate_audit_log(conn: &Connection) -> Result<(), rusqlite::Error> {
278 let mut has_receipt = false;
279 let mut has_upstream_err = false;
280 {
281 let mut stmt = conn.prepare("PRAGMA table_info(harness_audit_log)")?;
282 let mut rows = stmt.query([])?;
283 while let Some(row) = rows.next()? {
284 let name: String = row.get(1)?;
285 if name == "receipt_id" {
286 has_receipt = true;
287 } else if name == "upstream_error" {
288 has_upstream_err = true;
289 }
290 }
291 }
292 if !has_receipt {
293 conn.execute("ALTER TABLE harness_audit_log ADD COLUMN receipt_id TEXT", [])?;
294 }
295 if !has_upstream_err {
296 conn.execute(
297 "ALTER TABLE harness_audit_log ADD COLUMN upstream_error INTEGER NOT NULL DEFAULT 0",
298 [],
299 )?;
300 }
301 conn.execute(
302 "CREATE INDEX IF NOT EXISTS idx_audit_receipt ON harness_audit_log(receipt_id)",
303 [],
304 )?;
305 Ok(())
306}
307
308pub struct SqliteAuditLog {
309 path: PathBuf,
310 conn: Mutex<Connection>,
311}
312
313impl SqliteAuditLog {
314 pub fn open(path: &Path) -> Result<Self, rusqlite::Error> {
315 let conn = Connection::open(path)?;
316 conn.execute_batch(LEDGER_DDL_CREATE)?;
321 migrate_audit_log(&conn)?;
322 conn.execute_batch(LEDGER_DDL_POST_MIGRATE)?;
323 Ok(Self {
324 path: path.to_path_buf(),
325 conn: Mutex::new(conn),
326 })
327 }
328
329 pub fn path(&self) -> &Path {
330 &self.path
331 }
332}
333
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    /// Appends one row through the sink, then checks that a `DELETE` issued on
    /// a second connection to the same file is rejected.
    #[test]
    fn sqlite_append_then_delete_aborts() {
        // Per-process file name in the temp dir; clear any leftover from an
        // earlier run before opening.
        let db_file = env::temp_dir().join(format!(
            "aiegis_test_audit_{}.db",
            std::process::id()
        ));
        let _ = std::fs::remove_file(&db_file);

        let sink = SqliteAuditLog::open(&db_file).expect("open");
        let entry = AuditEntry::build(
            None,
            Some("tool.read_file".into()),
            None,
            "ALLOW".into(),
            None,
            None,
            None,
            None,
            1,
        );
        sink.append(entry);
        assert_eq!(sink.len(), 1);

        // Use an independent connection so the trigger, not the sink, is what
        // blocks the delete.
        let conn = Connection::open(&db_file).unwrap();
        let res = conn.execute("DELETE FROM harness_audit_log WHERE seq=1", []);
        assert!(res.is_err(), "DELETE should have been rejected by trigger");

        let _ = std::fs::remove_file(&db_file);
    }
}
367
368impl SqliteAuditLog {
369 fn schema_intact(conn: &Connection) -> Result<(), String> {
377 let table_count: i64 = conn
378 .query_row(
379 "SELECT COUNT(*) FROM sqlite_master \
380 WHERE type='table' AND name='harness_audit_log'",
381 [],
382 |r| r.get(0),
383 )
384 .map_err(|e| format!("schema_probe_failed:{e}"))?;
385 if table_count != 1 {
386 return Err("schema_tamper:canonical_table_missing".to_string());
387 }
388 let trigger_count: i64 = conn
389 .query_row(
390 "SELECT COUNT(*) FROM sqlite_master \
391 WHERE type='trigger' \
392 AND tbl_name='harness_audit_log' \
393 AND name IN ('trg_audit_no_delete','trg_audit_no_update')",
394 [],
395 |r| r.get(0),
396 )
397 .map_err(|e| format!("schema_probe_failed:{e}"))?;
398 if trigger_count != 2 {
399 return Err(format!(
400 "schema_tamper:append_only_triggers_missing:{trigger_count}/2"
401 ));
402 }
403 Ok(())
404 }
405}
406
407impl AuditSink for SqliteAuditLog {
408 fn append(&self, entry: AuditEntry) {
409 if let Err(e) = self.try_append(entry) {
410 tracing::error!("ledger_append_failed: {}", e);
411 }
412 }
413 fn try_append(&self, entry: AuditEntry) -> Result<(), String> {
414 let conn = self.conn.lock().unwrap();
415 SqliteAuditLog::schema_intact(&conn)?;
417 conn.execute(
418 "INSERT INTO harness_audit_log \
419 (ts, agent_did, action, target, decision, deciding_pack, \
420 deciding_rule, deciding_layer, deny_reason, decision_ms, \
421 payload_sha256, receipt_id, upstream_error) \
422 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
423 params![
424 entry.ts,
425 entry.agent_did,
426 entry.action,
427 entry.target,
428 entry.decision,
429 entry.deciding_pack,
430 entry.deciding_rule,
431 entry.deciding_layer,
432 entry.deny_reason,
433 entry.decision_ms as i64,
434 entry.payload_sha256,
435 entry.receipt_id,
436 if entry.upstream_error { 1i64 } else { 0i64 },
437 ],
438 )
439 .map(|_| ())
440 .map_err(|e| format!("insert_failed:{e}"))
441 }
442 fn len(&self) -> usize {
443 let conn = self.conn.lock().unwrap();
444 conn.query_row("SELECT COUNT(*) FROM harness_audit_log", [], |r| r.get::<_, i64>(0))
445 .map(|n| n as usize)
446 .unwrap_or(0)
447 }
448}