feat(audit): cold-storage archival to S3 (Phase 4)

Final piece of the audit work. Events older than the hot retention window
move to S3-compatible object storage with signed manifests. Production uses
Hetzner Object Storage; dev uses a MinIO container with the same API.

Infra (infrastructure/docker-compose):
  - New `minio` service exposing the S3 API at minio:9000 + admin console at
    minio.dezky.local. Healthchecked. Bucket-init sidecar runs `mc mb` once
    to create `dezky-audit`; safe to re-run.
  - .env adds MINIO_ROOT_USER + MINIO_ROOT_PASSWORD.
  - platform-api env: AUDIT_COLD_{ENDPOINT,REGION,BUCKET,ACCESS_KEY,SECRET_KEY}
    + AUDIT_HOT_RETENTION_DAYS=90 + ARCHIVE_ENABLED=false (dormant in dev;
    operator UI's "Run archive now" bypasses this gate). AUDIT_COLD_SSE
    opts into SSE-S3 — left unset in dev because MinIO without a KMS rejects
    AES256 PUTs with "KMS is not configured".

Platform-api (services/platform-api/src/cold/):
  - cold-storage.client.ts: thin @aws-sdk/client-s3 wrapper — put/head/list.
    forcePathStyle=true so MinIO and Hetzner both work; same code, env-swap.
  - archive.service.ts: runOnce() selects chained events with at < cutoff →
    serializes to JSONL → gzip → sha256s → uploads JSONL + signed manifest
    → HEAD-confirms both objects exist → records an ArchiveBatch doc → only
    then deletes from hot Mongo. Crash-safe: a failed upload leaves events
    in hot. Manifest uses the Phase 3 AUDIT_SIGNING_KEY (HMAC-SHA-256), so
    archives + checkpoints share trust chain. Bypassable via { override:
    true } for the operator's UI force-run.
  - archive.worker.ts: hourly tick guarded by configured run-hour-UTC
    (default 03:00) + day-guard so the same UTC day doesn't archive twice.
    Disabled until ARCHIVE_ENABLED=true.
  - archive-batch.schema.ts: { archivedAt, startSeq, endSeq, eventCount,
    manifestSha256, jsonlKey, manifestKey, bytesUncompressed }. The
    manifest sha256 stored in Mongo lets us detect manifest tampering
    without downloading the actual manifest.

Audit module additions:
  - audit.controller.ts: GET /audit/archives, POST /audit/archive/run,
    /audit/verify now reports { oldestHotSeq, highestArchivedSeq } so the
    UI shows the tier boundary.

Operator UI (apps/operator):
  - 2 new proxies: /api/audit/archives + /api/audit/archive/run (force
    override=true). Both behind operator auth via the existing platformApi
    helper.
  - audit.vue: new "Cold storage" card with batch table (archived-at, seq
    range, event count, size, truncated manifest sha256), "Run archive
    now" button + per-run result line.

Smoke-tested end-to-end:
  - 7 chained events in hot. /api/audit/archive/run → ok=true, batchId
    returned. JSONL + manifest both exist in MinIO (verified via mc ls +
    mc cat). Mongo's chained set went 7 → 0. Verify reports
    highestArchivedSeq=1446 (since we burn-allocate seqs on Authentik
    dup-key rejections). Operator /audit panel shows the batch with
    manifest hash 1d8263…
  - First attempt with SSE-S3 enabled failed cleanly (MinIO KMS not
    configured) — archive service correctly left events in hot Mongo.
    Made SSE opt-in via AUDIT_COLD_SSE=true; prod turns it on.

Out of scope (each could be its own session):
  - Restore-to-hot endpoint (today: download from S3 + offline query)
  - Client-side encryption (today: SSE-S3 in prod, none in dev)
  - Multi-region replication
  - Soft TTL safety net (defense-in-depth on top of app-managed deletion)

This completes the four-phase audit log work:
  1. platform-api as audit hub
  2. External system ingest (Authentik / Stalwart / OCIS)
  3. Hash-chain + signed checkpoints (tamper evidence)
  4. Cold-storage archival (retention without unbounded Mongo growth)
This commit is contained in:
Ronni Baslund
2026-05-24 21:03:41 +02:00
parent 9435baa09d
commit 4d9e906ec1
13 changed files with 1279 additions and 10 deletions
@@ -0,0 +1,261 @@
// Moves chained audit events older than AUDIT_HOT_RETENTION_DAYS from Mongo
// to cold storage. Idempotent and crash-safe — events are deleted ONLY after
// the JSONL + manifest have been confirmed in S3 (HEAD check). A failed
// upload or HEAD check leaves the events in hot Mongo for the next run.
//
// Trust chain across tiers:
// - Hot events carry sha256 hash + prev-hash links (Phase 3).
// - When archived, the JSONL preserves these fields verbatim.
// - The manifest records the JSONL's sha256 + the boundary event hashes,
// signed with AUDIT_SIGNING_KEY (same key as Phase 3 checkpoints).
// - The Phase 3 checkpoint that covers the archived seq range still
// verifies — we're not orphaning chain integrity, just moving the
// events themselves.
import { Injectable, Logger } from '@nestjs/common'
import { ConfigService } from '@nestjs/config'
import { InjectModel } from '@nestjs/mongoose'
import { createHash, createHmac } from 'node:crypto'
import { gzipSync } from 'node:zlib'
import type { Model } from 'mongoose'
import {
AuditEvent,
type AuditEventDocument,
} from '../schemas/audit-event.schema.js'
import {
ArchiveBatch,
type ArchiveBatchDocument,
} from '../schemas/archive-batch.schema.js'
import { ColdStorageClient } from './cold-storage.client.js'
export interface ArchiveResult {
ok: boolean
reason?: string
batchId?: string
startSeq?: number
endSeq?: number
eventCount?: number
}
interface ArchiveManifest {
version: 1
startSeq: number
endSeq: number
eventCount: number
startedAt: string
endedAt: string
bytesUncompressed: number
bytesCompressed: number
sha256: string // sha256 of the JSONL (uncompressed)
firstEventHash: string
lastEventHash: string
signature: string
sigAlg: 'HMAC-SHA-256'
}
@Injectable()
export class ArchiveService {
private readonly logger = new Logger(ArchiveService.name)
private readonly retentionDays: number
private readonly signingKey: string
// Server-side encryption — opt-in via AUDIT_COLD_SSE=true. Production
// (Hetzner Object Storage) supports SSE-S3 natively. Dev/MinIO without
// a KMS rejects it with "KMS is not configured" so the default is off.
// Client-side encryption is a larger workstream tracked for later.
private readonly serverSideEncryption: 'AES256' | undefined
constructor(
@InjectModel(AuditEvent.name) private readonly events: Model<AuditEventDocument>,
@InjectModel(ArchiveBatch.name) private readonly batches: Model<ArchiveBatchDocument>,
private readonly cold: ColdStorageClient,
config: ConfigService,
) {
this.retentionDays = Number(config.get('AUDIT_HOT_RETENTION_DAYS') ?? 90)
this.signingKey = config.getOrThrow<string>('AUDIT_SIGNING_KEY')
this.serverSideEncryption = config.get('AUDIT_COLD_SSE') === 'true' ? 'AES256' : undefined
}
// One archive run. Selects events older than the retention boundary, packs
// them, uploads, then deletes from hot Mongo. Returns a small result so
// the worker can log a meaningful line.
async runOnce(opts?: { override?: boolean }): Promise<ArchiveResult> {
// Cutoff: events with at < cutoff go cold. For dev/forced runs, the
// caller can override the retention check entirely (so we can exercise
// the path on minute-old events).
const cutoff = opts?.override
? new Date()
: new Date(Date.now() - this.retentionDays * 24 * 60 * 60 * 1000)
// Pull the eligible events ordered by seq. Cap per run so a runaway
// backlog doesn't OOM us — at 100k events/batch the next run picks up
// where we left off.
const MAX_PER_BATCH = 100_000
const events = await this.events
.find({ chained: true, at: { $lt: cutoff } })
.sort({ seq: 1 })
.limit(MAX_PER_BATCH)
.lean()
.exec()
if (events.length === 0) {
return { ok: true, reason: 'no events older than cutoff' }
}
const startSeq = events[0].seq!
const endSeq = events[events.length - 1].seq!
const startedAt = new Date()
// Build JSONL — one event per line, no extra wrapping. Preserves the
// original document shape so offline tools can `jq` it directly.
const jsonlLines = events.map((e) => JSON.stringify(stripMongoMeta(e)))
const jsonl = jsonlLines.join('\n') + '\n'
const bytesUncompressed = Buffer.byteLength(jsonl, 'utf8')
const gzipped = gzipSync(Buffer.from(jsonl, 'utf8'))
// Hash the uncompressed payload — the manifest's hash is content-
// addressable so anyone redownloading + decompressing can re-verify.
const sha256 = createHash('sha256').update(jsonl).digest('hex')
const objectPrefix = objectKeyPrefix(startedAt, startSeq, endSeq)
const jsonlKey = `${objectPrefix}.jsonl.gz`
const manifestKey = `${objectPrefix}.manifest.json`
const firstEventHash = events[0].hash ?? '<missing>'
const lastEventHash = events[events.length - 1].hash ?? '<missing>'
const endedAt = new Date()
// Sign the manifest. Same key as Phase 3 checkpoints — an attacker who
// wants to forge an archive batch needs both S3 write AND this key.
const unsigned = {
version: 1 as const,
startSeq,
endSeq,
eventCount: events.length,
startedAt: startedAt.toISOString(),
endedAt: endedAt.toISOString(),
bytesUncompressed,
bytesCompressed: gzipped.length,
sha256,
firstEventHash,
lastEventHash,
}
const signature = createHmac('sha256', this.signingKey)
.update(JSON.stringify(unsigned))
.digest('hex')
const manifest: ArchiveManifest = {
...unsigned,
signature,
sigAlg: 'HMAC-SHA-256',
}
const manifestJson = JSON.stringify(manifest, null, 2)
const manifestSha256 = createHash('sha256').update(manifestJson).digest('hex')
try {
await this.cold.put({
key: jsonlKey,
body: gzipped,
contentType: 'application/gzip',
serverSideEncryption: this.serverSideEncryption,
})
await this.cold.put({
key: manifestKey,
body: manifestJson,
contentType: 'application/json',
serverSideEncryption: this.serverSideEncryption,
})
// Confirm both objects landed before deleting from hot.
const [jsonlHead, manifestHead] = await Promise.all([
this.cold.head(jsonlKey),
this.cold.head(manifestKey),
])
if (!jsonlHead.exists || !manifestHead.exists) {
throw new Error(
`archive HEAD failed: jsonl=${jsonlHead.exists} manifest=${manifestHead.exists}`,
)
}
} catch (err) {
this.logger.error(
`archive upload failed (seq ${startSeq}-${endSeq}): ${
err instanceof Error ? err.message : String(err)
} — events remain in hot Mongo`,
)
return {
ok: false,
reason: err instanceof Error ? err.message : String(err),
startSeq,
endSeq,
eventCount: events.length,
}
}
// Record the batch BEFORE the delete so a crash between the two leaves
// us with a batch record pointing at events that still exist in hot —
// safe. The reverse (delete-then-record) could orphan events.
const batch = await this.batches.create({
archivedAt: endedAt,
startSeq,
endSeq,
eventCount: events.length,
manifestSha256,
jsonlKey,
manifestKey,
bytesUncompressed,
})
// Delete from hot. One bulk op; safe because we own this seq range.
await this.events
.deleteMany({ chained: true, seq: { $gte: startSeq, $lte: endSeq } })
.exec()
this.logger.log(
`archive batch ${batch._id} · seq ${startSeq}-${endSeq} · ${events.length} events → ${jsonlKey}`,
)
return {
ok: true,
batchId: String(batch._id),
startSeq,
endSeq,
eventCount: events.length,
}
}
listBatches(limit = 100): Promise<ArchiveBatchDocument[]> {
return this.batches.find().sort({ archivedAt: -1 }).limit(limit).exec()
}
// Oldest chained event still in hot Mongo. Used by /audit/verify to report
// the tier boundary.
async oldestHotSeq(): Promise<number | null> {
const e = await this.events.findOne({ chained: true }, { seq: 1 }).sort({ seq: 1 }).exec()
return e?.seq ?? null
}
// Highest seq archived (across all batches).
async highestArchivedSeq(): Promise<number | null> {
const b = await this.batches.findOne().sort({ endSeq: -1 }).exec()
return b?.endSeq ?? null
}
}
// Removes Mongoose-internal keys we don't want in the cold JSONL.
// _id stays (it's a useful retrieval handle for compliance lookups), but
// __v and any virtual aliases are stripped.
function stripMongoMeta(doc: Record<string, unknown>): Record<string, unknown> {
const out: Record<string, unknown> = {}
for (const [k, v] of Object.entries(doc)) {
if (k === '__v') continue
out[k] = v
}
return out
}
// S3 key layout: audit/YYYY/MM/DD/seq-{start}-{end}. Year/month/day buckets
// keep S3 list responses manageable as years roll over.
function objectKeyPrefix(at: Date, startSeq: number, endSeq: number): string {
const y = at.getUTCFullYear()
const m = String(at.getUTCMonth() + 1).padStart(2, '0')
const d = String(at.getUTCDate()).padStart(2, '0')
return `audit/${y}/${m}/${d}/seq-${startSeq}-${endSeq}`
}
@@ -0,0 +1,78 @@
// Daily-ish scheduler for the audit archive run. Disabled by default
// (ARCHIVE_ENABLED=false) — production turns it on once volumes warrant.
// The operator UI can force a run via POST /audit/archive/run regardless
// of this flag.
import {
Injectable,
Logger,
type OnApplicationBootstrap,
type OnModuleDestroy,
} from '@nestjs/common'
import { ConfigService } from '@nestjs/config'
import { ArchiveService } from './archive.service.js'
// One hour. Inside each tick we check if the current UTC hour matches the
// configured run-hour; only one of 24 ticks per day actually invokes the
// archive. Cheap and simple — no real cron lib needed.
const TICK_MS = 60 * 60 * 1000
const DEFAULT_RUN_HOUR_UTC = 3 // 03:00 UTC daily
@Injectable()
export class ArchiveWorker implements OnApplicationBootstrap, OnModuleDestroy {
private readonly logger = new Logger(ArchiveWorker.name)
private readonly enabled: boolean
private readonly runHour: number
private timer: NodeJS.Timeout | null = null
private lastRunDay: string | null = null
constructor(
private readonly archive: ArchiveService,
config: ConfigService,
) {
this.enabled = config.get('ARCHIVE_ENABLED') === 'true'
this.runHour = Number(config.get('ARCHIVE_RUN_HOUR_UTC') ?? DEFAULT_RUN_HOUR_UTC)
}
onApplicationBootstrap(): void {
if (!this.enabled) {
this.logger.log(
'ARCHIVE_ENABLED=false — archive scheduler dormant. Operator UI can force runs.',
)
return
}
this.logger.log(`Archive scheduler active · runs daily at ${this.runHour}:00 UTC`)
// Fire once on startup in case we missed a window; the day-guard prevents
// double-runs within the same UTC day.
void this.tick()
this.timer = setInterval(() => void this.tick(), TICK_MS)
}
onModuleDestroy(): void {
if (this.timer) clearInterval(this.timer)
}
private async tick(): Promise<void> {
const now = new Date()
if (now.getUTCHours() !== this.runHour) return
const today = now.toISOString().slice(0, 10) // YYYY-MM-DD UTC
if (this.lastRunDay === today) return // already ran today
this.lastRunDay = today
this.logger.log(`Archive tick fired for ${today}`)
try {
const res = await this.archive.runOnce()
if (res.ok && res.eventCount) {
this.logger.log(`Archive complete: ${res.eventCount} events (${res.startSeq}-${res.endSeq})`)
} else if (res.ok) {
this.logger.log(`Archive complete: ${res.reason ?? 'no-op'}`)
} else {
this.logger.error(`Archive failed: ${res.reason}`)
}
} catch (err) {
this.logger.error(
`Archive tick crashed: ${err instanceof Error ? err.message : String(err)}`,
)
}
}
}
@@ -0,0 +1,110 @@
// Thin wrapper around @aws-sdk/client-s3 configured for Dezky's audit
// archive bucket. Same code talks to MinIO in dev and Hetzner Object
// Storage in prod — only the endpoint + credentials change via env.
//
// We deliberately keep the surface small (put, head, list) so the archive
// worker doesn't depend on AWS SDK specifics that might shift between
// versions. Streaming reads / restore-from-cold are a follow-up; today's
// scope is the write path.
import { Injectable, Logger } from '@nestjs/common'
import { ConfigService } from '@nestjs/config'
import {
HeadObjectCommand,
ListObjectsV2Command,
PutObjectCommand,
S3Client,
} from '@aws-sdk/client-s3'
export interface ColdPutInput {
key: string
body: Buffer | string
contentType?: string
// Server-side encryption for prod (Hetzner SSE-S3). MinIO ignores when
// unset; setting it explicitly is a no-op for dev but documents intent.
serverSideEncryption?: 'AES256'
}
export interface ColdHeadResult {
exists: boolean
size?: number
etag?: string
}
@Injectable()
export class ColdStorageClient {
private readonly logger = new Logger(ColdStorageClient.name)
private readonly client: S3Client
private readonly bucket: string
constructor(config: ConfigService) {
const endpoint = config.getOrThrow<string>('AUDIT_COLD_ENDPOINT')
const region = config.get<string>('AUDIT_COLD_REGION') ?? 'us-east-1'
const accessKeyId = config.getOrThrow<string>('AUDIT_COLD_ACCESS_KEY')
const secretAccessKey = config.getOrThrow<string>('AUDIT_COLD_SECRET_KEY')
this.bucket = config.getOrThrow<string>('AUDIT_COLD_BUCKET')
this.client = new S3Client({
endpoint,
region,
credentials: { accessKeyId, secretAccessKey },
// forcePathStyle is mandatory for MinIO and harmless for Hetzner (which
// accepts both virtual-host + path-style). Without it the SDK tries
// `bucket.minio:9000` which doesn't resolve.
forcePathStyle: true,
})
}
get bucketName(): string {
return this.bucket
}
// Upload an object. Returns the ETag the server assigned. Throws on
// network/auth failure; callers should not catch and pretend success —
// the archive worker uses throw-on-failure to keep events in hot storage.
async put(input: ColdPutInput): Promise<{ etag?: string }> {
const res = await this.client.send(
new PutObjectCommand({
Bucket: this.bucket,
Key: input.key,
Body: input.body,
ContentType: input.contentType,
ServerSideEncryption: input.serverSideEncryption,
}),
)
return { etag: res.ETag }
}
// Existence + size check. The archive worker calls this after every put
// to confirm the upload actually landed before deleting from hot Mongo.
async head(key: string): Promise<ColdHeadResult> {
try {
const res = await this.client.send(
new HeadObjectCommand({ Bucket: this.bucket, Key: key }),
)
return { exists: true, size: res.ContentLength, etag: res.ETag }
} catch (err) {
// S3 returns 404 for missing keys; the SDK surfaces it as a NotFound
// error. Treat 404 as "doesn't exist" without logging — anything else
// is real failure.
const e = err as { name?: string; $metadata?: { httpStatusCode?: number } }
if (e?.name === 'NotFound' || e?.$metadata?.httpStatusCode === 404) {
return { exists: false }
}
throw err
}
}
// List objects under a prefix. Used by the operator UI to show recent
// archive batches. Paginated via continuation token; for the v1 UI we cap
// at a single page since we only show the latest N.
async list(prefix: string, max = 100): Promise<{ keys: string[]; truncated: boolean }> {
const res = await this.client.send(
new ListObjectsV2Command({ Bucket: this.bucket, Prefix: prefix, MaxKeys: max }),
)
return {
keys: (res.Contents ?? []).map((c) => c.Key!).filter(Boolean),
truncated: !!res.IsTruncated,
}
}
}
@@ -0,0 +1,26 @@
import { Module } from '@nestjs/common'
import { MongooseModule } from '@nestjs/mongoose'
import {
ArchiveBatch,
ArchiveBatchSchema,
} from '../schemas/archive-batch.schema.js'
import { AuditEvent, AuditEventSchema } from '../schemas/audit-event.schema.js'
import { ArchiveService } from './archive.service.js'
import { ArchiveWorker } from './archive.worker.js'
import { ColdStorageClient } from './cold-storage.client.js'
// Phase 4 — cold storage + retention. Owns the S3 client wrapper, the
// archive service that moves events from hot Mongo to cold S3, and the
// daily worker scheduler. AuditModule imports this and exposes archive
// endpoints on /audit/archive*.
@Module({
imports: [
MongooseModule.forFeature([
{ name: AuditEvent.name, schema: AuditEventSchema },
{ name: ArchiveBatch.name, schema: ArchiveBatchSchema },
]),
],
providers: [ColdStorageClient, ArchiveService, ArchiveWorker],
exports: [ArchiveService],
})
export class ColdModule {}