1mod pack_fetcher;
10mod upstream;
11
12use axum::{
13 extract::State,
14 http::{HeaderMap, StatusCode},
15 response::{IntoResponse, Response},
16 routing::{get, post},
17 Json, Router,
18};
19use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
20use clap::Parser;
21use harness_core::{
22 audit::{AuditEntry, AuditSink, SqliteAuditLog},
23 eval::evaluate_packs,
24 load_pack, PolicyPack, DAEMON_VERSION, DEFAULT_PACK_ISSUER_PUBKEY_HEX, SPEC_VERSION,
25};
26use pack_fetcher::RemotePack;
27use serde_json::{json, Value};
28use std::collections::{HashMap, VecDeque};
29use std::path::PathBuf;
30use std::sync::{Arc, Mutex};
31use std::time::{Instant, SystemTime, UNIX_EPOCH};
32use upstream::UpstreamClient;
33
34#[derive(Parser, Debug)]
36#[command(
37 name = "aiegis-harness",
38 about = "AiEGIS Harness reference daemon (L2 / proxy mode).",
39 version
40)]
41struct Args {
42 #[arg(long, default_value_t = 8080)]
44 port: u16,
45
46 #[arg(long = "policy-pack", value_name = "PATH")]
48 policy_pack: Vec<PathBuf>,
49
50 #[arg(long, default_value = "harness_audit.db")]
54 audit_db: PathBuf,
55
56 #[arg(long, default_value_t = 60)]
58 rate_cap: u32,
59
60 #[arg(long, short)]
62 verbose: bool,
63
64 #[arg(long, value_name = "URL")]
75 pack_source: Option<String>,
76
77 #[arg(long, default_value = DEFAULT_PACK_ISSUER_PUBKEY_HEX)]
81 issuer_pubkey_hex: String,
82
83 #[arg(long, value_name = "URL")]
90 upstream_protect: Option<String>,
91}
92
93#[derive(Default)]
95struct RateState {
96 buckets: Mutex<HashMap<String, VecDeque<f64>>>,
97}
98
99impl RateState {
100 fn observe(&self, agent_did: &str) -> usize {
101 let now = SystemTime::now()
102 .duration_since(UNIX_EPOCH)
103 .map(|d| d.as_secs_f64())
104 .unwrap_or(0.0);
105 let mut buckets = self.buckets.lock().unwrap();
106 let bucket = buckets.entry(agent_did.to_string()).or_default();
107 while let Some(&front) = bucket.front() {
108 if front < now - 60.0 {
109 bucket.pop_front();
110 } else {
111 break;
112 }
113 }
114 bucket.push_back(now);
115 bucket.len()
116 }
117}
118
119#[derive(Clone)]
120struct AppState {
121 packs: Arc<Vec<PolicyPack>>,
122 remote_packs: Arc<Vec<RemotePack>>,
123 pack_source_fingerprint: Arc<String>,
124 audit: Arc<dyn AuditSink>,
125 rate: Arc<RateState>,
126 upstream: Option<UpstreamClient>,
127 receipt_counter: Arc<std::sync::atomic::AtomicU64>,
128}
129
130#[tokio::main]
131async fn main() -> std::process::ExitCode {
132 let args = Args::parse();
133
134 let level = if args.verbose { "debug" } else { "info" };
135 tracing_subscriber::fmt()
136 .with_env_filter(
137 tracing_subscriber::EnvFilter::try_from_default_env()
138 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(level)),
139 )
140 .with_target(false)
141 .init();
142
143 if args.policy_pack.is_empty() && args.pack_source.is_none() {
144 tracing::warn!("No --policy-pack or --pack-source supplied; daemon will ALLOW everything.");
145 }
146
147 let remote_packs: Vec<RemotePack> = if let Some(base) = args.pack_source.as_deref() {
149 match pack_fetcher::fetch_all(base, &args.issuer_pubkey_hex).await {
150 Ok(v) => {
151 tracing::info!(
152 "pack_source={} verified_packs={} (inventory-only; reference daemon does not execute OPA Rego v1)",
153 base,
154 v.len()
155 );
156 v
157 }
158 Err(e) => {
159 tracing::error!("pack_source_failed url={} err={}", base, e);
160 return std::process::ExitCode::from(2);
161 }
162 }
163 } else {
164 Vec::new()
165 };
166 let pack_source_fingerprint = pack_fetcher::fingerprint(&remote_packs);
167
168 let mut packs: Vec<PolicyPack> = Vec::new();
173 let mut seen_pack_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
174 for path in &args.policy_pack {
175 match load_pack(path) {
176 Ok(pack) => {
177 if !seen_pack_ids.insert(pack.pack_id.clone()) {
178 tracing::error!(
179 "pack_id_collision: refusing to load duplicate pack_id={} from {} (already loaded earlier)",
180 pack.pack_id,
181 path.display()
182 );
183 return std::process::ExitCode::from(2);
184 }
185 if pack
186 .signature
187 .value
188 .as_deref()
189 .map(|v| v == "REFERENCE_PACK_NOT_SIGNED_LOCAL_DEV_ONLY")
190 .unwrap_or(false)
191 {
192 tracing::warn!(
193 "pack={} ships UNSIGNED — local dev only; prod must sign + verify against issuer JWKS",
194 pack.pack_id
195 );
196 }
197 tracing::info!(
198 "loaded pack={} rules={}",
199 pack.pack_id,
200 pack.rules.len()
201 );
202 packs.push(pack);
203 }
204 Err(e) => {
205 tracing::error!("pack_load_failed path={} err={}", path.display(), e);
206 return std::process::ExitCode::from(2);
207 }
208 }
209 }
210
211 let audit: Arc<dyn AuditSink> = match SqliteAuditLog::open(&args.audit_db) {
214 Ok(s) => {
215 tracing::info!("audit ledger ready at {}", args.audit_db.display());
216 Arc::new(s)
217 }
218 Err(e) => {
219 tracing::error!(
220 "audit_db_open_failed path={} err={}",
221 args.audit_db.display(),
222 e
223 );
224 return std::process::ExitCode::from(2);
225 }
226 };
227
228 let upstream = args.upstream_protect.as_ref().map(|u| {
229 tracing::info!("upstream_protect_url={}", u);
230 UpstreamClient::new(u.clone())
231 });
232
233 let state = AppState {
234 packs: Arc::new(packs),
235 remote_packs: Arc::new(remote_packs),
236 pack_source_fingerprint: Arc::new(pack_source_fingerprint),
237 audit,
238 rate: Arc::new(RateState::default()),
239 upstream,
240 receipt_counter: Arc::new(std::sync::atomic::AtomicU64::new(0)),
241 };
242
243 let app = Router::new()
244 .route("/health", get(health))
245 .route("/api/protect/health", get(health))
246 .route("/api/protect", post(protect))
247 .fallback(not_found)
248 .with_state(state);
249
250 let addr = format!("127.0.0.1:{}", args.port);
251 let listener = match tokio::net::TcpListener::bind(&addr).await {
252 Ok(l) => l,
253 Err(e) => {
254 tracing::error!("bind failed on {}: {}", addr, e);
255 return std::process::ExitCode::from(2);
256 }
257 };
258 tracing::info!(
259 "AiEGIS Harness daemon listening on http://{} (POST /api/protect)",
260 addr
261 );
262
263 let shutdown = async {
265 let _ = tokio::signal::ctrl_c().await;
266 tracing::info!("shutting down");
267 };
268
269 if let Err(e) = axum::serve(listener, app).with_graceful_shutdown(shutdown).await {
270 tracing::error!("server error: {}", e);
271 return std::process::ExitCode::from(1);
272 }
273 std::process::ExitCode::SUCCESS
274}
275
276async fn health(State(state): State<AppState>) -> impl IntoResponse {
277 let receipt_count = state
278 .receipt_counter
279 .load(std::sync::atomic::Ordering::Relaxed);
280 let body = json!({
281 "status": "ready",
282 "version": DAEMON_VERSION,
283 "spec_version": SPEC_VERSION,
284 "packs_loaded": state.packs.len(),
285 "pack_ids": state.packs.iter().map(|p| &p.pack_id).collect::<Vec<_>>(),
286 "remote_packs_loaded": state.remote_packs.len(),
287 "remote_pack_ids": state.remote_packs.iter()
288 .map(|p| format!("{}@{}", p.name, p.version)).collect::<Vec<_>>(),
289 "pack_source_fingerprint": state.pack_source_fingerprint.as_str(),
290 "remote_packs_inventory_only": true,
291 "upstream_protect_configured": state.upstream.is_some(),
292 "receipts_minted": receipt_count,
293 });
294 (StatusCode::OK, json_with_version_header(body))
295}
296
297async fn not_found() -> impl IntoResponse {
298 (
299 StatusCode::NOT_FOUND,
300 json_with_version_header(json!({"error": "not_found"})),
301 )
302}
303
304async fn protect(
305 State(state): State<AppState>,
306 headers: HeaderMap,
307 body: axum::body::Bytes,
308) -> Response {
309 let t0 = Instant::now();
310
311 let payload: Value = if body.is_empty() {
312 json!({})
313 } else {
314 match serde_json::from_slice::<Value>(&body) {
315 Ok(v) => v,
316 Err(_) => {
317 let err_body = json!({
318 "decision": "DENY",
319 "reason": "malformed_json",
320 "layer": "L?",
321 "decision_ms": 0,
322 });
323 return (StatusCode::BAD_REQUEST, json_with_version_header(err_body))
324 .into_response();
325 }
326 }
327 };
328
329 let agent_tag = headers
330 .get("X-AEGIS-Tag")
331 .and_then(|h| h.to_str().ok())
332 .unwrap_or("")
333 .to_string();
334
335 let agent_did = extract_agent_did(&agent_tag);
338
339 let actions = state.rate.observe(agent_did.as_deref().unwrap_or("anonymous"));
341 let daemon_state = json!({"agent_actions_last_minute": actions});
342
343 let result = evaluate_packs(&state.packs, &payload, &daemon_state);
344
345 let (receipt_id, upstream_error) = if let Some(client) = &state.upstream {
349 if result.decision != "DENY" {
350 match client.post(&payload, &agent_tag).await {
351 Ok(rcp) => {
352 state
353 .receipt_counter
354 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
355 (Some(rcp.rid), false)
356 }
357 Err(e) => {
358 tracing::warn!("upstream_post_failed err={}", e);
359 (None, true)
360 }
361 }
362 } else {
363 (None, false)
364 }
365 } else {
366 (None, false)
367 };
368
369 let decision_ms = t0.elapsed().as_millis() as u64;
370
371 let entry = AuditEntry::build_with_receipt(
372 agent_did.clone(),
373 payload.get("action").and_then(|v| v.as_str()).map(String::from),
374 payload.get("target").and_then(|v| v.as_str()).map(String::from),
375 result.decision.to_string(),
376 result.deciding_pack.clone(),
377 result.deciding_rule.clone(),
378 result.deciding_layer.clone(),
379 result.reason.clone(),
380 decision_ms,
381 receipt_id.clone(),
382 upstream_error,
383 );
384 if let Err(ledger_err) = state.audit.try_append(entry) {
389 tracing::error!("ledger_append_failed_failing_closed: {}", ledger_err);
390 let resp = json!({
391 "decision": "DENY",
392 "reason": "ledger_unavailable",
393 "layer": "L0-AuditLedger",
394 "deciding_pack": null,
395 "deciding_rule": null,
396 "decision_ms": decision_ms,
397 "agent_did": agent_did,
398 "receipt_id": null,
399 "upstream_error": upstream_error,
400 "ledger_error": ledger_err,
401 });
402 return (StatusCode::SERVICE_UNAVAILABLE, json_with_version_header(resp))
403 .into_response();
404 }
405
406 let status = if result.decision == "ALLOW" {
407 StatusCode::OK
408 } else {
409 StatusCode::UNAUTHORIZED
410 };
411 let resp = json!({
412 "decision": result.decision,
413 "reason": result.reason,
414 "layer": result.deciding_layer,
415 "deciding_pack": result.deciding_pack,
416 "deciding_rule": result.deciding_rule,
417 "decision_ms": decision_ms,
418 "agent_did": agent_did,
419 "receipt_id": receipt_id,
420 "upstream_error": upstream_error,
421 });
422 (status, json_with_version_header(resp)).into_response()
423}
424
425fn extract_agent_did(tag: &str) -> Option<String> {
426 const MAX_TAG_BYTES: usize = 8 * 1024;
438 const MAX_DID_CHARS: usize = 256;
439 if tag.len() > MAX_TAG_BYTES {
440 return None;
441 }
442 if tag.matches('.').count() != 2 {
443 return None;
444 }
445 let parts: Vec<&str> = tag.split('.').collect();
446 let mut payload_seg = parts[1].to_string();
447 while payload_seg.len() % 4 != 0 {
448 payload_seg.push('=');
449 }
450 let decoded = URL_SAFE_NO_PAD
452 .decode(payload_seg.trim_end_matches('='))
453 .ok()?;
454 let claims: Value = serde_json::from_slice(&decoded).ok()?;
455 let sub = claims.get("sub").and_then(|v| v.as_str())?;
456 if sub.chars().count() > MAX_DID_CHARS {
457 return None;
458 }
459 Some(sub.to_string())
460}
461
462fn json_with_version_header(body: Value) -> Response {
464 let mut resp = Json(body).into_response();
465 if let Ok(val) = axum::http::HeaderValue::from_str(DAEMON_VERSION) {
466 resp.headers_mut().insert("X-AEGIS-Harness-Version", val);
467 }
468 resp
469}