feat(audit): Authentik events ingest worker (Phase 2 chunk 1)
Background worker that pulls Authentik's /api/v3/events/events/ on a 60s cadence and writes each event into our audit log via AuditService. External system events now share the same /audit timeline as internally-recorded platform mutations — operator queries don't have to cross-reference Authentik's own UI to see logins, password changes, group membership, impersonation, etc. Pieces: - src/schemas/ingest-cursor.schema.ts: one row per source, tracks lastEventAt + lastEventId so restarts resume without re-pulling. - src/schemas/audit-event.schema.ts: new `externalId` field; new compound unique index on (source, externalId) with a partial filter on externalId being a string. Partial (not sparse) so internally- recorded events with externalId=null don't collide. - src/audit/audit.service.ts: AuditRecordInput grows `externalId` + `at` fields. record() now silently swallows MongoError code 11000 (duplicate key) so re-pulling the cursor overlap doesn't log noise. - src/integrations/authentik.client.ts: listEvents(since, page, pageSize) on the existing client — reuses the admin token and base URL the provisioning code already configured. - src/ingest/action-map.ts: 16 known Authentik actions → dotted authentik.* verbs (login, login_failed, password_changed, impersonation_started, …). Unknown actions fall through to authentik.<raw> rather than getting silently dropped. - src/ingest/authentik.ingest.ts: OnApplicationBootstrap worker. Reads cursor → pulls events with created__gt=cursor, ordering=created ASC → paginates forward (10 pages × 100/page safety cap per tick) → writes each event with source='authentik' + externalId=pk + at= evt.created → advances cursor to the newest seen. inFlight guard prevents overlapping ticks. AUDIT_INGEST_ENABLED=false disables for test environments. - Tenant inference: from the user's groups (same convention the portal flag-eval proxy uses). Admin groups stripped; first match against a real Tenant.slug wins. Unmatched → tenantSlug undefined, event still lands in the global timeline. Smoke-tested: fresh Mongo + restart → 78 Authentik events ingested, 0 duplicates. Performed a login at app.dezky.local → next 60s tick captured the new login row with actor email + IP. Compound unique index on (source, externalId) verified to reject re-pulled events silently (no error logs). Out of scope here (covered by chunks 2 + 3): - Stalwart webhook ingest - OCIS file-tail ingest
This commit is contained in:
@@ -5,6 +5,7 @@ import { AuditModule } from './audit/audit.module.js'
|
||||
import { AuthModule } from './auth/auth.module.js'
|
||||
import { FlagsModule } from './flags/flags.module.js'
|
||||
import { HealthModule } from './health/health.module.js'
|
||||
import { IngestModule } from './ingest/ingest.module.js'
|
||||
import { PartnersModule } from './partners/partners.module.js'
|
||||
import { SeedModule } from './seed/seed.module.js'
|
||||
import { SubscriptionsModule } from './subscriptions/subscriptions.module.js'
|
||||
@@ -25,6 +26,7 @@ import { UsersModule } from './users/users.module.js'
|
||||
UsersModule,
|
||||
SubscriptionsModule,
|
||||
FlagsModule,
|
||||
IngestModule,
|
||||
SeedModule,
|
||||
],
|
||||
})
|
||||
|
||||
@@ -38,6 +38,12 @@ export interface AuditRecordInput {
|
||||
partnerSlug?: string
|
||||
source?: AuditSource
|
||||
metadata?: Record<string, unknown>
|
||||
// Dedup key for ingest workers (Authentik pk, OCIS event id, …). Internally
|
||||
// recorded events leave this unset.
|
||||
externalId?: string
|
||||
// When the event happened at the source. Ingest workers pass this; internal
|
||||
// mutations let AuditService default to now().
|
||||
at?: Date
|
||||
}
|
||||
|
||||
export interface AuditListFilters {
|
||||
@@ -71,7 +77,7 @@ export class AuditService {
|
||||
async record(input: AuditRecordInput, actor?: AuditActor): Promise<void> {
|
||||
try {
|
||||
await this.model.create({
|
||||
at: new Date(),
|
||||
at: input.at ?? new Date(),
|
||||
actorType: actor?.userId || actor?.email ? 'user' : 'system',
|
||||
actorId: actor?.userId,
|
||||
actorEmail: actor?.email,
|
||||
@@ -85,13 +91,20 @@ export class AuditService {
|
||||
partnerSlug: input.partnerSlug,
|
||||
source: input.source ?? 'platform-api',
|
||||
metadata: input.metadata,
|
||||
externalId: input.externalId,
|
||||
})
|
||||
} catch (err) {
|
||||
this.logger.error(
|
||||
`audit write failed for action=${input.action} resource=${input.resourceId}: ${
|
||||
err instanceof Error ? err.message : String(err)
|
||||
}`,
|
||||
)
|
||||
// Duplicate-key on (source, externalId) is expected when ingest workers
|
||||
// re-poll an overlapping window — quietly ignore so the worker doesn't
|
||||
// log noise every cycle. Everything else is logged.
|
||||
const isDup = (err as { code?: number })?.code === 11000
|
||||
if (!isDup) {
|
||||
this.logger.error(
|
||||
`audit write failed for action=${input.action} resource=${input.resourceId}: ${
|
||||
err instanceof Error ? err.message : String(err)
|
||||
}`,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
// Per-source mapping from raw event action strings to the dotted verbs we
|
||||
// store on AuditEvent.action. Known actions get a clean explicit verb; unknown
|
||||
// ones fall through to `<source>.<raw>` so we don't silently drop new event
|
||||
// types Authentik / Stalwart / OCIS introduces.
|
||||
|
||||
import type { AuditOutcome, AuditResourceType } from '../schemas/audit-event.schema.js'
|
||||
|
||||
export interface MappedAction {
|
||||
action: string
|
||||
outcome: AuditOutcome
|
||||
resourceType?: AuditResourceType
|
||||
}
|
||||
|
||||
// Authentik action strings (from /api/v3/events/events/ — see
|
||||
// https://goauthentik.io/docs/events/ for the full enum).
|
||||
const AUTHENTIK_MAP: Record<string, MappedAction> = {
|
||||
login: { action: 'authentik.login', outcome: 'success', resourceType: 'user' },
|
||||
login_failed: { action: 'authentik.login_failed', outcome: 'failure', resourceType: 'user' },
|
||||
logout: { action: 'authentik.logout', outcome: 'success', resourceType: 'user' },
|
||||
user_write: { action: 'authentik.user_updated', outcome: 'success', resourceType: 'user' },
|
||||
password_set: { action: 'authentik.password_changed', outcome: 'success', resourceType: 'user' },
|
||||
secret_rotate: { action: 'authentik.secret_rotated', outcome: 'success', resourceType: 'system' },
|
||||
group_membership_set: { action: 'authentik.group_membership_changed', outcome: 'success', resourceType: 'user' },
|
||||
invitation_used: { action: 'authentik.invitation_used', outcome: 'success', resourceType: 'user' },
|
||||
authorize_application: { action: 'authentik.app_authorized', outcome: 'success', resourceType: 'user' },
|
||||
impersonation_started: { action: 'authentik.impersonation_started', outcome: 'success', resourceType: 'user' },
|
||||
impersonation_ended: { action: 'authentik.impersonation_ended', outcome: 'success', resourceType: 'user' },
|
||||
policy_execution: { action: 'authentik.policy_executed', outcome: 'success', resourceType: 'system' },
|
||||
policy_exception: { action: 'authentik.policy_exception', outcome: 'failure', resourceType: 'system' },
|
||||
configuration_error: { action: 'authentik.configuration_error', outcome: 'failure', resourceType: 'system' },
|
||||
model_created: { action: 'authentik.model_created', outcome: 'success', resourceType: 'system' },
|
||||
model_updated: { action: 'authentik.model_updated', outcome: 'success', resourceType: 'system' },
|
||||
model_deleted: { action: 'authentik.model_deleted', outcome: 'success', resourceType: 'system' },
|
||||
}
|
||||
|
||||
export function mapAuthentikAction(raw: string): MappedAction {
|
||||
return (
|
||||
AUTHENTIK_MAP[raw] ?? {
|
||||
action: `authentik.${raw}`,
|
||||
outcome: 'success',
|
||||
}
|
||||
)
|
||||
}
|
||||
@@ -0,0 +1,187 @@
|
||||
// Polls Authentik's /events/events/ endpoint on an interval and writes each
|
||||
// event into our audit log via AuditService.record(). Cursor state lives in
|
||||
// the ingest_cursors collection so a platform-api restart resumes where it
|
||||
// left off without re-recording (sparse unique index on
|
||||
// AuditEvent { source, externalId } is the second line of defense against
|
||||
// dupes).
|
||||
|
||||
import {
|
||||
Injectable,
|
||||
Logger,
|
||||
type OnApplicationBootstrap,
|
||||
type OnModuleDestroy,
|
||||
} from '@nestjs/common'
|
||||
import { ConfigService } from '@nestjs/config'
|
||||
import { InjectModel } from '@nestjs/mongoose'
|
||||
import type { Model } from 'mongoose'
|
||||
import { AuditService } from '../audit/audit.service.js'
|
||||
import { AuthentikClient, type AuthentikEvent } from '../integrations/authentik.client.js'
|
||||
import { Tenant, type TenantDocument } from '../schemas/tenant.schema.js'
|
||||
import {
|
||||
IngestCursor,
|
||||
type IngestCursorDocument,
|
||||
} from '../schemas/ingest-cursor.schema.js'
|
||||
import { mapAuthentikAction } from './action-map.js'
|
||||
|
||||
const SOURCE = 'authentik' as const
|
||||
const DEFAULT_INTERVAL_MS = 60_000
|
||||
const MAX_PAGES_PER_TICK = 10 // safety cap — 100/page × 10 = 1000 events max per cycle
|
||||
|
||||
// Authentik bundles platform admin into specific groups; strip those when we
|
||||
// derive a tenant slug from the user's group list (the same convention the
|
||||
// portal /api/flags/evaluate proxy uses).
|
||||
const ADMIN_GROUPS = new Set(['dezky-platform-admins', 'authentik Admins'])
|
||||
|
||||
@Injectable()
|
||||
export class AuthentikIngest implements OnApplicationBootstrap, OnModuleDestroy {
|
||||
private readonly logger = new Logger(AuthentikIngest.name)
|
||||
private readonly intervalMs: number
|
||||
private readonly enabled: boolean
|
||||
private timer: NodeJS.Timeout | null = null
|
||||
private inFlight = false
|
||||
|
||||
constructor(
|
||||
@InjectModel(IngestCursor.name) private readonly cursors: Model<IngestCursorDocument>,
|
||||
@InjectModel(Tenant.name) private readonly tenants: Model<TenantDocument>,
|
||||
private readonly audit: AuditService,
|
||||
private readonly authentik: AuthentikClient,
|
||||
config: ConfigService,
|
||||
) {
|
||||
this.intervalMs = Number(config.get('AUDIT_INGEST_INTERVAL_MS') ?? DEFAULT_INTERVAL_MS)
|
||||
// Default ON in dev/prod. Set AUDIT_INGEST_ENABLED=false in test runs that
|
||||
// don't have a real Authentik to poll.
|
||||
this.enabled = config.get('AUDIT_INGEST_ENABLED') !== 'false'
|
||||
}
|
||||
|
||||
onApplicationBootstrap(): void {
|
||||
if (!this.enabled) {
|
||||
this.logger.log('AUDIT_INGEST_ENABLED=false — skipping Authentik ingest')
|
||||
return
|
||||
}
|
||||
this.logger.log(`Authentik ingest starting (every ${this.intervalMs / 1000}s)`)
|
||||
// Fire once on boot so we don't have to wait a full interval for the
|
||||
// first pull, then schedule the recurring loop.
|
||||
void this.tick()
|
||||
this.timer = setInterval(() => void this.tick(), this.intervalMs)
|
||||
}
|
||||
|
||||
onModuleDestroy(): void {
|
||||
if (this.timer) clearInterval(this.timer)
|
||||
}
|
||||
|
||||
// One pull cycle. Reads the cursor, paginates forward through new events,
|
||||
// writes each to AuditEvent. We avoid overlapping runs with an in-flight
|
||||
// flag — Authentik can be slow when there are many events.
|
||||
private async tick(): Promise<void> {
|
||||
if (this.inFlight) {
|
||||
this.logger.warn('previous tick still running — skipping')
|
||||
return
|
||||
}
|
||||
this.inFlight = true
|
||||
try {
|
||||
const cursor = await this.cursors.findOne({ source: SOURCE }).exec()
|
||||
const since = cursor?.lastEventAt ?? new Date(Date.now() - 24 * 60 * 60 * 1000) // first run: last 24h
|
||||
|
||||
let page = 1
|
||||
let recorded = 0
|
||||
let newest: { at: Date; id: string } | undefined
|
||||
|
||||
while (page <= MAX_PAGES_PER_TICK) {
|
||||
const res = await this.authentik.listEvents(since, page).catch((err) => {
|
||||
this.logger.error(`Authentik listEvents page ${page} failed: ${err instanceof Error ? err.message : String(err)}`)
|
||||
return null
|
||||
})
|
||||
if (!res || !res.results.length) break
|
||||
|
||||
for (const evt of res.results) {
|
||||
await this.recordOne(evt)
|
||||
recorded++
|
||||
const at = new Date(evt.created)
|
||||
if (!newest || at > newest.at) newest = { at, id: evt.pk }
|
||||
}
|
||||
|
||||
if (res.pagination.next === 0) break // no next page
|
||||
page++
|
||||
}
|
||||
|
||||
if (newest) {
|
||||
await this.cursors.findOneAndUpdate(
|
||||
{ source: SOURCE },
|
||||
{ $set: { lastEventAt: newest.at, lastEventId: newest.id } },
|
||||
{ upsert: true },
|
||||
).exec()
|
||||
// `recorded` counts attempts, not net new rows — the dedup index on
|
||||
// (source, externalId) silently rejects events Authentik returns in
|
||||
// the overlap of the cursor window. Real net-new = recorded minus the
|
||||
// duplicate-key suppressions in AuditService.record.
|
||||
this.logger.log(
|
||||
`Authentik ingest: ${recorded} event(s) processed, cursor → ${newest.at.toISOString()}`,
|
||||
)
|
||||
}
|
||||
} catch (err) {
|
||||
this.logger.error(`Authentik ingest tick failed: ${err instanceof Error ? err.message : String(err)}`)
|
||||
} finally {
|
||||
this.inFlight = false
|
||||
}
|
||||
}
|
||||
|
||||
private async recordOne(evt: AuthentikEvent): Promise<void> {
|
||||
const mapped = mapAuthentikAction(evt.action)
|
||||
const userEmail = evt.user?.email || evt.user?.username
|
||||
const tenantSlug = await this.deriveTenantSlug(evt)
|
||||
const targetName = pickTargetName(evt)
|
||||
|
||||
await this.audit.record({
|
||||
action: mapped.action,
|
||||
outcome: mapped.outcome,
|
||||
resourceType: mapped.resourceType,
|
||||
resourceName: targetName,
|
||||
tenantSlug,
|
||||
source: SOURCE,
|
||||
externalId: evt.pk,
|
||||
at: new Date(evt.created),
|
||||
metadata: {
|
||||
app: evt.app,
|
||||
context: evt.context,
|
||||
rawAction: evt.action,
|
||||
},
|
||||
}, {
|
||||
email: userEmail,
|
||||
ip: evt.client_ip,
|
||||
})
|
||||
}
|
||||
|
||||
// Authentik doesn't tell us which tenant an event belongs to; we infer it
|
||||
// from the user's groups (same rule the portal uses to pick a tenant slug
|
||||
// for flag eval). If the actor has no recognizable tenant group, we leave
|
||||
// tenantSlug undefined — the event still lands in the global timeline,
|
||||
// just not bucketed.
|
||||
private async deriveTenantSlug(evt: AuthentikEvent): Promise<string | undefined> {
|
||||
const groups = (evt.context?.['user_groups'] as { name?: string }[] | undefined) ?? []
|
||||
const tenantGroupNames = groups
|
||||
.map((g) => g.name)
|
||||
.filter((n): n is string => typeof n === 'string' && !ADMIN_GROUPS.has(n))
|
||||
if (!tenantGroupNames.length) return undefined
|
||||
// We bias toward the first match that actually exists as a tenant in our
|
||||
// DB — Authentik groups can be arbitrary names. One query covers all
|
||||
// candidates.
|
||||
const match = await this.tenants
|
||||
.findOne({ slug: { $in: tenantGroupNames } }, { slug: 1 })
|
||||
.exec()
|
||||
return match?.slug
|
||||
}
|
||||
}
|
||||
|
||||
// Pick a human label for the audit row's "target" column. Authentik puts the
|
||||
// affected object in different shapes depending on the action; this favors
|
||||
// the most useful display string per event type.
|
||||
function pickTargetName(evt: AuthentikEvent): string | undefined {
|
||||
const ctx = evt.context ?? {}
|
||||
if (typeof ctx['username'] === 'string') return ctx['username'] as string
|
||||
if (typeof ctx['email'] === 'string') return ctx['email'] as string
|
||||
if (ctx['model'] && typeof ctx['model'] === 'object') {
|
||||
const m = ctx['model'] as { name?: string; app?: string }
|
||||
if (m.name) return `${m.app ? `${m.app}.` : ''}${m.name}`
|
||||
}
|
||||
return evt.user?.email || evt.user?.username
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
import { Module } from '@nestjs/common'
|
||||
import { MongooseModule } from '@nestjs/mongoose'
|
||||
import { AuditModule } from '../audit/audit.module.js'
|
||||
import { IntegrationsModule } from '../integrations/integrations.module.js'
|
||||
import {
|
||||
IngestCursor,
|
||||
IngestCursorSchema,
|
||||
} from '../schemas/ingest-cursor.schema.js'
|
||||
import { Tenant, TenantSchema } from '../schemas/tenant.schema.js'
|
||||
import { AuthentikIngest } from './authentik.ingest.js'
|
||||
|
||||
// Workers that pull audit events from external systems (Authentik today,
|
||||
// Stalwart + OCIS later) and forward them to AuditService. Each worker owns
|
||||
// its own cursor in the ingest_cursors collection so restarts resume cleanly.
|
||||
@Module({
|
||||
imports: [
|
||||
AuditModule,
|
||||
IntegrationsModule,
|
||||
MongooseModule.forFeature([
|
||||
{ name: IngestCursor.name, schema: IngestCursorSchema },
|
||||
{ name: Tenant.name, schema: TenantSchema },
|
||||
]),
|
||||
],
|
||||
providers: [AuthentikIngest],
|
||||
})
|
||||
export class IngestModule {}
|
||||
@@ -69,4 +69,37 @@ export class AuthentikClient {
|
||||
}
|
||||
this.logger.log(`Deleted Authentik group ${groupId}`)
|
||||
}
|
||||
|
||||
// Pull a window of Authentik events. Used by the audit ingest worker.
|
||||
// `since` filters by created timestamp (strict greater-than); pagination is
|
||||
// forward-only via `page`. Authentik's default page size is 100.
|
||||
async listEvents(
|
||||
since?: Date,
|
||||
page = 1,
|
||||
pageSize = 100,
|
||||
): Promise<AuthentikEventPage> {
|
||||
const params = new URLSearchParams({
|
||||
ordering: 'created',
|
||||
page: String(page),
|
||||
page_size: String(pageSize),
|
||||
})
|
||||
if (since) params.set('created__gt', since.toISOString())
|
||||
return this.request<AuthentikEventPage>(`/events/events/?${params}`)
|
||||
}
|
||||
}
|
||||
|
||||
// Shape returned by /events/events/. Only the fields we read; Authentik
|
||||
// includes a number of others (tenant, brand) we don't need.
|
||||
export interface AuthentikEvent {
|
||||
pk: string
|
||||
action: string
|
||||
app?: string
|
||||
user?: { pk?: number; username?: string; name?: string; email?: string }
|
||||
context?: Record<string, unknown>
|
||||
client_ip?: string
|
||||
created: string
|
||||
}
|
||||
export interface AuthentikEventPage {
|
||||
pagination: { next: number; previous: number; count: number; current: number; total_pages: number; start_index: number; end_index: number }
|
||||
results: AuthentikEvent[]
|
||||
}
|
||||
|
||||
@@ -94,6 +94,14 @@ export class AuditEvent {
|
||||
@Prop({ type: Object })
|
||||
metadata?: Record<string, unknown>
|
||||
|
||||
// External event id (Authentik `pk`, OCIS event id, Stalwart event id…).
|
||||
// Combined with `source` it's the dedup key for ingest workers — re-pulling
|
||||
// the same Authentik event window is harmless. The compound unique index on
|
||||
// (source, externalId) below — with a partialFilterExpression — gives both
|
||||
// the dedup constraint and the lookup index in one shot.
|
||||
@Prop()
|
||||
externalId?: string
|
||||
|
||||
// Tamper-evidence prep. Populated by a later phase (hash-chain + signing).
|
||||
@Prop()
|
||||
prevHash?: string
|
||||
@@ -108,3 +116,12 @@ AuditEventSchema.index({ at: -1 })
|
||||
AuditEventSchema.index({ tenantSlug: 1, at: -1 })
|
||||
AuditEventSchema.index({ actorId: 1, at: -1 })
|
||||
AuditEventSchema.index({ action: 1, at: -1 })
|
||||
// Dedup guard for ingest workers — same (source, externalId) only stored once.
|
||||
// Use a partial filter rather than `sparse` because the latter only skips
|
||||
// missing fields; Mongo will happily index `null` values from internally-
|
||||
// recorded events and break the uniqueness invariant. Partial filter includes
|
||||
// only documents that actually carry a string externalId.
|
||||
AuditEventSchema.index(
|
||||
{ source: 1, externalId: 1 },
|
||||
{ unique: true, partialFilterExpression: { externalId: { $type: 'string' } } },
|
||||
)
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
import { Prop, Schema, SchemaFactory } from '@nestjs/mongoose'
|
||||
import { HydratedDocument } from 'mongoose'
|
||||
|
||||
export type IngestCursorDocument = HydratedDocument<IngestCursor>
|
||||
|
||||
// One row per ingest source. The worker reads `lastEventAt` on startup, pulls
|
||||
// events with `created > lastEventAt`, then advances the cursor. Safe across
|
||||
// platform-api restarts — the worker resumes where it left off.
|
||||
@Schema({ collection: 'ingest_cursors', timestamps: true })
|
||||
export class IngestCursor {
|
||||
@Prop({ required: true, unique: true, index: true })
|
||||
source!: string // 'authentik' | 'ocis' | 'stalwart'
|
||||
|
||||
@Prop({ required: true })
|
||||
lastEventAt!: Date
|
||||
|
||||
// Optional — the external id of the last event we recorded. Useful when
|
||||
// multiple events share a timestamp; we use it as a tiebreaker so we don't
|
||||
// miss or double-record events that fall on the same millisecond.
|
||||
@Prop()
|
||||
lastEventId?: string
|
||||
}
|
||||
|
||||
export const IngestCursorSchema = SchemaFactory.createForClass(IngestCursor)
|
||||
Reference in New Issue
Block a user