feat(scheduling): tenant webhooks for booking lifecycle
This commit is contained in:
@@ -23,6 +23,7 @@ import type { HostCalendarAccess } from '../stalwart-calendar/calendar-gateway.t
|
||||
import { CredentialProvisioner } from '../stalwart-calendar/credential-provisioner.service.js'
|
||||
import { JmapCalendarGateway } from '../stalwart-calendar/jmap-calendar.gateway.js'
|
||||
import { buildMeetUrl, meetJwtEnabled, type MeetJwtConfig } from './meet-room.js'
|
||||
import { WebhooksService, toWebhookBookingView } from '../webhooks/webhooks.service.js'
|
||||
|
||||
const HOLD_MS = 10 * 60 * 1000
|
||||
|
||||
@@ -67,6 +68,7 @@ export class BookingsService {
|
||||
private readonly provisioner: CredentialProvisioner,
|
||||
private readonly gateway: JmapCalendarGateway,
|
||||
private readonly mailer: JmapMailer,
|
||||
private readonly webhooks: WebhooksService,
|
||||
config: ConfigService,
|
||||
) {
|
||||
this.bookingPublicUrl = (config.get<string>('BOOKING_PUBLIC_URL') ?? 'https://booking.dezky.local').replace(/\/$/, '')
|
||||
@@ -195,6 +197,9 @@ export class BookingsService {
|
||||
if (!written) {
|
||||
throw new ServiceUnavailableException('Could not complete the booking on the calendar — please try again.')
|
||||
}
|
||||
// Booking is now confirmed with a calendar event — emit the lifecycle event.
|
||||
// Fire-and-forget; webhook delivery must never block or break a booking.
|
||||
void this.webhooks.dispatch('booking.created', toWebhookBookingView(booking))
|
||||
return booking
|
||||
}
|
||||
|
||||
@@ -334,6 +339,7 @@ export class BookingsService {
|
||||
this.sendEmail(ctx, booking, access, 'cancellation').catch((e) =>
|
||||
this.logger.warn(`Cancellation email failed: ${e.message}`),
|
||||
)
|
||||
void this.webhooks.dispatch('booking.cancelled', toWebhookBookingView(booking))
|
||||
return booking
|
||||
}
|
||||
|
||||
@@ -372,6 +378,9 @@ export class BookingsService {
|
||||
old.status = 'rescheduled'
|
||||
await old.save()
|
||||
await this.lockModel.deleteOne({ hostId: old.hostId, startUtc: old.startUtc, bookingId: old._id }).exec()
|
||||
// The fresh booking already emitted 'booking.created'; emit 'booking.rescheduled'
|
||||
// for it too (its rescheduledFromBookingId points at the old booking).
|
||||
void this.webhooks.dispatch('booking.rescheduled', toWebhookBookingView(fresh))
|
||||
return fresh
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,8 @@ import { Host, HostSchema } from '../schemas/scheduling-host.schema.js'
|
||||
import { SlotLock, SlotLockSchema } from '../schemas/slot-lock.schema.js'
|
||||
import { Tenant, TenantSchema } from '../schemas/tenant.schema.js'
|
||||
import { User, UserSchema } from '../schemas/user.schema.js'
|
||||
import { WebhookDelivery, WebhookDeliverySchema } from '../schemas/webhook-delivery.schema.js'
|
||||
import { WebhookSubscription, WebhookSubscriptionSchema } from '../schemas/webhook-subscription.schema.js'
|
||||
import { TenantsModule } from '../tenants/tenants.module.js'
|
||||
import { ABUSE_GUARD, abuseGuardFactory } from './abuse/abuse-guard.js'
|
||||
import { AvailabilityService } from './availability/availability.service.js'
|
||||
@@ -26,6 +28,9 @@ import { BookingReminderWorker } from './reminders/booking-reminder.worker.js'
|
||||
import { SchedulingAdminController } from './scheduling-admin.controller.js'
|
||||
import { SlotService } from './slots/slot.service.js'
|
||||
import { StalwartCalendarModule } from './stalwart-calendar/stalwart-calendar.module.js'
|
||||
import { WebhookDeliveryWorker } from './webhooks/webhook-delivery.worker.js'
|
||||
import { WebhooksController } from './webhooks/webhooks.controller.js'
|
||||
import { WebhooksService } from './webhooks/webhooks.service.js'
|
||||
|
||||
// dezky Scheduling — Calendly-style booking on top of Stalwart calendars. Public
|
||||
// pages (booking.dezky.eu) hit the unauthenticated /api/v1/public routes; host
|
||||
@@ -41,6 +46,8 @@ import { StalwartCalendarModule } from './stalwart-calendar/stalwart-calendar.mo
|
||||
{ name: SlotLock.name, schema: SlotLockSchema },
|
||||
{ name: Tenant.name, schema: TenantSchema },
|
||||
{ name: User.name, schema: UserSchema },
|
||||
{ name: WebhookSubscription.name, schema: WebhookSubscriptionSchema },
|
||||
{ name: WebhookDelivery.name, schema: WebhookDeliverySchema },
|
||||
]),
|
||||
// Drives the @Cron booking reminder worker.
|
||||
ScheduleModule.forRoot(),
|
||||
@@ -52,7 +59,7 @@ import { StalwartCalendarModule } from './stalwart-calendar/stalwart-calendar.mo
|
||||
IntegrationsModule, // StalwartClient — host→account lookup during onboarding
|
||||
StalwartCalendarModule,
|
||||
],
|
||||
controllers: [SchedulingAdminController, PublicSchedulingController],
|
||||
controllers: [WebhooksController, SchedulingAdminController, PublicSchedulingController],
|
||||
providers: [
|
||||
HostsService,
|
||||
AvailabilityService,
|
||||
@@ -63,6 +70,8 @@ import { StalwartCalendarModule } from './stalwart-calendar/stalwart-calendar.mo
|
||||
JmapMailer,
|
||||
BookingReminderWorker,
|
||||
CalendarRetryWorker,
|
||||
WebhooksService,
|
||||
WebhookDeliveryWorker,
|
||||
// Pluggable captcha guard for the public booking surface (Turnstile when
|
||||
// TURNSTILE_SECRET is set, otherwise a no-op).
|
||||
{ provide: ABUSE_GUARD, useFactory: abuseGuardFactory, inject: [ConfigService] },
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
import { ArrayNotEmpty, IsArray, IsBoolean, IsIn, IsOptional, IsUrl, MaxLength } from 'class-validator'
|
||||
import { WEBHOOK_EVENTS, type WebhookEvent } from '../../../schemas/webhook-subscription.schema.js'
|
||||
|
||||
// Only allow https receivers (or http for local dev hostnames). Booking payloads
|
||||
// carry attendee PII, so plain http to arbitrary hosts is rejected.
|
||||
const URL_OPTS = { protocols: ['https', 'http'], require_protocol: true, require_tld: false }
|
||||
|
||||
export class CreateWebhookDto {
|
||||
@IsUrl(URL_OPTS, { message: 'url must be a valid http(s) URL' })
|
||||
@MaxLength(2048)
|
||||
url!: string
|
||||
|
||||
@IsArray()
|
||||
@ArrayNotEmpty()
|
||||
@IsIn(WEBHOOK_EVENTS, { each: true, message: `events must be one of: ${WEBHOOK_EVENTS.join(', ')}` })
|
||||
events!: WebhookEvent[]
|
||||
}
|
||||
|
||||
export class UpdateWebhookDto {
|
||||
@IsOptional()
|
||||
@IsUrl(URL_OPTS, { message: 'url must be a valid http(s) URL' })
|
||||
@MaxLength(2048)
|
||||
url?: string
|
||||
|
||||
@IsOptional()
|
||||
@IsArray()
|
||||
@ArrayNotEmpty()
|
||||
@IsIn(WEBHOOK_EVENTS, { each: true, message: `events must be one of: ${WEBHOOK_EVENTS.join(', ')}` })
|
||||
events?: WebhookEvent[]
|
||||
|
||||
@IsOptional()
|
||||
@IsBoolean()
|
||||
active?: boolean
|
||||
}
|
||||
@@ -0,0 +1,132 @@
|
||||
import { Injectable, Logger } from '@nestjs/common'
|
||||
import { InjectModel } from '@nestjs/mongoose'
|
||||
import { Cron, CronExpression } from '@nestjs/schedule'
|
||||
import { Model } from 'mongoose'
|
||||
import {
|
||||
WebhookDelivery,
|
||||
WebhookDeliveryDocument,
|
||||
} from '../../schemas/webhook-delivery.schema.js'
|
||||
|
||||
const REQUEST_TIMEOUT_MS = 10_000
|
||||
|
||||
// Periodic worker that drains due 'pending' webhook deliveries. For each it POSTs
|
||||
// the captured (already signed) payload to the receiver. A 2xx marks it
|
||||
// 'delivered'; anything else (non-2xx, network error, timeout) increments
|
||||
// attempts and either schedules an exponential-backoff retry or, once
|
||||
// maxAttempts is hit, marks it terminally 'failed'. Each row is claimed
|
||||
// atomically (status flip via findOneAndUpdate) so overlapping runs never
|
||||
// double-send the same delivery.
|
||||
@Injectable()
|
||||
export class WebhookDeliveryWorker {
|
||||
private readonly logger = new Logger(WebhookDeliveryWorker.name)
|
||||
|
||||
constructor(
|
||||
@InjectModel(WebhookDelivery.name)
|
||||
private readonly deliveryModel: Model<WebhookDeliveryDocument>,
|
||||
) {}
|
||||
|
||||
@Cron(CronExpression.EVERY_MINUTE, { name: 'webhook-deliveries' })
|
||||
async run(): Promise<void> {
|
||||
const now = new Date()
|
||||
const due = await this.deliveryModel
|
||||
.find({ status: 'pending', nextAttemptAt: { $lte: now } })
|
||||
.sort({ nextAttemptAt: 1 })
|
||||
.limit(100)
|
||||
.exec()
|
||||
|
||||
for (const delivery of due) {
|
||||
await this.attempt(delivery).catch((e) =>
|
||||
this.logger.warn(`Webhook delivery ${delivery._id} errored: ${(e as Error).message}`),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
private async attempt(delivery: WebhookDeliveryDocument): Promise<void> {
|
||||
// Claim it: only proceed if it's still pending and due, bumping nextAttemptAt
|
||||
// far out so a concurrent run won't pick it up while we're in flight.
|
||||
const claim = await this.deliveryModel
|
||||
.findOneAndUpdate(
|
||||
{ _id: delivery._id, status: 'pending', nextAttemptAt: { $lte: new Date() } },
|
||||
{ $set: { nextAttemptAt: new Date(Date.now() + REQUEST_TIMEOUT_MS + 60_000) } },
|
||||
)
|
||||
.exec()
|
||||
if (!claim) return // another run claimed it
|
||||
|
||||
const attempts = claim.attempts + 1
|
||||
try {
|
||||
const res = await this.post(claim.url, claim.payload, claim.signature, claim.event)
|
||||
if (res.ok) {
|
||||
await this.deliveryModel
|
||||
.updateOne(
|
||||
{ _id: claim._id },
|
||||
{ $set: { status: 'delivered', attempts, deliveredAt: new Date(), lastStatusCode: res.status, lastError: undefined } },
|
||||
)
|
||||
.exec()
|
||||
this.logger.log(`Delivered webhook ${claim._id} (${claim.event}) → ${claim.url} [${res.status}]`)
|
||||
return
|
||||
}
|
||||
await this.fail(claim._id, attempts, claim.maxAttempts, `HTTP ${res.status}`, res.status)
|
||||
} catch (err) {
|
||||
await this.fail(claim._id, attempts, claim.maxAttempts, (err as Error).message)
|
||||
}
|
||||
}
|
||||
|
||||
private async post(
|
||||
url: string,
|
||||
body: string,
|
||||
signature: string,
|
||||
event: string,
|
||||
): Promise<{ ok: boolean; status: number }> {
|
||||
const controller = new AbortController()
|
||||
const timer = setTimeout(() => controller.abort(), REQUEST_TIMEOUT_MS)
|
||||
try {
|
||||
const res = await fetch(url, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'content-type': 'application/json',
|
||||
'user-agent': 'dezky-webhooks/1',
|
||||
'x-dezky-event': event,
|
||||
'X-Dezky-Signature': `sha256=${signature}`,
|
||||
},
|
||||
body,
|
||||
signal: controller.signal,
|
||||
})
|
||||
return { ok: res.ok, status: res.status }
|
||||
} finally {
|
||||
clearTimeout(timer)
|
||||
}
|
||||
}
|
||||
|
||||
private async fail(
|
||||
id: unknown,
|
||||
attempts: number,
|
||||
maxAttempts: number,
|
||||
error: string,
|
||||
statusCode?: number,
|
||||
): Promise<void> {
|
||||
if (attempts >= maxAttempts) {
|
||||
await this.deliveryModel
|
||||
.updateOne({ _id: id as any }, { $set: { status: 'failed', attempts, lastError: error, lastStatusCode: statusCode } })
|
||||
.exec()
|
||||
this.logger.error(`Webhook ${id} failed terminally after ${attempts} attempts: ${error}`)
|
||||
return
|
||||
}
|
||||
// Exponential backoff capped at ~1h: 2^attempts minutes.
|
||||
const backoffMs = Math.min(2 ** attempts, 60) * 60_000
|
||||
await this.deliveryModel
|
||||
.updateOne(
|
||||
{ _id: id as any },
|
||||
{
|
||||
$set: {
|
||||
status: 'pending',
|
||||
attempts,
|
||||
lastError: error,
|
||||
lastStatusCode: statusCode,
|
||||
nextAttemptAt: new Date(Date.now() + backoffMs),
|
||||
},
|
||||
},
|
||||
)
|
||||
.exec()
|
||||
this.logger.warn(`Webhook ${id} attempt ${attempts}/${maxAttempts} failed (${error}); retrying in ${backoffMs / 60_000}m`)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,79 @@
|
||||
import {
|
||||
Body,
|
||||
Controller,
|
||||
Delete,
|
||||
ForbiddenException,
|
||||
Get,
|
||||
HttpCode,
|
||||
Param,
|
||||
Patch,
|
||||
Post,
|
||||
UseGuards,
|
||||
} from '@nestjs/common'
|
||||
import { Types } from 'mongoose'
|
||||
import { ActorService } from '../../auth/actor.service.js'
|
||||
import { CurrentUser } from '../../auth/current-user.decorator.js'
|
||||
import { JwtAuthGuard } from '../../auth/jwt-auth.guard.js'
|
||||
import type { AuthentikJwtPayload } from '../../auth/jwt-payload.interface.js'
|
||||
import { TenantsService } from '../../tenants/tenants.service.js'
|
||||
import { CreateWebhookDto, UpdateWebhookDto } from './dto/webhook-dtos.js'
|
||||
import { WebhooksService } from './webhooks.service.js'
|
||||
|
||||
// Tenant webhook administration. Same base path + gating as the rest of the
|
||||
// scheduling admin surface (platformAdmin OR a member of the tenant). The
|
||||
// signing secret is returned in full on create/get/rotate so the tenant can
|
||||
// configure their receiver's HMAC verification.
|
||||
@Controller('api/v1/tenants/:slug/scheduling/webhooks')
|
||||
@UseGuards(JwtAuthGuard)
|
||||
export class WebhooksController {
|
||||
constructor(
|
||||
private readonly actor: ActorService,
|
||||
private readonly tenants: TenantsService,
|
||||
private readonly webhooks: WebhooksService,
|
||||
) {}
|
||||
|
||||
private async gate(slug: string, jwt: AuthentikJwtPayload): Promise<Types.ObjectId> {
|
||||
const actor = await this.actor.resolve(jwt)
|
||||
const tenant = await this.tenants.findOneBySlug(slug)
|
||||
if (!actor.platformAdmin && !actor.tenantIds.some((id) => id.equals(tenant._id))) {
|
||||
throw new ForbiddenException(`No access to tenant "${slug}"`)
|
||||
}
|
||||
return tenant._id
|
||||
}
|
||||
|
||||
@Get()
|
||||
async list(@Param('slug') slug: string, @CurrentUser() jwt: AuthentikJwtPayload) {
|
||||
return this.webhooks.list(await this.gate(slug, jwt))
|
||||
}
|
||||
|
||||
@Post()
|
||||
async create(@Param('slug') slug: string, @Body() dto: CreateWebhookDto, @CurrentUser() jwt: AuthentikJwtPayload) {
|
||||
return this.webhooks.create(await this.gate(slug, jwt), dto)
|
||||
}
|
||||
|
||||
@Patch(':id')
|
||||
async update(
|
||||
@Param('slug') slug: string,
|
||||
@Param('id') id: string,
|
||||
@Body() dto: UpdateWebhookDto,
|
||||
@CurrentUser() jwt: AuthentikJwtPayload,
|
||||
) {
|
||||
return this.webhooks.update(await this.gate(slug, jwt), id, dto)
|
||||
}
|
||||
|
||||
@Post(':id/rotate-secret')
|
||||
async rotateSecret(@Param('slug') slug: string, @Param('id') id: string, @CurrentUser() jwt: AuthentikJwtPayload) {
|
||||
return this.webhooks.rotateSecret(await this.gate(slug, jwt), id)
|
||||
}
|
||||
|
||||
@Get(':id/deliveries')
|
||||
async deliveries(@Param('slug') slug: string, @Param('id') id: string, @CurrentUser() jwt: AuthentikJwtPayload) {
|
||||
return this.webhooks.listDeliveries(await this.gate(slug, jwt), id)
|
||||
}
|
||||
|
||||
@Delete(':id')
|
||||
@HttpCode(204)
|
||||
async remove(@Param('slug') slug: string, @Param('id') id: string, @CurrentUser() jwt: AuthentikJwtPayload) {
|
||||
await this.webhooks.remove(await this.gate(slug, jwt), id)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,207 @@
|
||||
import { ConflictException, ForbiddenException, Injectable, Logger, NotFoundException } from '@nestjs/common'
|
||||
import { InjectModel } from '@nestjs/mongoose'
|
||||
import { createHmac, randomBytes } from 'node:crypto'
|
||||
import { Model, Types } from 'mongoose'
|
||||
import type { BookingDocument } from '../../schemas/booking.schema.js'
|
||||
import {
|
||||
WebhookDelivery,
|
||||
WebhookDeliveryDocument,
|
||||
} from '../../schemas/webhook-delivery.schema.js'
|
||||
import {
|
||||
WebhookSubscription,
|
||||
WebhookSubscriptionDocument,
|
||||
type WebhookEvent,
|
||||
} from '../../schemas/webhook-subscription.schema.js'
|
||||
import type { CreateWebhookDto, UpdateWebhookDto } from './dto/webhook-dtos.js'
|
||||
|
||||
const MAX_ATTEMPTS = 8
|
||||
|
||||
// Minimal, booking-agnostic view of a booking the dispatcher serialises into the
|
||||
// webhook payload. Keeping this a plain interface (not the Mongoose document)
|
||||
// keeps BookingsService decoupled from the webhook wire format.
|
||||
export interface WebhookBookingView {
|
||||
id: string
|
||||
tenantId: Types.ObjectId
|
||||
hostId: Types.ObjectId
|
||||
eventTypeId: Types.ObjectId
|
||||
status: string
|
||||
startUtc: Date
|
||||
endUtc: Date
|
||||
attendeeName: string
|
||||
attendeeEmail: string
|
||||
attendeeTimezone: string
|
||||
attendeeNotes?: string
|
||||
locationType?: string
|
||||
locationUrl?: string
|
||||
cancellationReason?: string
|
||||
rescheduledFromBookingId?: Types.ObjectId | null
|
||||
}
|
||||
|
||||
// Maps a persisted booking document to the wire view. Exported so callers
|
||||
// (BookingsService) can hand us a plain view without us importing their types.
|
||||
export function toWebhookBookingView(b: BookingDocument): WebhookBookingView {
|
||||
return {
|
||||
id: String(b._id),
|
||||
tenantId: b.tenantId,
|
||||
hostId: b.hostId,
|
||||
eventTypeId: b.eventTypeId,
|
||||
status: b.status,
|
||||
startUtc: b.startUtc,
|
||||
endUtc: b.endUtc,
|
||||
attendeeName: b.attendeeName,
|
||||
attendeeEmail: b.attendeeEmail,
|
||||
attendeeTimezone: b.attendeeTimezone,
|
||||
attendeeNotes: b.attendeeNotes,
|
||||
locationType: b.locationType,
|
||||
locationUrl: b.locationUrl,
|
||||
cancellationReason: b.cancellationReason,
|
||||
rescheduledFromBookingId: b.rescheduledFromBookingId ?? null,
|
||||
}
|
||||
}
|
||||
|
||||
// Tenant webhooks for booking lifecycle. Admin CRUD over subscriptions plus a
|
||||
// fire-and-forget `dispatch` that fans an event out to every active matching
|
||||
// subscription by persisting one signed WebhookDelivery per subscription. The
|
||||
// actual HTTP POSTs (with retries) are driven asynchronously by the
|
||||
// WebhookDeliveryWorker so booking flows never block on a slow receiver.
|
||||
@Injectable()
|
||||
export class WebhooksService {
|
||||
private readonly logger = new Logger(WebhooksService.name)
|
||||
|
||||
constructor(
|
||||
@InjectModel(WebhookSubscription.name)
|
||||
private readonly subModel: Model<WebhookSubscriptionDocument>,
|
||||
@InjectModel(WebhookDelivery.name)
|
||||
private readonly deliveryModel: Model<WebhookDeliveryDocument>,
|
||||
) {}
|
||||
|
||||
// ── Admin CRUD ─────────────────────────────────────────────────────────────
|
||||
list(tenantId: Types.ObjectId): Promise<WebhookSubscriptionDocument[]> {
|
||||
return this.subModel.find({ tenantId }).sort({ createdAt: -1 }).exec()
|
||||
}
|
||||
|
||||
async create(tenantId: Types.ObjectId, dto: CreateWebhookDto): Promise<WebhookSubscriptionDocument> {
|
||||
const existing = await this.subModel.findOne({ tenantId, url: dto.url }).exec()
|
||||
if (existing) throw new ConflictException('A webhook with that URL already exists.')
|
||||
return this.subModel.create({
|
||||
tenantId,
|
||||
url: dto.url,
|
||||
events: dto.events,
|
||||
secret: generateSecret(),
|
||||
active: true,
|
||||
})
|
||||
}
|
||||
|
||||
async update(tenantId: Types.ObjectId, id: string, dto: UpdateWebhookDto): Promise<WebhookSubscriptionDocument> {
|
||||
const sub = await this.getOwned(tenantId, id)
|
||||
if (dto.url !== undefined) sub.url = dto.url
|
||||
if (dto.events !== undefined) sub.events = dto.events
|
||||
if (dto.active !== undefined) sub.active = dto.active
|
||||
return sub.save()
|
||||
}
|
||||
|
||||
async rotateSecret(tenantId: Types.ObjectId, id: string): Promise<WebhookSubscriptionDocument> {
|
||||
const sub = await this.getOwned(tenantId, id)
|
||||
sub.secret = generateSecret()
|
||||
return sub.save()
|
||||
}
|
||||
|
||||
async remove(tenantId: Types.ObjectId, id: string): Promise<void> {
|
||||
const sub = await this.getOwned(tenantId, id)
|
||||
await this.subModel.deleteOne({ _id: sub._id }).exec()
|
||||
}
|
||||
|
||||
// Recent deliveries for a subscription (admin observability).
|
||||
async listDeliveries(tenantId: Types.ObjectId, id: string): Promise<WebhookDeliveryDocument[]> {
|
||||
await this.getOwned(tenantId, id)
|
||||
return this.deliveryModel
|
||||
.find({ tenantId, subscriptionId: new Types.ObjectId(id) })
|
||||
.sort({ createdAt: -1 })
|
||||
.limit(50)
|
||||
.exec()
|
||||
}
|
||||
|
||||
private async getOwned(tenantId: Types.ObjectId, id: string): Promise<WebhookSubscriptionDocument> {
|
||||
if (!Types.ObjectId.isValid(id)) throw new NotFoundException('Webhook not found')
|
||||
const sub = await this.subModel.findById(id).exec()
|
||||
if (!sub) throw new NotFoundException('Webhook not found')
|
||||
if (!sub.tenantId.equals(tenantId)) throw new ForbiddenException('No access to this webhook')
|
||||
return sub
|
||||
}
|
||||
|
||||
// ── Dispatch (called from BookingsService) ─────────────────────────────────
|
||||
/**
|
||||
* Fan an event out to every active subscription for the tenant that listens
|
||||
* for it, by persisting one signed WebhookDelivery each. Best-effort and
|
||||
* non-throwing: callers (booking flows) should `.catch()` defensively but a
|
||||
* failure here must never break a booking. Returns the number of deliveries
|
||||
* enqueued.
|
||||
*/
|
||||
async dispatch(event: WebhookEvent, booking: WebhookBookingView): Promise<number> {
|
||||
try {
|
||||
const subs = await this.subModel
|
||||
.find({ tenantId: booking.tenantId, active: true, events: event })
|
||||
.exec()
|
||||
if (subs.length === 0) return 0
|
||||
|
||||
const sentAt = new Date()
|
||||
let enqueued = 0
|
||||
for (const sub of subs) {
|
||||
const payload = JSON.stringify({
|
||||
event,
|
||||
sentAt: sentAt.toISOString(),
|
||||
subscriptionId: String(sub._id),
|
||||
data: serialiseBooking(booking),
|
||||
})
|
||||
const signature = sign(sub.secret, payload)
|
||||
await this.deliveryModel.create({
|
||||
tenantId: booking.tenantId,
|
||||
subscriptionId: sub._id,
|
||||
event,
|
||||
url: sub.url,
|
||||
payload,
|
||||
signature,
|
||||
status: 'pending',
|
||||
attempts: 0,
|
||||
maxAttempts: MAX_ATTEMPTS,
|
||||
nextAttemptAt: sentAt,
|
||||
})
|
||||
enqueued += 1
|
||||
}
|
||||
return enqueued
|
||||
} catch (err) {
|
||||
this.logger.error(`Webhook dispatch for "${event}" failed: ${(err as Error).message}`)
|
||||
return 0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Hex HMAC-SHA256 of the body under the subscription secret.
|
||||
export function sign(secret: string, body: string): string {
|
||||
return createHmac('sha256', secret).update(body).digest('hex')
|
||||
}
|
||||
|
||||
function generateSecret(): string {
|
||||
return `whsec_${randomBytes(24).toString('hex')}`
|
||||
}
|
||||
|
||||
// Public-safe booking projection for the payload. No manage token / internal
|
||||
// calendar ids — just the booking facts a receiver needs.
|
||||
function serialiseBooking(b: WebhookBookingView) {
|
||||
return {
|
||||
id: b.id,
|
||||
hostId: String(b.hostId),
|
||||
eventTypeId: String(b.eventTypeId),
|
||||
status: b.status,
|
||||
startUtc: b.startUtc.toISOString(),
|
||||
endUtc: b.endUtc.toISOString(),
|
||||
attendeeName: b.attendeeName,
|
||||
attendeeEmail: b.attendeeEmail,
|
||||
attendeeTimezone: b.attendeeTimezone,
|
||||
attendeeNotes: b.attendeeNotes ?? null,
|
||||
locationType: b.locationType ?? null,
|
||||
locationUrl: b.locationUrl ?? null,
|
||||
cancellationReason: b.cancellationReason ?? null,
|
||||
rescheduledFromBookingId: b.rescheduledFromBookingId ? String(b.rescheduledFromBookingId) : null,
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,63 @@
|
||||
import { Prop, Schema, SchemaFactory } from '@nestjs/mongoose'
|
||||
import { HydratedDocument, Types } from 'mongoose'
|
||||
import type { WebhookEvent } from './webhook-subscription.schema.js'
|
||||
|
||||
export type WebhookDeliveryDocument = HydratedDocument<WebhookDelivery>
|
||||
|
||||
// 'pending' — queued, waiting for the delivery worker (nextAttemptAt due).
|
||||
// 'delivered' — receiver returned 2xx.
|
||||
// 'failed' — exhausted maxAttempts; terminal.
|
||||
export type WebhookDeliveryStatus = 'pending' | 'delivered' | 'failed'
|
||||
|
||||
// One outbound delivery attempt-set for a single (subscription, event) pair. The
|
||||
// payload + signature are captured at enqueue time so retries replay the exact
|
||||
// same signed body. The @nestjs/schedule worker claims due 'pending' rows,
|
||||
// POSTs them, and on non-2xx schedules an exponential backoff retry until
|
||||
// maxAttempts is reached.
|
||||
@Schema({ collection: 'scheduling_webhook_deliveries', timestamps: true })
|
||||
export class WebhookDelivery {
|
||||
@Prop({ type: Types.ObjectId, ref: 'Tenant', required: true, index: true })
|
||||
tenantId!: Types.ObjectId
|
||||
|
||||
@Prop({ type: Types.ObjectId, ref: 'WebhookSubscription', required: true, index: true })
|
||||
subscriptionId!: Types.ObjectId
|
||||
|
||||
@Prop({ required: true })
|
||||
event!: WebhookEvent
|
||||
|
||||
@Prop({ required: true })
|
||||
url!: string
|
||||
|
||||
// The exact JSON body string that was signed (so retries are byte-identical).
|
||||
@Prop({ required: true })
|
||||
payload!: string
|
||||
|
||||
// Hex HMAC-SHA256 of `payload` under the subscription secret. Sent as the
|
||||
// X-Dezky-Signature header.
|
||||
@Prop({ required: true })
|
||||
signature!: string
|
||||
|
||||
@Prop({ enum: ['pending', 'delivered', 'failed'], default: 'pending', index: true })
|
||||
status!: WebhookDeliveryStatus
|
||||
|
||||
@Prop({ default: 0 })
|
||||
attempts!: number
|
||||
|
||||
@Prop({ required: true, default: 8 })
|
||||
maxAttempts!: number
|
||||
|
||||
// When the next attempt is due. The worker polls for due 'pending' rows.
|
||||
@Prop({ required: true, index: true })
|
||||
nextAttemptAt!: Date
|
||||
|
||||
@Prop()
|
||||
lastError?: string
|
||||
|
||||
@Prop()
|
||||
lastStatusCode?: number
|
||||
|
||||
@Prop()
|
||||
deliveredAt?: Date
|
||||
}
|
||||
|
||||
export const WebhookDeliverySchema = SchemaFactory.createForClass(WebhookDelivery)
|
||||
@@ -0,0 +1,36 @@
|
||||
import { Prop, Schema, SchemaFactory } from '@nestjs/mongoose'
|
||||
import { HydratedDocument, Types } from 'mongoose'
|
||||
|
||||
export type WebhookSubscriptionDocument = HydratedDocument<WebhookSubscription>
|
||||
|
||||
// Booking lifecycle events a tenant can subscribe to. Kept in sync with the
|
||||
// event names WebhookService.dispatch emits from BookingsService.
|
||||
export type WebhookEvent = 'booking.created' | 'booking.cancelled' | 'booking.rescheduled'
|
||||
|
||||
export const WEBHOOK_EVENTS: WebhookEvent[] = ['booking.created', 'booking.cancelled', 'booking.rescheduled']
|
||||
|
||||
// A tenant-configured HTTPS endpoint that receives signed POSTs for booking
|
||||
// lifecycle events. The `secret` is the HMAC-SHA256 key used to sign each
|
||||
// delivery body (header X-Dezky-Signature); the tenant stores the same secret to
|
||||
// verify. Inactive subscriptions are skipped by the dispatcher.
|
||||
@Schema({ collection: 'scheduling_webhook_subscriptions', timestamps: true })
|
||||
export class WebhookSubscription {
|
||||
@Prop({ type: Types.ObjectId, ref: 'Tenant', required: true, index: true })
|
||||
tenantId!: Types.ObjectId
|
||||
|
||||
@Prop({ required: true, trim: true })
|
||||
url!: string
|
||||
|
||||
// HMAC signing secret. Generated server-side; surfaced to the admin once on
|
||||
// create/rotate so they can configure their receiver's verification.
|
||||
@Prop({ required: true })
|
||||
secret!: string
|
||||
|
||||
@Prop({ type: [String], enum: WEBHOOK_EVENTS, default: WEBHOOK_EVENTS })
|
||||
events!: WebhookEvent[]
|
||||
|
||||
@Prop({ default: true })
|
||||
active!: boolean
|
||||
}
|
||||
|
||||
export const WebhookSubscriptionSchema = SchemaFactory.createForClass(WebhookSubscription)
|
||||
Reference in New Issue
Block a user