// Moves chained audit events older than AUDIT_HOT_RETENTION_DAYS from Mongo
// to cold storage. Idempotent and crash-safe — events are deleted ONLY after
// the JSONL + manifest have been confirmed in S3 (HEAD check). A failed
// upload or HEAD check leaves the events in hot Mongo for the next run.
//
// Trust chain across tiers:
//   - Hot events carry sha256 hash + prev-hash links (Phase 3).
//   - When archived, the JSONL preserves these fields verbatim.
//   - The manifest records the JSONL's sha256 + the boundary event hashes,
//     signed with AUDIT_SIGNING_KEY (same key as Phase 3 checkpoints).
//   - The Phase 3 checkpoint that covers the archived seq range still
//     verifies — we're not orphaning chain integrity, just moving the
//     events themselves.

import { Injectable, Logger } from '@nestjs/common'
import { ConfigService } from '@nestjs/config'
import { InjectModel } from '@nestjs/mongoose'
import { createHash, createHmac } from 'node:crypto'
import { gzipSync } from 'node:zlib'
import type { Model } from 'mongoose'
import {
  AuditEvent,
  type AuditEventDocument,
} from '../schemas/audit-event.schema.js'
import {
  ArchiveBatch,
  type ArchiveBatchDocument,
} from '../schemas/archive-batch.schema.js'
import { ColdStorageClient } from './cold-storage.client.js'

// Outcome of a single archive run. `ok: true` without a batchId means there
// was nothing to archive; `ok: false` carries the failure in `reason`.
export interface ArchiveResult {
  ok: boolean
  reason?: string
  batchId?: string
  startSeq?: number
  endSeq?: number
  eventCount?: number
}

// Shape of the manifest object uploaded next to each JSONL batch.
interface ArchiveManifest {
  version: 1
  startSeq: number
  endSeq: number
  eventCount: number
  startedAt: string
  endedAt: string
  bytesUncompressed: number
  bytesCompressed: number
  sha256: string // sha256 of the JSONL (uncompressed)
  firstEventHash: string
  lastEventHash: string
  signature: string
  sigAlg: 'HMAC-SHA-256'
}

// Retention used when AUDIT_HOT_RETENTION_DAYS is unset or unusable.
const DEFAULT_RETENTION_DAYS = 90

// Parses the AUDIT_HOT_RETENTION_DAYS setting. Unset or blank values yield
// the default; anything that is not a finite, non-negative number yields
// null so the caller can warn. Blank is special-cased because Number('') is
// 0, which would otherwise mean "archive everything immediately".
function parseRetentionDays(raw: unknown): number | null {
  if (raw === undefined || raw === null) return DEFAULT_RETENTION_DAYS
  if (typeof raw === 'string' && raw.trim() === '') return DEFAULT_RETENTION_DAYS
  const days = Number(raw)
  return Number.isFinite(days) && days >= 0 ? days : null
}

// Archives chained audit events from hot Mongo to cold object storage and
// exposes a few read helpers about the hot/cold tier boundary.
@Injectable()
export class ArchiveService {
  private readonly logger = new Logger(ArchiveService.name)
  private readonly retentionDays: number
  private readonly signingKey: string
  // Server-side encryption — opt-in via AUDIT_COLD_SSE=true. Production
  // (Hetzner Object Storage) supports SSE-S3 natively. Dev/MinIO without
  // a KMS rejects it with "KMS is not configured" so the default is off.
  // Client-side encryption is a larger workstream tracked for later.
  private readonly serverSideEncryption: 'AES256' | undefined

  constructor(
    @InjectModel(AuditEvent.name)
    private readonly events: Model<AuditEventDocument>,
    @InjectModel(ArchiveBatch.name)
    private readonly batches: Model<ArchiveBatchDocument>,
    private readonly cold: ColdStorageClient,
    config: ConfigService,
  ) {
    const rawRetention: unknown = config.get('AUDIT_HOT_RETENTION_DAYS')
    const retention = parseRetentionDays(rawRetention)
    if (retention === null) {
      this.logger.warn(
        `invalid AUDIT_HOT_RETENTION_DAYS=${String(rawRetention)} — falling back to ${DEFAULT_RETENTION_DAYS} days`,
      )
    }
    this.retentionDays = retention ?? DEFAULT_RETENTION_DAYS
    this.signingKey = config.getOrThrow<string>('AUDIT_SIGNING_KEY')
    this.serverSideEncryption =
      config.get<string>('AUDIT_COLD_SSE') === 'true' ? 'AES256' : undefined
  }

  // One archive run. Selects events older than the retention boundary, packs
  // them, uploads, then deletes from hot Mongo. Returns a small result so
  // the worker can log a meaningful line.
  //
  // Upload/HEAD failures are caught and reported as `ok: false`; errors from
  // the Mongo calls (find, batch create, delete) are not caught here and
  // propagate to the caller.
  async runOnce(opts?: { override?: boolean }): Promise<ArchiveResult> {
    // Cutoff: events with at < cutoff go cold. For dev/forced runs, the
    // caller can override the retention check entirely (so we can exercise
    // the path on minute-old events).
    const cutoff = opts?.override
      ? new Date()
      : new Date(Date.now() - this.retentionDays * 24 * 60 * 60 * 1000)

    // Pull the eligible events ordered by seq. Cap per run so a runaway
    // backlog doesn't OOM us — at 100k events/batch the next run picks up
    // where we left off.
    const MAX_PER_BATCH = 100_000
    const events = await this.events
      .find({ chained: true, at: { $lt: cutoff } })
      .sort({ seq: 1 })
      .limit(MAX_PER_BATCH)
      .lean()
      .exec()

    if (events.length === 0) {
      return { ok: true, reason: 'no events older than cutoff' }
    }

    // The boundary seqs drive the object key, the manifest and the delete
    // filter, so refuse to continue if either one is missing rather than
    // building a range out of undefined.
    const first = events[0]
    const last = events[events.length - 1]
    if (
      first === undefined ||
      last === undefined ||
      first.seq == null ||
      last.seq == null
    ) {
      const reason = 'chained event without seq — refusing to archive'
      this.logger.error(`${reason} (${events.length} events selected)`)
      return { ok: false, reason, eventCount: events.length }
    }
    const startSeq = first.seq
    const endSeq = last.seq
    const startedAt = new Date()

    // Build JSONL — one event per line, no extra wrapping. Preserves the
    // original document shape so offline tools can `jq` it directly.
    const jsonlLines = events.map((e) => JSON.stringify(stripMongoMeta(e)))
    const jsonl = jsonlLines.join('\n') + '\n'
    // Encode once and reuse the bytes for size, compression and hashing.
    const payload = Buffer.from(jsonl, 'utf8')
    const bytesUncompressed = payload.byteLength
    const gzipped = gzipSync(payload)

    // Hash the uncompressed payload — the manifest's hash is content-
    // addressable so anyone redownloading + decompressing can re-verify.
    const sha256 = createHash('sha256').update(payload).digest('hex')

    const objectPrefix = objectKeyPrefix(startedAt, startSeq, endSeq)
    const jsonlKey = `${objectPrefix}.jsonl.gz`
    const manifestKey = `${objectPrefix}.manifest.json`

    const firstEventHash = first.hash ?? ''
    const lastEventHash = last.hash ?? ''
    // Captured before the upload starts, so it marks the end of packing.
    const endedAt = new Date()

    // Sign the manifest. Same key as Phase 3 checkpoints — an attacker who
    // wants to forge an archive batch needs both S3 write AND this key.
    // The HMAC input is the compact JSON of `unsigned` in exactly this key
    // order; the uploaded manifest is pretty-printed and adds the signature.
    const unsigned = {
      version: 1 as const,
      startSeq,
      endSeq,
      eventCount: events.length,
      startedAt: startedAt.toISOString(),
      endedAt: endedAt.toISOString(),
      bytesUncompressed,
      bytesCompressed: gzipped.length,
      sha256,
      firstEventHash,
      lastEventHash,
    }
    const signature = createHmac('sha256', this.signingKey)
      .update(JSON.stringify(unsigned))
      .digest('hex')
    const manifest: ArchiveManifest = {
      ...unsigned,
      signature,
      sigAlg: 'HMAC-SHA-256',
    }
    const manifestJson = JSON.stringify(manifest, null, 2)
    const manifestSha256 = createHash('sha256').update(manifestJson).digest('hex')

    try {
      await this.cold.put({
        key: jsonlKey,
        body: gzipped,
        contentType: 'application/gzip',
        serverSideEncryption: this.serverSideEncryption,
      })
      await this.cold.put({
        key: manifestKey,
        body: manifestJson,
        contentType: 'application/json',
        serverSideEncryption: this.serverSideEncryption,
      })

      // Confirm both objects landed before deleting from hot.
      const [jsonlHead, manifestHead] = await Promise.all([
        this.cold.head(jsonlKey),
        this.cold.head(manifestKey),
      ])
      if (!jsonlHead.exists || !manifestHead.exists) {
        throw new Error(
          `archive HEAD failed: jsonl=${jsonlHead.exists} manifest=${manifestHead.exists}`,
        )
      }
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err)
      this.logger.error(
        `archive upload failed (seq ${startSeq}-${endSeq}): ${message} — events remain in hot Mongo`,
      )
      return {
        ok: false,
        reason: message,
        startSeq,
        endSeq,
        eventCount: events.length,
      }
    }

    // Record the batch BEFORE the delete so a crash between the two leaves
    // us with a batch record pointing at events that still exist in hot —
    // safe. The reverse (delete-then-record) could orphan events.
    // NOTE(review): a rerun after such a crash selects the same events again
    // and creates a second batch record for the range — confirm consumers of
    // the batch list tolerate that.
    const batch = await this.batches.create({
      archivedAt: endedAt,
      startSeq,
      endSeq,
      eventCount: events.length,
      manifestSha256,
      jsonlKey,
      manifestKey,
      bytesUncompressed,
    })

    // Delete from hot. One bulk op. The filter repeats the selection
    // predicate (`at < cutoff`) on top of the seq range: an event inside the
    // range whose timestamp is newer than the cutoff was never written to
    // the JSONL, so it must stay in hot Mongo.
    const deleted = await this.events
      .deleteMany({
        chained: true,
        at: { $lt: cutoff },
        seq: { $gte: startSeq, $lte: endSeq },
      })
      .exec()
    if (deleted.deletedCount !== events.length) {
      this.logger.warn(
        `archive batch ${batch._id} · deleted ${deleted.deletedCount} hot events but archived ${events.length} (seq ${startSeq}-${endSeq})`,
      )
    }

    this.logger.log(
      `archive batch ${batch._id} · seq ${startSeq}-${endSeq} · ${events.length} events → ${jsonlKey}`,
    )

    return {
      ok: true,
      batchId: String(batch._id),
      startSeq,
      endSeq,
      eventCount: events.length,
    }
  }

  // Most recently archived batches first, capped at `limit` (default 100).
  listBatches(limit = 100): Promise<ArchiveBatchDocument[]> {
    return this.batches.find().sort({ archivedAt: -1 }).limit(limit).exec()
  }

  // Oldest chained event still in hot Mongo. Used by /audit/verify to report
  // the tier boundary. Resolves to null when no chained event is found.
  async oldestHotSeq(): Promise<number | null> {
    const e = await this.events
      .findOne({ chained: true }, { seq: 1 })
      .sort({ seq: 1 })
      .exec()
    return e?.seq ?? null
  }

  // Highest seq archived (across all batches). Resolves to null when there
  // are no batch records yet.
  async highestArchivedSeq(): Promise<number | null> {
    const b = await this.batches.findOne().sort({ endSeq: -1 }).exec()
    return b?.endSeq ?? null
  }
}

// Removes Mongoose-internal keys we don't want in the cold JSONL.
// _id stays (it's a useful retrieval handle for compliance lookups); only the
// Mongoose version key __v is dropped. Every other top-level field is copied
// through unchanged into a new object — the input document is not mutated and
// nested values are not inspected.
function stripMongoMeta(doc: Record<string, unknown>): Record<string, unknown> {
  return Object.fromEntries(
    Object.entries(doc).filter(([key]) => key !== '__v'),
  )
}

// S3 key layout: audit/YYYY/MM/DD/seq-{start}-{end}. Year/month/day buckets
// keep S3 list responses manageable as years roll over. The date parts are
// taken in UTC so the key does not depend on the server's local timezone;
// month and day are zero-padded to two digits.
function objectKeyPrefix(at: Date, startSeq: number, endSeq: number): string {
  const year = at.getUTCFullYear()
  const month = String(at.getUTCMonth() + 1).padStart(2, '0')
  const day = String(at.getUTCDate()).padStart(2, '0')
  return `audit/${year}/${month}/${day}/seq-${startSeq}-${endSeq}`
}