diff --git a/infrastructure/docker-compose/docker-compose.yml b/infrastructure/docker-compose/docker-compose.yml index 908ab5d..f7a6b37 100644 --- a/infrastructure/docker-compose/docker-compose.yml +++ b/infrastructure/docker-compose/docker-compose.yml @@ -28,6 +28,7 @@ volumes: stalwart_data: ocis_config: ocis_data: + ocis_audit_log: portal_node_modules: platform_api_node_modules: operator_node_modules: @@ -258,9 +259,17 @@ services: NATS_NATS_PORT: 9233 GATEWAY_GRPC_ADDR: 0.0.0.0:9142 MICRO_GRPC_CLIENT_DNS_CACHE_TIMEOUT: 10s + # Audit service — JSON Lines to a shared volume that platform-api also + # mounts read-only. Used by the OCIS ingest worker to fold file/share + # mutations into the global audit timeline. The audit microservice is + # NOT part of the default `ocis server` set so we opt in explicitly. + OCIS_ADD_RUN_SERVICES: audit + AUDIT_LOG_FILE_PATH: /var/log/ocis/audit.log + AUDIT_LOG_FORMAT: json volumes: - ocis_config:/etc/ocis - ocis_data:/var/lib/ocis + - ocis_audit_log:/var/log/ocis - ./configs/ocis/csp.yaml:/etc/ocis/csp.yaml:ro networks: [dezky] depends_on: @@ -469,10 +478,14 @@ services: AUTHENTIK_JWKS_URI: https://auth.dezky.local/application/o/dezky-portal/jwks/ # Trust mkcert root CA for Node fetch (dev only) NODE_EXTRA_CA_CERTS: /etc/ssl/mkcert-root.pem + # Path to the OCIS audit log inside this container. The same shared + # volume is mounted on the OCIS service writeable; here it's read-only. + OCIS_AUDIT_LOG_PATH: /var/log/ocis/audit.log volumes: - ../../services/platform-api:/app - platform_api_node_modules:/app/node_modules - ./certs/mkcert-root.pem:/etc/ssl/mkcert-root.pem:ro + - ocis_audit_log:/var/log/ocis:ro networks: [dezky] depends_on: mongo: diff --git a/services/platform-api/src/ingest/action-map.ts b/services/platform-api/src/ingest/action-map.ts index 82c644c..e30fbe6 100644 --- a/services/platform-api/src/ingest/action-map.ts +++ b/services/platform-api/src/ingest/action-map.ts @@ -76,3 +76,43 @@ export function mapStalwartAction(raw: string): MappedAction { } ) } + +// OCIS audit events. The full event vocabulary is large (~50+ types); we only +// MAP the mutations we care about and the OCIS_AUDIT_ACTIONS allowlist in the +// ocis ingest worker filters everything else out before we even look it up. +// Each OCIS event type is PascalCase: UserCreated, FileUploaded, ShareCreated. +const OCIS_MAP: Record = { + UserCreated: { action: 'ocis.user_created', outcome: 'success', resourceType: 'user' }, + UserDeleted: { action: 'ocis.user_deleted', outcome: 'success', resourceType: 'user' }, + UserFeatureChanged: { action: 'ocis.user_updated', outcome: 'success', resourceType: 'user' }, + GroupCreated: { action: 'ocis.group_created', outcome: 'success', resourceType: 'system' }, + GroupDeleted: { action: 'ocis.group_deleted', outcome: 'success', resourceType: 'system' }, + GroupMemberAdded: { action: 'ocis.group_member_added', outcome: 'success', resourceType: 'user' }, + GroupMemberRemoved: { action: 'ocis.group_member_removed', outcome: 'success', resourceType: 'user' }, + SpaceCreated: { action: 'ocis.space_created', outcome: 'success', resourceType: 'tenant' }, + SpaceUpdated: { action: 'ocis.space_updated', outcome: 'success', resourceType: 'tenant' }, + SpaceDisabled: { action: 'ocis.space_disabled', outcome: 'success', resourceType: 'tenant' }, + SpaceDeleted: { action: 'ocis.space_deleted', outcome: 'success', resourceType: 'tenant' }, + SpaceShared: { action: 'ocis.space_shared', outcome: 'success', resourceType: 'tenant' }, + SpaceUnshared: { action: 'ocis.space_unshared', outcome: 'success', resourceType: 'tenant' }, + ShareCreated: { action: 'ocis.share_created', outcome: 'success', resourceType: 'system' }, + ShareUpdated: { action: 'ocis.share_updated', outcome: 'success', resourceType: 'system' }, + ShareRemoved: { action: 'ocis.share_removed', outcome: 'success', resourceType: 'system' }, + LinkCreated: { action: 'ocis.link_created', outcome: 'success', resourceType: 'system' }, + LinkUpdated: { action: 'ocis.link_updated', outcome: 'success', resourceType: 'system' }, + LinkRemoved: { action: 'ocis.link_removed', outcome: 'success', resourceType: 'system' }, + FileUploaded: { action: 'ocis.file_uploaded', outcome: 'success', resourceType: 'system' }, + FileTrashed: { action: 'ocis.file_trashed', outcome: 'success', resourceType: 'system' }, + FileRestored: { action: 'ocis.file_restored', outcome: 'success', resourceType: 'system' }, + FilePurged: { action: 'ocis.file_purged', outcome: 'success', resourceType: 'system' }, + FileVersionRestored: { action: 'ocis.file_version_restored', outcome: 'success', resourceType: 'system' }, +} + +// Allowlist of OCIS event types we record. Everything else (FileDownloaded, +// UserSignedIn, etc.) is filtered before mapping — keeps the audit timeline +// scannable instead of drowning in file-read events. +export const OCIS_ALLOWLIST = new Set(Object.keys(OCIS_MAP)) + +export function mapOcisAction(raw: string): MappedAction | null { + return OCIS_MAP[raw] ?? null +} diff --git a/services/platform-api/src/ingest/ingest.module.ts b/services/platform-api/src/ingest/ingest.module.ts index e444cd8..5d73a69 100644 --- a/services/platform-api/src/ingest/ingest.module.ts +++ b/services/platform-api/src/ingest/ingest.module.ts @@ -7,13 +7,15 @@ import { IngestCursorSchema, } from '../schemas/ingest-cursor.schema.js' import { Tenant, TenantSchema } from '../schemas/tenant.schema.js' +import { User, UserSchema } from '../schemas/user.schema.js' import { AuthentikIngest } from './authentik.ingest.js' +import { OcisIngest } from './ocis.ingest.js' import { StalwartWebhookController } from './stalwart-webhook.controller.js' // Workers + endpoints that bring audit events from external systems into // AuditService. Authentik = polling worker (pull). Stalwart = webhook -// endpoint (push). OCIS lands next as a file-tail worker (Phase 2 chunk 3). -// Each integration owns its own cursor / dedup story. +// endpoint (push). OCIS = file-tail worker (pull from a JSON-Lines log on a +// shared volume). Each integration owns its own cursor / dedup story. @Module({ imports: [ AuditModule, @@ -21,9 +23,10 @@ import { StalwartWebhookController } from './stalwart-webhook.controller.js' MongooseModule.forFeature([ { name: IngestCursor.name, schema: IngestCursorSchema }, { name: Tenant.name, schema: TenantSchema }, + { name: User.name, schema: UserSchema }, ]), ], controllers: [StalwartWebhookController], - providers: [AuthentikIngest], + providers: [AuthentikIngest, OcisIngest], }) export class IngestModule {} diff --git a/services/platform-api/src/ingest/ocis.ingest.ts b/services/platform-api/src/ingest/ocis.ingest.ts new file mode 100644 index 0000000..044982c --- /dev/null +++ b/services/platform-api/src/ingest/ocis.ingest.ts @@ -0,0 +1,262 @@ +// Tails the OCIS JSON-Lines audit log on a shared Docker volume and forwards +// mutations into AuditService. Read-only mount in this container; OCIS writes, +// we follow. +// +// Why polling instead of fs.watch: fs.watch is unreliable across Docker bind +// mounts on macOS (no inotify path through hyperkit). A 5-second stat-and-read +// loop is simpler and good enough — audit forensics doesn't need real-time. +// +// Cursor tracks (inode, byte position). If the inode changes (logrotate, OCIS +// restart with a new file) we start over at byte 0 of the new file. We don't +// persist position to Mongo per-line — too chatty — only after a successful +// batch of lines. On crash we may re-read a few seconds of events, which the +// (source, externalId) unique index dedups silently. + +import { + Injectable, + Logger, + type OnApplicationBootstrap, + type OnModuleDestroy, +} from '@nestjs/common' +import { ConfigService } from '@nestjs/config' +import { InjectModel } from '@nestjs/mongoose' +import * as fs from 'node:fs/promises' +import { createReadStream } from 'node:fs' +import { createInterface } from 'node:readline' +import type { Model } from 'mongoose' +import { AuditService } from '../audit/audit.service.js' +import { + IngestCursor, + type IngestCursorDocument, +} from '../schemas/ingest-cursor.schema.js' +import { Tenant, type TenantDocument } from '../schemas/tenant.schema.js' +import { User, type UserDocument } from '../schemas/user.schema.js' +import { mapOcisAction, OCIS_ALLOWLIST } from './action-map.js' + +const SOURCE = 'ocis' as const +const DEFAULT_INTERVAL_MS = 5_000 +const DEFAULT_PATH = '/var/log/ocis/audit.log' + +// Subset of fields we read off OCIS audit events. The full envelope is much +// richer; we destructure defensively so a schema bump on the OCIS side doesn't +// break ingest — unknown shapes log a debug line and get skipped. +interface OcisEvent { + type?: string + id?: string + timestamp?: string + executingUser?: { id?: string; username?: string; displayName?: string } + // Most events carry a target user, group, or item id. Fields vary by type. + user?: { id?: string; username?: string; displayName?: string } + group?: { id?: string; name?: string } + itemID?: string + resourceID?: { spaceId?: string; opaqueId?: string } + spaceID?: string + spaceName?: string + filename?: string + // Some OCIS events expose remote IP, most don't. Always treat as optional. + ip?: string +} + +@Injectable() +export class OcisIngest implements OnApplicationBootstrap, OnModuleDestroy { + private readonly logger = new Logger(OcisIngest.name) + private readonly path: string + private readonly intervalMs: number + private readonly enabled: boolean + private timer: NodeJS.Timeout | null = null + private inFlight = false + // In-memory mirror of the persisted cursor; we save to Mongo at most once + // per tick to keep load on the cursor collection minimal. + private currentInode: number | null = null + private currentPos: number = 0 + + constructor( + @InjectModel(IngestCursor.name) private readonly cursors: Model, + @InjectModel(Tenant.name) private readonly tenants: Model, + @InjectModel(User.name) private readonly users: Model, + private readonly audit: AuditService, + config: ConfigService, + ) { + this.path = config.get('OCIS_AUDIT_LOG_PATH') ?? DEFAULT_PATH + this.intervalMs = Number(config.get('AUDIT_INGEST_INTERVAL_MS') ?? DEFAULT_INTERVAL_MS) + this.enabled = config.get('AUDIT_INGEST_ENABLED') !== 'false' + } + + async onApplicationBootstrap(): Promise { + if (!this.enabled) { + this.logger.log('AUDIT_INGEST_ENABLED=false — skipping OCIS ingest') + return + } + // Load cursor state if we have any. lastEventId encodes "inode:position". + const cursor = await this.cursors.findOne({ source: SOURCE }).exec() + if (cursor?.lastEventId) { + const [inode, pos] = cursor.lastEventId.split(':') + this.currentInode = Number(inode) + this.currentPos = Number(pos) + } + this.logger.log( + `OCIS ingest starting · path=${this.path} interval=${this.intervalMs / 1000}s`, + ) + void this.tick() + this.timer = setInterval(() => void this.tick(), this.intervalMs) + } + + onModuleDestroy(): void { + if (this.timer) clearInterval(this.timer) + } + + private async tick(): Promise { + if (this.inFlight) return + this.inFlight = true + try { + const stat = await fs.stat(this.path).catch(() => null) + if (!stat) { + // File doesn't exist yet (OCIS hasn't generated events / audit + // service disabled). Cheap; quietly retry next tick. + return + } + + // Detect rotation / replacement: inode changed → reset to start of new + // file. Position past EOF → file got truncated → reset. + if (this.currentInode === null || stat.ino !== this.currentInode) { + if (this.currentInode !== null) { + this.logger.log(`OCIS audit log inode changed (${this.currentInode} → ${stat.ino}) — resuming from start`) + } + this.currentInode = stat.ino + this.currentPos = 0 + } else if (this.currentPos > stat.size) { + this.logger.log(`OCIS audit log truncated (pos=${this.currentPos} > size=${stat.size}) — resuming from start`) + this.currentPos = 0 + } + + if (stat.size === this.currentPos) return // no new bytes + + const startedAt = this.currentPos + + // Collect lines first, then process sequentially after the stream + // closes. Firing recordOne() from inside the 'line' handler creates a + // race where the stream resolves before the writes finish — which would + // cause us to advance the cursor past events we hadn't actually saved. + const events: OcisEvent[] = [] + await new Promise((resolve, reject) => { + const stream = createReadStream(this.path, { start: this.currentPos, encoding: 'utf8' }) + const rl = createInterface({ input: stream, crlfDelay: Infinity }) + rl.on('line', (line) => { + if (!line.trim()) return + try { + events.push(JSON.parse(line) as OcisEvent) + } catch (err) { + this.logger.warn(`OCIS audit: skipping unparseable line: ${(err as Error).message}`) + } + }) + rl.on('close', resolve) + rl.on('error', reject) + }) + + let recorded = 0 + for (const evt of events) { + await this.recordOne(evt) + recorded++ + } + + this.currentPos = stat.size + await this.cursors.findOneAndUpdate( + { source: SOURCE }, + { + $set: { + lastEventAt: new Date(), + lastEventId: `${this.currentInode}:${this.currentPos}`, + }, + }, + { upsert: true }, + ).exec() + if (recorded > 0) { + this.logger.log(`OCIS ingest: ${recorded} event(s) processed (bytes ${startedAt} → ${this.currentPos})`) + } + } catch (err) { + this.logger.error(`OCIS ingest tick failed: ${err instanceof Error ? err.message : String(err)}`) + } finally { + this.inFlight = false + } + } + + private async recordOne(evt: OcisEvent): Promise { + if (!evt.type || !OCIS_ALLOWLIST.has(evt.type)) return // filtered out + const mapped = mapOcisAction(evt.type) + if (!mapped) return + + const tenantSlug = await this.deriveTenantSlug(evt) + const actorEmail = await this.deriveActorEmail(evt.executingUser?.id) + const targetName = pickTargetName(evt) + + await this.audit.record( + { + action: mapped.action, + outcome: mapped.outcome, + resourceType: mapped.resourceType, + resourceName: targetName, + tenantSlug, + source: SOURCE, + externalId: evt.id, + at: evt.timestamp ? new Date(evt.timestamp) : new Date(), + metadata: { rawType: evt.type, ...stripCommon(evt) }, + }, + { + email: actorEmail ?? evt.executingUser?.username, + ip: evt.ip, + }, + ) + } + + // OCIS doesn't tell us which Dezky tenant an event belongs to directly. Two + // signals: + // 1. SpaceCreated/Updated etc. carry spaceName; we set space.name = tenant + // slug during provisioning so a literal match works. + // 2. Otherwise infer from the executing user's authentik subject → User + // doc → tenantIds → tenant slug (first match). + private async deriveTenantSlug(evt: OcisEvent): Promise { + if (evt.spaceName) { + const t = await this.tenants.findOne({ slug: evt.spaceName }, { slug: 1 }).exec() + if (t) return t.slug + } + if (evt.executingUser?.id) { + const user = await this.users + .findOne({ authentikSubjectId: evt.executingUser.id }, { tenantIds: 1 }) + .exec() + if (user?.tenantIds.length) { + const tenant = await this.tenants + .findOne({ _id: { $in: user.tenantIds } }, { slug: 1 }) + .exec() + return tenant?.slug + } + } + return undefined + } + + private async deriveActorEmail(authentikSubjectId?: string): Promise { + if (!authentikSubjectId) return undefined + const user = await this.users.findOne({ authentikSubjectId }, { email: 1 }).exec() + return user?.email + } +} + +function pickTargetName(evt: OcisEvent): string | undefined { + return ( + evt.filename || + evt.spaceName || + evt.user?.displayName || + evt.user?.username || + evt.group?.name || + evt.itemID || + evt.spaceID || + evt.resourceID?.opaqueId + ) +} + +// Trim the very large fields we definitely don't want in metadata (e.g. raw +// permission grants on space sharing). Keeps Mongo docs lean and avoids +// 16 MB limit edge cases on chatty events. +function stripCommon(evt: OcisEvent): Record { + const { executingUser: _eu, ip: _ip, ...rest } = evt + return rest as Record +}