diff --git a/services/platform-api/src/app.module.ts b/services/platform-api/src/app.module.ts
index 97c49e5..ed78a27 100644
--- a/services/platform-api/src/app.module.ts
+++ b/services/platform-api/src/app.module.ts
@@ -5,6 +5,7 @@ import { AuditModule } from './audit/audit.module.js'
 import { AuthModule } from './auth/auth.module.js'
 import { FlagsModule } from './flags/flags.module.js'
 import { HealthModule } from './health/health.module.js'
+import { IngestModule } from './ingest/ingest.module.js'
 import { PartnersModule } from './partners/partners.module.js'
 import { SeedModule } from './seed/seed.module.js'
 import { SubscriptionsModule } from './subscriptions/subscriptions.module.js'
@@ -25,6 +26,7 @@ import { UsersModule } from './users/users.module.js'
     UsersModule,
     SubscriptionsModule,
     FlagsModule,
+    IngestModule,
     SeedModule,
   ],
 })
diff --git a/services/platform-api/src/audit/audit.service.ts b/services/platform-api/src/audit/audit.service.ts
index 7698225..c188dfa 100644
--- a/services/platform-api/src/audit/audit.service.ts
+++ b/services/platform-api/src/audit/audit.service.ts
@@ -38,6 +38,12 @@ export interface AuditRecordInput {
   partnerSlug?: string
   source?: AuditSource
   metadata?: Record<string, unknown>
+  // Dedup key for ingest workers (Authentik pk, OCIS event id, …). Internally
+  // recorded events leave this unset.
+  externalId?: string
+  // When the event happened at the source. Ingest workers pass this; internal
+  // mutations let AuditService default to now().
+  at?: Date
 }
 
 export interface AuditListFilters {
@@ -71,7 +77,7 @@ export class AuditService {
   async record(input: AuditRecordInput, actor?: AuditActor): Promise<void> {
     try {
       await this.model.create({
-        at: new Date(),
+        at: input.at ?? new Date(),
         actorType: actor?.userId || actor?.email ? 'user' : 'system',
         actorId: actor?.userId,
         actorEmail: actor?.email,
@@ -85,13 +91,24 @@ export class AuditService {
         partnerSlug: input.partnerSlug,
         source: input.source ?? 'platform-api',
         metadata: input.metadata,
+        externalId: input.externalId,
       })
     } catch (err) {
-      this.logger.error(
-        `audit write failed for action=${input.action} resource=${input.resourceId}: ${
-          err instanceof Error ? err.message : String(err)
-        }`,
-      )
+      // Duplicate-key on (source, externalId) is expected when ingest workers
+      // re-poll an overlapping window — quietly ignore so the worker doesn't
+      // log noise every cycle. That index only covers rows carrying a string
+      // externalId, so the suppression is limited to writes that set one; a
+      // duplicate-key error on any other write is unexpected and is logged
+      // like every other failure.
+      const isDup =
+        input.externalId !== undefined && (err as { code?: number } | null)?.code === 11000
+      if (!isDup) {
+        this.logger.error(
+          `audit write failed for action=${input.action} resource=${input.resourceId}: ${
+            err instanceof Error ? err.message : String(err)
+          }`,
+        )
+      }
     }
   }
 
diff --git a/services/platform-api/src/ingest/action-map.ts b/services/platform-api/src/ingest/action-map.ts
new file mode 100644
index 0000000..1d11125
--- /dev/null
+++ b/services/platform-api/src/ingest/action-map.ts
@@ -0,0 +1,43 @@
+// Per-source mapping from raw event action strings to the dotted verbs we
+// store on AuditEvent.action. Known actions get a clean explicit verb; unknown
+// ones fall through to `<source>.<raw>` so we don't silently drop new event
+// types Authentik / Stalwart / OCIS introduce.
+
+import type { AuditOutcome, AuditResourceType } from '../schemas/audit-event.schema.js'
+
+export interface MappedAction {
+  action: string
+  outcome: AuditOutcome
+  resourceType?: AuditResourceType
+}
+
+// Authentik action strings (from /api/v3/events/events/ — see
+// https://goauthentik.io/docs/events/ for the full enum).
+const AUTHENTIK_MAP: Record<string, MappedAction> = {
+  login: { action: 'authentik.login', outcome: 'success', resourceType: 'user' },
+  login_failed: { action: 'authentik.login_failed', outcome: 'failure', resourceType: 'user' },
+  logout: { action: 'authentik.logout', outcome: 'success', resourceType: 'user' },
+  user_write: { action: 'authentik.user_updated', outcome: 'success', resourceType: 'user' },
+  password_set: { action: 'authentik.password_changed', outcome: 'success', resourceType: 'user' },
+  secret_rotate: { action: 'authentik.secret_rotated', outcome: 'success', resourceType: 'system' },
+  group_membership_set: { action: 'authentik.group_membership_changed', outcome: 'success', resourceType: 'user' },
+  invitation_used: { action: 'authentik.invitation_used', outcome: 'success', resourceType: 'user' },
+  authorize_application: { action: 'authentik.app_authorized', outcome: 'success', resourceType: 'user' },
+  impersonation_started: { action: 'authentik.impersonation_started', outcome: 'success', resourceType: 'user' },
+  impersonation_ended: { action: 'authentik.impersonation_ended', outcome: 'success', resourceType: 'user' },
+  policy_execution: { action: 'authentik.policy_executed', outcome: 'success', resourceType: 'system' },
+  policy_exception: { action: 'authentik.policy_exception', outcome: 'failure', resourceType: 'system' },
+  configuration_error: { action: 'authentik.configuration_error', outcome: 'failure', resourceType: 'system' },
+  model_created: { action: 'authentik.model_created', outcome: 'success', resourceType: 'system' },
+  model_updated: { action: 'authentik.model_updated', outcome: 'success', resourceType: 'system' },
+  model_deleted: { action: 'authentik.model_deleted', outcome: 'success', resourceType: 'system' },
+}
+
+export function mapAuthentikAction(raw: string): MappedAction {
+  // Own-property lookup only: AUTHENTIK_MAP is a plain object, so a raw action
+  // named like an inherited member (`constructor`, `toString`) would otherwise
+  // resolve to that member instead of falling through to the generic
+  // `authentik.<raw>` verb with outcome 'success'.
+  const known = Object.prototype.hasOwnProperty.call(AUTHENTIK_MAP, raw) ? AUTHENTIK_MAP[raw] : undefined
+  return known ?? { action: `authentik.${raw}`, outcome: 'success' }
+}
diff --git a/services/platform-api/src/ingest/authentik.ingest.ts b/services/platform-api/src/ingest/authentik.ingest.ts
new file mode 100644
index 0000000..55fc591
--- /dev/null
+++ b/services/platform-api/src/ingest/authentik.ingest.ts
@@ -0,0 +1,187 @@
+// Polls Authentik's /events/events/ endpoint on an interval and writes each
+// event into our audit log via AuditService.record(). Cursor state lives in
+// the ingest_cursors collection so a platform-api restart resumes where it
+// left off without re-recording (partial unique index on
+// AuditEvent { source, externalId } is the second line of defense against
+// dupes).
+
+import {
+  Injectable,
+  Logger,
+  type OnApplicationBootstrap,
+  type OnModuleDestroy,
+} from '@nestjs/common'
+import { ConfigService } from '@nestjs/config'
+import { InjectModel } from '@nestjs/mongoose'
+import type { Model } from 'mongoose'
+import { AuditService } from '../audit/audit.service.js'
+import { AuthentikClient, type AuthentikEvent } from '../integrations/authentik.client.js'
+import { Tenant, type TenantDocument } from '../schemas/tenant.schema.js'
+import {
+  IngestCursor,
+  type IngestCursorDocument,
+} from '../schemas/ingest-cursor.schema.js'
+import { mapAuthentikAction } from './action-map.js'
+
+const SOURCE = 'authentik' as const
+const DEFAULT_INTERVAL_MS = 60_000
+const MAX_PAGES_PER_TICK = 10 // safety cap — 100/page × 10 = 1000 events max per cycle
+
+// Authentik bundles platform admin into specific groups; strip those when we
+// derive a tenant slug from the user's group list (the same convention the
+// portal /api/flags/evaluate proxy uses).
+const ADMIN_GROUPS = new Set(['dezky-platform-admins', 'authentik Admins'])
+
+// Background worker: pulls new Authentik events on a timer and forwards each
+// one to AuditService.record(), keeping its position in an IngestCursor row.
+@Injectable()
+export class AuthentikIngest implements OnApplicationBootstrap, OnModuleDestroy {
+  private readonly logger = new Logger(AuthentikIngest.name)
+  private readonly intervalMs: number
+  private readonly enabled: boolean
+  private timer: NodeJS.Timeout | null = null
+  private inFlight = false
+
+  constructor(
+    @InjectModel(IngestCursor.name) private readonly cursors: Model<IngestCursorDocument>,
+    @InjectModel(Tenant.name) private readonly tenants: Model<TenantDocument>,
+    private readonly audit: AuditService,
+    private readonly authentik: AuthentikClient,
+    config: ConfigService,
+  ) {
+    // A non-numeric, zero or negative interval would make setInterval fire
+    // back-to-back and hammer Authentik, so fall back to the default instead.
+    const interval = Number(config.get('AUDIT_INGEST_INTERVAL_MS') ?? DEFAULT_INTERVAL_MS)
+    this.intervalMs = Number.isFinite(interval) && interval > 0 ? interval : DEFAULT_INTERVAL_MS
+    // Default ON in dev/prod. Set AUDIT_INGEST_ENABLED=false in test runs that
+    // don't have a real Authentik to poll.
+    this.enabled = config.get('AUDIT_INGEST_ENABLED') !== 'false'
+  }
+
+  // Starts polling unless disabled: one pull right away so we don't wait a
+  // full interval for the first events, then one pull per interval.
+  onApplicationBootstrap(): void {
+    if (!this.enabled) {
+      this.logger.log('AUDIT_INGEST_ENABLED=false — skipping Authentik ingest')
+      return
+    }
+    this.logger.log(`Authentik ingest starting (every ${this.intervalMs / 1000}s)`)
+    void this.tick()
+    this.timer = setInterval(() => void this.tick(), this.intervalMs)
+  }
+
+  // Stops the recurring poll. A tick that is already running is not awaited.
+  onModuleDestroy(): void {
+    if (!this.timer) return
+    clearInterval(this.timer)
+    this.timer = null
+  }
+
+  // One pull cycle. Reads the cursor, paginates forward through new events,
+  // writes each to AuditEvent. We avoid overlapping runs with an in-flight
+  // flag — Authentik can be slow when there are many events.
+  private async tick(): Promise<void> {
+    if (this.inFlight) {
+      this.logger.warn('previous tick still running — skipping')
+      return
+    }
+    this.inFlight = true
+    try {
+      const cursor = await this.cursors.findOne({ source: SOURCE }).exec()
+      const since = cursor?.lastEventAt ?? new Date(Date.now() - 24 * 60 * 60 * 1000) // first run: last 24h
+
+      let page = 1
+      let recorded = 0
+      let newest: { at: Date; id: string } | undefined
+
+      while (page <= MAX_PAGES_PER_TICK) {
+        const res = await this.authentik.listEvents(since, page).catch((err) => {
+          this.logger.error(`Authentik listEvents page ${page} failed: ${err instanceof Error ? err.message : String(err)}`)
+          return null
+        })
+        if (!res || !res.results.length) break
+
+        for (const evt of res.results) {
+          await this.recordOne(evt)
+          recorded++
+          const at = new Date(evt.created)
+          // Never let an unparseable timestamp become the cursor (NaN compares false).
+          if (Number.isNaN(at.getTime())) continue
+          if (!newest || at > newest.at) newest = { at, id: evt.pk }
+        }
+
+        if (res.pagination.next === 0) break // no next page
+        page++
+      }
+
+      if (newest) {
+        await this.cursors.findOneAndUpdate(
+          { source: SOURCE },
+          { $set: { lastEventAt: newest.at, lastEventId: newest.id } },
+          { upsert: true },
+        ).exec()
+        // `recorded` counts attempts, not net new rows — the dedup index on
+        // (source, externalId) silently rejects events Authentik returns in
+        // the overlap of the cursor window. Real net-new = recorded minus the
+        // duplicate-key suppressions in AuditService.record.
+        this.logger.log(`Authentik ingest: ${recorded} event(s) processed, cursor → ${newest.at.toISOString()}`)
+      }
+    } catch (err) {
+      this.logger.error(`Authentik ingest tick failed: ${err instanceof Error ? err.message : String(err)}`)
+    } finally {
+      this.inFlight = false
+    }
+  }
+
+  // Maps one Authentik event onto an audit row and hands it to AuditService.
+  private async recordOne(evt: AuthentikEvent): Promise<void> {
+    const mapped = mapAuthentikAction(evt.action)
+    await this.audit.record({
+      action: mapped.action,
+      outcome: mapped.outcome,
+      resourceType: mapped.resourceType,
+      resourceName: pickTargetName(evt),
+      tenantSlug: await this.deriveTenantSlug(evt),
+      source: SOURCE,
+      externalId: evt.pk,
+      at: new Date(evt.created),
+      metadata: { app: evt.app, context: evt.context, rawAction: evt.action },
+    }, {
+      email: evt.user?.email || evt.user?.username,
+      ip: evt.client_ip,
+    })
+  }
+
+  // Authentik doesn't tell us which tenant an event belongs to; we infer it
+  // from the user's groups (same rule the portal uses to pick a tenant slug
+  // for flag eval). With no recognizable tenant group we return undefined —
+  // the event still lands in the global timeline, just not bucketed.
+  private async deriveTenantSlug(evt: AuthentikEvent): Promise<string | undefined> {
+    // `context` is free-form data from Authentik, so check the shape instead
+    // of trusting a cast — one malformed event must not abort the whole tick.
+    const groups: unknown = evt.context?.['user_groups']
+    if (!Array.isArray(groups)) return undefined
+    const tenantGroupNames = groups
+      .map((g: { name?: unknown } | null | undefined) => g?.name)
+      .filter((n): n is string => typeof n === 'string' && !ADMIN_GROUPS.has(n))
+    if (!tenantGroupNames.length) return undefined
+    // Only a group that exists as a tenant slug counts. One query covers all
+    // candidates; with several matches the database picks, not group order.
+    const match = await this.tenants.findOne({ slug: { $in: tenantGroupNames } }, { slug: 1 }).exec()
+    return match?.slug
+  }
+}
+
+// Pick a human label for the audit row's "target" column. Authentik puts the
+// affected object in different shapes depending on the action; this favors
+// the most useful display string per event type.
+function pickTargetName(evt: AuthentikEvent): string | undefined {
+  const ctx = evt.context ?? {}
+  if (typeof ctx['username'] === 'string') return ctx['username'] as string
+  if (typeof ctx['email'] === 'string') return ctx['email'] as string
+  if (ctx['model'] && typeof ctx['model'] === 'object') {
+    const m = ctx['model'] as { name?: string; app?: string }
+    if (m.name) return `${m.app ? `${m.app}.` : ''}${m.name}`
+  }
+  return evt.user?.email || evt.user?.username
+}
diff --git a/services/platform-api/src/ingest/ingest.module.ts b/services/platform-api/src/ingest/ingest.module.ts
new file mode 100644
index 0000000..e1fc2ec
--- /dev/null
+++ b/services/platform-api/src/ingest/ingest.module.ts
@@ -0,0 +1,26 @@
+import { Module } from '@nestjs/common'
+import { MongooseModule } from '@nestjs/mongoose'
+import { AuditModule } from '../audit/audit.module.js'
+import { IntegrationsModule } from '../integrations/integrations.module.js'
+import {
+  IngestCursor,
+  IngestCursorSchema,
+} from '../schemas/ingest-cursor.schema.js'
+import { Tenant, TenantSchema } from '../schemas/tenant.schema.js'
+import { AuthentikIngest } from './authentik.ingest.js'
+
+// Workers that pull audit events from external systems (Authentik today,
+// Stalwart + OCIS later) and forward them to AuditService. Each worker owns
+// its own cursor in the ingest_cursors collection so restarts resume cleanly.
+@Module({
+  imports: [
+    AuditModule,
+    IntegrationsModule,
+    MongooseModule.forFeature([
+      { name: IngestCursor.name, schema: IngestCursorSchema },
+      { name: Tenant.name, schema: TenantSchema },
+    ]),
+  ],
+  providers: [AuthentikIngest],
+})
+export class IngestModule {}
diff --git a/services/platform-api/src/integrations/authentik.client.ts b/services/platform-api/src/integrations/authentik.client.ts
index 2c6003c..1929caf 100644
--- a/services/platform-api/src/integrations/authentik.client.ts
+++ b/services/platform-api/src/integrations/authentik.client.ts
@@ -69,4 +69,39 @@ export class AuthentikClient {
     }
     this.logger.log(`Deleted Authentik group ${groupId}`)
   }
+
+  // Pull a window of Authentik events. Used by the audit ingest worker.
+  // `since` filters by created timestamp (strict greater-than) and is sent as
+  // an ISO-8601 string; when omitted no lower bound is applied. Pagination is
+  // forward-only via `page`, with results requested in `created` order.
+  // Authentik's default page size is 100.
+  async listEvents(
+    since?: Date,
+    page = 1,
+    pageSize = 100,
+  ): Promise<AuthentikEventPage> {
+    const params = new URLSearchParams({
+      ordering: 'created',
+      page: String(page),
+      page_size: String(pageSize),
+    })
+    if (since) params.set('created__gt', since.toISOString())
+    return this.request(`/events/events/?${params}`)
+  }
+}
+
+// Shape returned by /events/events/. Only the fields we read; Authentik
+// includes a number of others (tenant, brand) we don't need.
+export interface AuthentikEvent {
+  pk: string
+  action: string
+  app?: string
+  user?: { pk?: number; username?: string; name?: string; email?: string }
+  context?: Record<string, unknown>
+  client_ip?: string
+  created: string
+}
+export interface AuthentikEventPage {
+  pagination: { next: number; previous: number; count: number; current: number; total_pages: number; start_index: number; end_index: number }
+  results: AuthentikEvent[]
 }
diff --git a/services/platform-api/src/schemas/audit-event.schema.ts b/services/platform-api/src/schemas/audit-event.schema.ts
index c55dddc..845cc6d 100644
--- a/services/platform-api/src/schemas/audit-event.schema.ts
+++ b/services/platform-api/src/schemas/audit-event.schema.ts
@@ -94,6 +94,14 @@ export class AuditEvent {
   @Prop({ type: Object })
   metadata?: Record<string, unknown>
 
+  // External event id (Authentik `pk`, OCIS event id, Stalwart event id…).
+  // Combined with `source` it's the dedup key for ingest workers — re-pulling
+  // the same Authentik event window is harmless. The compound unique index on
+  // (source, externalId) below — with a partialFilterExpression — gives both
+  // the dedup constraint and the lookup index in one shot.
+  @Prop()
+  externalId?: string
+
   // Tamper-evidence prep. Populated by a later phase (hash-chain + signing).
   @Prop()
   prevHash?: string
@@ -108,3 +116,13 @@ AuditEventSchema.index({ at: -1 })
 AuditEventSchema.index({ tenantSlug: 1, at: -1 })
 AuditEventSchema.index({ actorId: 1, at: -1 })
 AuditEventSchema.index({ action: 1, at: -1 })
+// Dedup guard for ingest workers — same (source, externalId) only stored once.
+// Use a partial filter rather than `sparse`: a sparse compound index still
+// indexes any document that has at least one of its keys, and events recorded
+// without an externalId still carry `source`, so two of them from the same
+// source would collide. The partial filter includes only documents that
+// actually carry a string externalId.
+AuditEventSchema.index(
+  { source: 1, externalId: 1 },
+  { unique: true, partialFilterExpression: { externalId: { $type: 'string' } } },
+)
diff --git a/services/platform-api/src/schemas/ingest-cursor.schema.ts b/services/platform-api/src/schemas/ingest-cursor.schema.ts
new file mode 100644
index 0000000..b06b311
--- /dev/null
+++ b/services/platform-api/src/schemas/ingest-cursor.schema.ts
@@ -0,0 +1,24 @@
+import { Prop, Schema, SchemaFactory } from '@nestjs/mongoose'
+import { HydratedDocument } from 'mongoose'
+
+export type IngestCursorDocument = HydratedDocument<IngestCursor>
+
+// One row per ingest source. The worker reads `lastEventAt` before each pull,
+// asks for events with `created > lastEventAt`, then advances the cursor.
+// Safe across platform-api restarts — the worker resumes where it left off.
+@Schema({ collection: 'ingest_cursors', timestamps: true })
+export class IngestCursor {
+  @Prop({ required: true, unique: true, index: true })
+  source!: string // 'authentik' | 'ocis' | 'stalwart'
+
+  @Prop({ required: true })
+  lastEventAt!: Date
+
+  // Optional — the external id of the last event we recorded. The Authentik
+  // worker writes it but does not read it back yet; it is kept as a possible
+  // tiebreaker for events that share a timestamp.
+  @Prop()
+  lastEventId?: string
+}
+
+export const IngestCursorSchema = SchemaFactory.createForClass(IngestCursor)