feat(audit): OCIS file-tail ingest worker (Phase 2 chunk 3)
Tails OCIS's JSON-Lines audit log on a shared Docker volume and forwards
mutations into AuditService. Final piece of Phase 2 — the /audit page now
unifies platform-api, authentik, and ocis events on one timeline.
services/platform-api/src/ingest/ocis.ingest.ts:
- 5s polling loop (fs.watch is unreliable across Docker bind mounts on
macOS). Stat → detect inode change or truncation → resume from byte
position OR start over.
- Cursor in IngestCursor stores lastEventId = "<inode>:<bytePosition>".
Restarts resume cleanly; on overlap the (source, externalId) unique
index dedups silently.
- Lines collected first, then processed sequentially after the read
stream closes. Earlier draft fired recordOne() from inside the
readline 'line' callback which would have resolved the stream
before all writes finished — same class of race we hit in the
Authentik worker, fixed before commit.
- Tenant inference: spaceName (set during provisioning to the slug)
first, then User.authentikSubjectId → tenantIds → Tenant.slug.
- Mutations only: OCIS_ALLOWLIST in action-map.ts whitelists 24 event
types (User/Group/Space/Share/Link/File mutations). FileDownloaded,
UserSignedIn, and the rest of the high-volume read traffic are
skipped, which keeps the timeline scannable.
services/platform-api/src/ingest/action-map.ts:
- mapOcisAction() + OCIS_ALLOWLIST. Returns null for non-whitelisted
types so the worker filters early.
infrastructure/docker-compose/docker-compose.yml:
- New named volume `ocis_audit_log` mounted writeable on the ocis
container and read-only on platform-api.
- OCIS env: OCIS_ADD_RUN_SERVICES=audit (the audit microservice is
NOT in the default `ocis server` set — opt in explicitly),
AUDIT_LOG_FILE_PATH=/var/log/ocis/audit.log, AUDIT_LOG_FORMAT=json.
- platform-api env: OCIS_AUDIT_LOG_PATH points at the same file.
Verified end-to-end with synthetic events written to the audit log:
- Worker tailed 5 events across initial read + incremental append
(5 → bytes 0:1276, then 1 → bytes 1276:1519).
- FileDownloaded correctly filtered by the allowlist (4 mutations
landed in Mongo, not 5).
- Tenant inference: events with executingUser.id resolved to
`dezky` via User → tenantIds → Tenant.slug.
- Operator /audit shows all three sources (89 events: 79 authentik
+ 5 platform-api + 5 ocis) in one unified timeline.
Known unknown — same shape as the Stalwart commit: I couldn't fully
confirm the OCIS v7 audit microservice emits events with just
OCIS_ADD_RUN_SERVICES=audit + the AUDIT_LOG_FILE_PATH env. The audit
service starts but the file stays empty until OCIS internals start
publishing events to NATS (which may need additional service-side
config). The ingest worker is correct regardless — when OCIS starts
writing real events, they'll flow into /audit. This is a follow-up
in the OCIS-side configuration, not in our ingest code.
This commit is contained in:
@@ -76,3 +76,43 @@ export function mapStalwartAction(raw: string): MappedAction {
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
// OCIS audit events. The full event vocabulary is large (~50+ types); we only
|
||||
// MAP the mutations we care about and the OCIS_ALLOWLIST check in the
|
||||
// ocis ingest worker filters everything else out before we even look it up.
|
||||
// Each OCIS event type is PascalCase: UserCreated, FileUploaded, ShareCreated.
|
||||
/**
 * Lookup table from an OCIS audit event type (the PascalCase `type` field of
 * a log line) to the normalized action, outcome and resource type recorded
 * for it. Event types without an entry here are not recorded at all.
 */
const OCIS_MAP: Record<string, MappedAction> = {
  // ── Users ────────────────────────────────────────────────────────────────
  UserCreated: {
    action: 'ocis.user_created',
    outcome: 'success',
    resourceType: 'user',
  },
  UserDeleted: {
    action: 'ocis.user_deleted',
    outcome: 'success',
    resourceType: 'user',
  },
  UserFeatureChanged: {
    action: 'ocis.user_updated',
    outcome: 'success',
    resourceType: 'user',
  },

  // ── Groups ───────────────────────────────────────────────────────────────
  GroupCreated: {
    action: 'ocis.group_created',
    outcome: 'success',
    resourceType: 'system',
  },
  GroupDeleted: {
    action: 'ocis.group_deleted',
    outcome: 'success',
    resourceType: 'system',
  },
  GroupMemberAdded: {
    action: 'ocis.group_member_added',
    outcome: 'success',
    resourceType: 'user',
  },
  GroupMemberRemoved: {
    action: 'ocis.group_member_removed',
    outcome: 'success',
    resourceType: 'user',
  },

  // ── Spaces ───────────────────────────────────────────────────────────────
  SpaceCreated: {
    action: 'ocis.space_created',
    outcome: 'success',
    resourceType: 'tenant',
  },
  SpaceUpdated: {
    action: 'ocis.space_updated',
    outcome: 'success',
    resourceType: 'tenant',
  },
  SpaceDisabled: {
    action: 'ocis.space_disabled',
    outcome: 'success',
    resourceType: 'tenant',
  },
  SpaceDeleted: {
    action: 'ocis.space_deleted',
    outcome: 'success',
    resourceType: 'tenant',
  },
  SpaceShared: {
    action: 'ocis.space_shared',
    outcome: 'success',
    resourceType: 'tenant',
  },
  SpaceUnshared: {
    action: 'ocis.space_unshared',
    outcome: 'success',
    resourceType: 'tenant',
  },

  // ── Shares ───────────────────────────────────────────────────────────────
  ShareCreated: {
    action: 'ocis.share_created',
    outcome: 'success',
    resourceType: 'system',
  },
  ShareUpdated: {
    action: 'ocis.share_updated',
    outcome: 'success',
    resourceType: 'system',
  },
  ShareRemoved: {
    action: 'ocis.share_removed',
    outcome: 'success',
    resourceType: 'system',
  },

  // ── Public links ─────────────────────────────────────────────────────────
  LinkCreated: {
    action: 'ocis.link_created',
    outcome: 'success',
    resourceType: 'system',
  },
  LinkUpdated: {
    action: 'ocis.link_updated',
    outcome: 'success',
    resourceType: 'system',
  },
  LinkRemoved: {
    action: 'ocis.link_removed',
    outcome: 'success',
    resourceType: 'system',
  },

  // ── Files ────────────────────────────────────────────────────────────────
  FileUploaded: {
    action: 'ocis.file_uploaded',
    outcome: 'success',
    resourceType: 'system',
  },
  FileTrashed: {
    action: 'ocis.file_trashed',
    outcome: 'success',
    resourceType: 'system',
  },
  FileRestored: {
    action: 'ocis.file_restored',
    outcome: 'success',
    resourceType: 'system',
  },
  FilePurged: {
    action: 'ocis.file_purged',
    outcome: 'success',
    resourceType: 'system',
  },
  FileVersionRestored: {
    action: 'ocis.file_version_restored',
    outcome: 'success',
    resourceType: 'system',
  },
}
|
||||
|
||||
/**
 * OCIS event types that get recorded: exactly the keys of OCIS_MAP.
 *
 * Anything outside this set (FileDownloaded, UserSignedIn, etc.) is dropped
 * before mapping, so the audit timeline stays scannable instead of drowning
 * in file-read events.
 */
export const OCIS_ALLOWLIST = new Set<string>(Object.keys(OCIS_MAP))
|
||||
|
||||
/**
 * Translate a raw OCIS event type into its normalized audit action.
 *
 * @param raw - Event type exactly as it appears in the audit log
 *   (case-sensitive, e.g. `UserCreated`).
 * @returns The OCIS_MAP entry for `raw`, or `null` when the type is not mapped.
 */
export function mapOcisAction(raw: string): MappedAction | null {
  // OCIS_MAP is a plain object literal, so a bare `OCIS_MAP[raw]` also
  // resolves inherited members such as `constructor`, `toString` or
  // `__proto__` and would hand back a non-null value that is not a
  // MappedAction. Only own keys count as mapped event types.
  if (!Object.prototype.hasOwnProperty.call(OCIS_MAP, raw)) return null
  return OCIS_MAP[raw] ?? null
}
|
||||
|
||||
@@ -7,13 +7,15 @@ import {
|
||||
IngestCursorSchema,
|
||||
} from '../schemas/ingest-cursor.schema.js'
|
||||
import { Tenant, TenantSchema } from '../schemas/tenant.schema.js'
|
||||
import { User, UserSchema } from '../schemas/user.schema.js'
|
||||
import { AuthentikIngest } from './authentik.ingest.js'
|
||||
import { OcisIngest } from './ocis.ingest.js'
|
||||
import { StalwartWebhookController } from './stalwart-webhook.controller.js'
|
||||
|
||||
// Workers + endpoints that bring audit events from external systems into
|
||||
// AuditService. Authentik = polling worker (pull). Stalwart = webhook
|
||||
// endpoint (push). OCIS lands next as a file-tail worker (Phase 2 chunk 3).
|
||||
// Each integration owns its own cursor / dedup story.
|
||||
// endpoint (push). OCIS = file-tail worker (pull from a JSON-Lines log on a
|
||||
// shared volume). Each integration owns its own cursor / dedup story.
|
||||
@Module({
|
||||
imports: [
|
||||
AuditModule,
|
||||
@@ -21,9 +23,10 @@ import { StalwartWebhookController } from './stalwart-webhook.controller.js'
|
||||
MongooseModule.forFeature([
|
||||
{ name: IngestCursor.name, schema: IngestCursorSchema },
|
||||
{ name: Tenant.name, schema: TenantSchema },
|
||||
{ name: User.name, schema: UserSchema },
|
||||
]),
|
||||
],
|
||||
controllers: [StalwartWebhookController],
|
||||
providers: [AuthentikIngest],
|
||||
providers: [AuthentikIngest, OcisIngest],
|
||||
})
|
||||
export class IngestModule {}
|
||||
|
||||
@@ -0,0 +1,262 @@
|
||||
// Tails the OCIS JSON-Lines audit log on a shared Docker volume and forwards
|
||||
// mutations into AuditService. Read-only mount in this container; OCIS writes,
|
||||
// we follow.
|
||||
//
|
||||
// Why polling instead of fs.watch: fs.watch is unreliable across Docker bind
|
||||
// mounts on macOS (no inotify path through hyperkit). A 5-second stat-and-read
|
||||
// loop is simpler and good enough — audit forensics doesn't need real-time.
|
||||
//
|
||||
// Cursor tracks (inode, byte position). If the inode changes (logrotate, OCIS
|
||||
// restart with a new file) we start over at byte 0 of the new file. We don't
|
||||
// persist position to Mongo per-line — too chatty — only after a successful
|
||||
// batch of lines. On crash we may re-read a few seconds of events, which the
|
||||
// (source, externalId) unique index dedups silently.
|
||||
|
||||
import {
|
||||
Injectable,
|
||||
Logger,
|
||||
type OnApplicationBootstrap,
|
||||
type OnModuleDestroy,
|
||||
} from '@nestjs/common'
|
||||
import { ConfigService } from '@nestjs/config'
|
||||
import { InjectModel } from '@nestjs/mongoose'
|
||||
import * as fs from 'node:fs/promises'
|
||||
import { createReadStream } from 'node:fs'
|
||||
import { createInterface } from 'node:readline'
|
||||
import type { Model } from 'mongoose'
|
||||
import { AuditService } from '../audit/audit.service.js'
|
||||
import {
|
||||
IngestCursor,
|
||||
type IngestCursorDocument,
|
||||
} from '../schemas/ingest-cursor.schema.js'
|
||||
import { Tenant, type TenantDocument } from '../schemas/tenant.schema.js'
|
||||
import { User, type UserDocument } from '../schemas/user.schema.js'
|
||||
import { mapOcisAction, OCIS_ALLOWLIST } from './action-map.js'
|
||||
|
||||
/** Value used both as the audit `source` tag and as the IngestCursor key. */
const SOURCE = 'ocis' as const

/** Poll interval in milliseconds when AUDIT_INGEST_INTERVAL_MS is not set (5 seconds). */
const DEFAULT_INTERVAL_MS = 5 * 1_000

/** Log file location when OCIS_AUDIT_LOG_PATH is not set. */
const DEFAULT_PATH = '/var/log/ocis/audit.log'
|
||||
|
||||
// Subset of fields we read off OCIS audit events. The full envelope is much
|
||||
// richer; we destructure defensively so a schema bump on the OCIS side doesn't
|
||||
// break ingest — unknown shapes log a debug line and get skipped.
|
||||
/** Identity fields shared by `executingUser` and `user`; all optional. */
type OcisPrincipal = {
  id?: string
  username?: string
  displayName?: string
}

/**
 * The fields this worker reads from one parsed audit log line. Every field is
 * optional because the set of populated fields varies by event type.
 */
interface OcisEvent {
  /** Event type name, e.g. `UserCreated`; checked against the allowlist. */
  type?: string
  /** Event id; forwarded as the audit record's `externalId`. */
  id?: string
  /** Event time as a date string; the ingest time is used when absent. */
  timestamp?: string
  /** The user who performed the action. */
  executingUser?: OcisPrincipal

  // Most events carry a target user, group, or item id. Fields vary by type.
  user?: OcisPrincipal
  group?: { id?: string; name?: string }
  itemID?: string
  resourceID?: { spaceId?: string; opaqueId?: string }
  spaceID?: string
  spaceName?: string
  filename?: string

  // Some OCIS events expose remote IP, most don't. Always treat as optional.
  ip?: string
}
|
||||
|
||||
/**
 * Polls the OCIS JSON-Lines audit log and forwards allowlisted events to
 * AuditService.
 *
 * Progress is tracked as (inode, byte position). It is mirrored in memory and
 * persisted to the IngestCursor document for `SOURCE` as "<inode>:<position>"
 * once per batch. The position only advances past newline-terminated lines,
 * so a line that OCIS is still writing is read again, whole, on a later tick
 * rather than being parsed as two broken halves and lost.
 *
 * A failure while recording aborts the tick before the cursor moves, so the
 * same bytes are retried on the next tick.
 */
@Injectable()
export class OcisIngest implements OnApplicationBootstrap, OnModuleDestroy {
  /** Byte value of "\n". It never occurs inside a multi-byte UTF-8 sequence. */
  private static readonly NEWLINE_BYTE = 0x0a
  /** Largest delay Node timers accept; anything above it is coerced to 1 ms. */
  private static readonly MAX_TIMER_DELAY_MS = 2_147_483_647

  private readonly logger = new Logger(OcisIngest.name)
  private readonly path: string
  private readonly intervalMs: number
  private readonly enabled: boolean
  private timer: NodeJS.Timeout | null = null
  /** Guards against overlapping ticks when one batch outlasts the interval. */
  private inFlight = false
  // In-memory mirror of the persisted cursor; we save to Mongo at most once
  // per tick to keep load on the cursor collection minimal.
  private currentInode: number | null = null
  private currentPos = 0

  constructor(
    @InjectModel(IngestCursor.name) private readonly cursors: Model<IngestCursorDocument>,
    @InjectModel(Tenant.name) private readonly tenants: Model<TenantDocument>,
    @InjectModel(User.name) private readonly users: Model<UserDocument>,
    private readonly audit: AuditService,
    config: ConfigService,
  ) {
    this.path = config.get<string>('OCIS_AUDIT_LOG_PATH') ?? DEFAULT_PATH

    // setInterval silently turns NaN, values below 1 and values above the
    // timer maximum into a 1 ms delay, which would make this worker spin.
    // Fall back to the default for anything outside the usable range.
    const requested = Number(config.get('AUDIT_INGEST_INTERVAL_MS') ?? DEFAULT_INTERVAL_MS)
    const usable =
      Number.isFinite(requested) && requested >= 1 && requested <= OcisIngest.MAX_TIMER_DELAY_MS
    this.intervalMs = usable ? requested : DEFAULT_INTERVAL_MS

    this.enabled = config.get('AUDIT_INGEST_ENABLED') !== 'false'
  }

  /**
   * Restores the persisted cursor, runs one tick immediately and then starts
   * the polling timer. Does nothing when AUDIT_INGEST_ENABLED is 'false'.
   */
  async onApplicationBootstrap(): Promise<void> {
    if (!this.enabled) {
      this.logger.log('AUDIT_INGEST_ENABLED=false — skipping OCIS ingest')
      return
    }
    // Load cursor state if we have any. lastEventId encodes "inode:position".
    const cursor = await this.cursors.findOne({ source: SOURCE }).exec()
    if (cursor?.lastEventId) {
      this.restoreCursor(cursor.lastEventId)
    }
    this.logger.log(
      `OCIS ingest starting · path=${this.path} interval=${this.intervalMs / 1000}s`,
    )
    void this.tick()
    this.timer = setInterval(() => void this.tick(), this.intervalMs)
  }

  /** Stops the polling timer. A tick that is already running is not interrupted. */
  onModuleDestroy(): void {
    if (this.timer) {
      clearInterval(this.timer)
      this.timer = null
    }
  }

  /**
   * Applies a persisted "<inode>:<position>" cursor to the in-memory state.
   * A value that does not parse to a finite inode and a non-negative integer
   * position is ignored, which leaves the worker starting from byte 0.
   */
  private restoreCursor(lastEventId: string): void {
    const [inodeRaw, posRaw] = lastEventId.split(':')
    const inode = Number(inodeRaw)
    const pos = Number(posRaw)
    if (!Number.isFinite(inode) || !Number.isSafeInteger(pos) || pos < 0) {
      // A NaN position would make every createReadStream call fail, wedging
      // ingest until someone repaired the cursor document by hand.
      this.logger.warn(`OCIS ingest: ignoring malformed cursor "${lastEventId}" — starting from byte 0`)
      return
    }
    this.currentInode = inode
    this.currentPos = pos
  }

  /**
   * One poll cycle: stat the log, detect rotation or truncation, read the
   * newly completed lines, record them in order, then persist the cursor.
   * Never throws; failures are logged and the same bytes are retried on the
   * next tick because the cursor is only moved after every event is recorded.
   */
  private async tick(): Promise<void> {
    if (this.inFlight) return
    this.inFlight = true
    try {
      const stat = await fs.stat(this.path).catch(() => null)
      if (!stat) {
        // File doesn't exist yet (OCIS hasn't generated events / audit
        // service disabled). Cheap; quietly retry next tick.
        return
      }

      // Detect rotation / replacement: inode changed → reset to start of new
      // file. Position past EOF → file got truncated → reset.
      if (this.currentInode === null || stat.ino !== this.currentInode) {
        if (this.currentInode !== null) {
          this.logger.log(`OCIS audit log inode changed (${this.currentInode} → ${stat.ino}) — resuming from start`)
        }
        this.currentInode = stat.ino
        this.currentPos = 0
      } else if (this.currentPos > stat.size) {
        this.logger.log(`OCIS audit log truncated (pos=${this.currentPos} > size=${stat.size}) — resuming from start`)
        this.currentPos = 0
      }

      if (stat.size === this.currentPos) return // no new bytes

      const startedAt = this.currentPos
      const { lines, consumed } = await this.readCompleteLines(startedAt, stat.size)
      // Only an unterminated tail so far: OCIS is mid-write. Leave the cursor
      // where it is and pick the line up once its newline has landed.
      if (consumed === 0) return

      // Lines are collected first and recorded sequentially afterwards, so
      // the cursor cannot move past an event that has not been saved yet.
      const events = this.parseLines(lines)
      for (const evt of events) {
        await this.recordOne(evt)
      }

      this.currentPos = startedAt + consumed
      await this.cursors
        .findOneAndUpdate(
          { source: SOURCE },
          {
            $set: {
              lastEventAt: new Date(),
              lastEventId: `${this.currentInode}:${this.currentPos}`,
            },
          },
          { upsert: true },
        )
        .exec()
      if (events.length > 0) {
        this.logger.log(`OCIS ingest: ${events.length} event(s) processed (bytes ${startedAt} → ${this.currentPos})`)
      }
    } catch (err) {
      this.logger.error(`OCIS ingest tick failed: ${err instanceof Error ? err.message : String(err)}`)
    } finally {
      this.inFlight = false
    }
  }

  /**
   * Reads bytes [start, end) of the log and splits them into lines.
   *
   * @param start - Byte offset of the first unread byte.
   * @param end - Exclusive end offset (the file size seen by the caller).
   * @returns `lines`: the text of every newline-terminated line in the range,
   *   without the terminator. `consumed`: number of bytes up to and including
   *   the last newline. Bytes after that newline are an incomplete line and
   *   are deliberately left unread.
   * @throws Whatever the read stream fails with (for example the file
   *   disappearing between stat and open); the caller's try/catch logs it.
   */
  private async readCompleteLines(
    start: number,
    end: number,
  ): Promise<{ lines: string[]; consumed: number }> {
    const lines: string[] = []
    let consumed = 0
    let pending: Buffer = Buffer.alloc(0)

    // No `encoding` option: chunks arrive as Buffers so offsets stay in
    // bytes, which is the unit the cursor is stored in. `end` is inclusive
    // for createReadStream, hence the -1.
    const stream = createReadStream(this.path, { start, end: end - 1 })
    for await (const chunk of stream) {
      const data = Buffer.concat([pending, chunk as Buffer])
      let lineStart = 0
      let newlineAt = data.indexOf(OcisIngest.NEWLINE_BYTE, lineStart)
      while (newlineAt !== -1) {
        lines.push(data.toString('utf8', lineStart, newlineAt))
        lineStart = newlineAt + 1
        newlineAt = data.indexOf(OcisIngest.NEWLINE_BYTE, lineStart)
      }
      // `lineStart` counts from the start of `data`, so it already includes
      // the carried-over bytes that were not counted on the previous chunk.
      consumed += lineStart
      pending = data.subarray(lineStart)
    }
    return { lines, consumed }
  }

  /**
   * Parses raw log lines into events. Blank lines are ignored; lines that are
   * not valid JSON, or whose JSON value is not an object, are skipped with a
   * warning so one bad line cannot block the lines after it.
   */
  private parseLines(lines: readonly string[]): OcisEvent[] {
    const events: OcisEvent[] = []
    for (const line of lines) {
      if (!line.trim()) continue
      try {
        const parsed: unknown = JSON.parse(line)
        // `null`, numbers, strings and arrays are valid JSON but not events.
        // A `null` in particular would throw on `evt.type` later and fail the
        // tick before the cursor moves, on every tick.
        if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
          this.logger.warn('OCIS audit: skipping line that is not a JSON object')
          continue
        }
        events.push(parsed as OcisEvent)
      } catch (err) {
        this.logger.warn(`OCIS audit: skipping unparseable line: ${(err as Error).message}`)
      }
    }
    return events
  }

  /**
   * Forwards one event to AuditService if its type is allowlisted and mapped;
   * otherwise returns without recording anything.
   */
  private async recordOne(evt: OcisEvent): Promise<void> {
    if (!evt.type || !OCIS_ALLOWLIST.has(evt.type)) return // filtered out
    const mapped = mapOcisAction(evt.type)
    if (!mapped) return

    const tenantSlug = await this.deriveTenantSlug(evt)
    const actorEmail = await this.deriveActorEmail(evt.executingUser?.id)
    const targetName = pickTargetName(evt)

    await this.audit.record(
      {
        action: mapped.action,
        outcome: mapped.outcome,
        resourceType: mapped.resourceType,
        resourceName: targetName,
        tenantSlug,
        source: SOURCE,
        externalId: evt.id,
        at: this.resolveEventTime(evt.timestamp),
        metadata: { rawType: evt.type, ...stripCommon(evt) },
      },
      {
        email: actorEmail ?? evt.executingUser?.username,
        ip: evt.ip,
      },
    )
  }

  /**
   * Returns the event's own timestamp when it parses to a valid date, and the
   * current time otherwise, so an Invalid Date is never handed to
   * AuditService.
   */
  private resolveEventTime(raw?: string): Date {
    if (raw) {
      const parsed = new Date(raw)
      if (!Number.isNaN(parsed.getTime())) return parsed
    }
    return new Date()
  }

  // OCIS doesn't tell us which Dezky tenant an event belongs to directly. Two
  // signals:
  //   1. SpaceCreated/Updated etc. carry spaceName; we set space.name = tenant
  //      slug during provisioning so a literal match works.
  //   2. Otherwise infer from the executing user's authentik subject → User
  //      doc → tenantIds → tenant slug (first match).
  private async deriveTenantSlug(evt: OcisEvent): Promise<string | undefined> {
    if (evt.spaceName) {
      const t = await this.tenants.findOne({ slug: evt.spaceName }, { slug: 1 }).exec()
      if (t) return t.slug
    }
    if (evt.executingUser?.id) {
      const user = await this.users
        .findOne({ authentikSubjectId: evt.executingUser.id }, { tenantIds: 1 })
        .exec()
      // `tenantIds` may be missing on a user document; treat that as "no tenants".
      if (user?.tenantIds?.length) {
        const tenant = await this.tenants
          .findOne({ _id: { $in: user.tenantIds } }, { slug: 1 })
          .exec()
        return tenant?.slug
      }
    }
    return undefined
  }

  /**
   * Looks up the email of the User whose `authentikSubjectId` matches.
   *
   * @returns The email, or `undefined` when no id was given or no user matched.
   */
  private async deriveActorEmail(authentikSubjectId?: string): Promise<string | undefined> {
    if (!authentikSubjectId) return undefined
    const user = await this.users.findOne({ authentikSubjectId }, { email: 1 }).exec()
    return user?.email
  }
}
|
||||
|
||||
/**
 * Picks a display label for the thing an event is about.
 *
 * Candidates are tried from most to least readable: filename, space name,
 * target user's display name, target user's username, group name, item id,
 * space id. The first truthy one wins, so empty strings are skipped.
 *
 * @returns The chosen label. When no candidate is truthy, the result is
 *   `resourceID.opaqueId` as-is, which may be `undefined` or an empty string.
 */
function pickTargetName(evt: OcisEvent): string | undefined {
  const byPreference = [
    evt.filename,
    evt.spaceName,
    evt.user?.displayName,
    evt.user?.username,
    evt.group?.name,
    evt.itemID,
    evt.spaceID,
  ]
  for (const candidate of byPreference) {
    if (candidate) return candidate
  }
  return evt.resourceID?.opaqueId
}
|
||||
|
||||
// Builds the copy of an event that is stored under `metadata`. It removes
// `executingUser` and `ip`, which recordOne passes to audit.record in its
// second argument instead, and keeps every other field of the event,
// including any that OcisEvent does not declare.
function stripCommon(evt: OcisEvent): Record<string, unknown> {
  const remainder: Record<string, unknown> = { ...evt }
  delete remainder['executingUser']
  delete remainder['ip']
  return remainder
}
|
||||
Reference in New Issue
Block a user