On this page
emitAsync Stamp Gating for Idempotent Bootstrap Retries
A bootstrap that emits sync to a queue then stamps "done" silently strands downstream when Redis blips. emitAsync gates the stamp on enqueue admission.
ContactCacheService.refreshFromGoogle wrote contacts to Postgres, fired
off per-email events to a queue listener, then stamped contactsSyncedAt = now(). The code looked correct. But when Redis blipped,
the listener’s queue.add rejected into the void — the promise was detached
because the emit was synchronous fire-and-forget — and the stamp landed
anyway. The next call short-circuited on the stamp, never re-fanned-out, and
the downstream Typesense index stayed empty. Users searched and got zero
results with no exception surfaced anywhere.
How the Silent Failure Plays Out
A bootstrap path that fans out to a queue and writes a “done” sentinel afterwards has a silent failure mode when the emit is sync fire-and-forget:
- Service writes rows to DB.
- Service loops
eventEmitter.emit('topic', evt)per row.emitis sync and does NOT await listener returns. The listener doesawait queue.add(...)— but that promise is detached. - Service writes
bootstrapped_at = now(). - Listener’s
queue.addrejects (Redis unreachable). The promise rejects to nowhere; the service has already committed the stamp. - Next request sees
bootstrapped_at IS NOT NULL, short-circuits to “read from local state”, and never re-fans-out. The downstream system (Typesense, search index, cache, audit log) is permanently stale for those rows.
DB rows are correct. The sentinel is set. Downstream is empty. No exception surfaced anywhere. The bug only shows up when a user later searches and gets zero results — by which point logs from the original Redis blip are long gone.
The Solution: Make the Emit Awaitable
Use EventEmitter2.emitAsync and gate the sentinel write on its resolution.
The contract becomes: the stamp only lands if every listener’s returned
promise resolved. Any rejection anywhere leaves the stamp un-set, so the
next call retries the whole bootstrap.
Step 1: Publisher exposes an *Async variant
@Injectable()
export class ContactEventPublisher {
constructor(private readonly eventEmitter: EventEmitter2) {}
// Sync fire-and-forget — fine for hot paths where caller doesn't care
publishContactUpserted(userId: number, email: string): void {
this.eventEmitter.emit("contact.upserted", { userId, email });
}
// Awaited — caller blocks until every listener's returned promise resolves
async publishContactsBulkUpsertedAsync(
userId: number,
emails: string[]
): Promise<void> {
await this.eventEmitter.emitAsync("contact.bulk.upserted", {
userId,
emails
});
}
} emitAsync returns Promise<unknown[]> — the resolved array of every
listener’s return value. Reject anywhere in any listener and the whole
thing rejects.
Step 2: Caller awaits the emit, then stamps
async refreshFromGoogle(integration: Integration): Promise<void> {
const contacts = await this.googlePeople.getContacts(integration.id);
if (contacts.length > 0) {
await this.userContactsService.bulkUpsert(integration.userId, contacts);
// Awaited — if listener's queue.add rejects, this rejects, stamp below
// never runs, the integration's contactsSyncedAt stays NULL, next call
// retries the whole bootstrap.
await this.contactEventPublisher.publishContactsBulkUpsertedAsync(
integration.userId,
integration.id,
contacts.map(c => c.email),
);
}
// Only lands if emit succeeded — gates the sentinel on enqueue admission
await this.integrationRepo.update(
{ id: integration.id },
{ contactsSyncedAt: new Date() },
);
} Step 3: Listener does its work inside an awaited handler
@OnEvent('contact.bulk.upserted')
async onBulkUpserted(evt: ContactsBulkUpsertedEvent): Promise<void> {
if (evt.emails.length === 0) return;
await this.queue.addBulk(/* N chunked jobs */);
} Returning the promise (or awaiting inside an async handler) is what
makes emitAsync actually wait. A handler that swallows the promise
(void return, fire-and-forget inside) defeats the gating.
Key Points
- Gating contract: sentinel writes only land if
await emitAsyncresolves. Any rejection in any listener leaves the sentinel un-set → next call retries. - Idempotency is mandatory. The retry must be safe —
bulkUpsertneedsON CONFLICT DO UPDATE, downstream upserts needaction: 'upsert'. Without idempotency, retry creates duplicates instead of healing the gap. - Naming convention.
*Asyncsuffix on the publisher method signals callers mustawait. SyncpublishXand asyncpublishXAsynccan coexist on the same publisher — different call sites care about different things. - Empty payload edge case. Skip the emit on empty input (no work to enqueue), but decide deliberately whether to still stamp. Stamping on empty avoids pointless re-fetch when the source genuinely has no data; not-stamping forces a retry that will also fan out nothing. Pick one and document the choice.
Listener-Side Swallowing Breaks the Gate Silently
If any @OnEvent handler is declared async but its body fires queue.add without await, the handler returns undefined immediately
and emitAsync sees a resolved listener even though enqueue is in flight.
Worse: a post-resolve rejection becomes an unhandled rejection with no
visible error on the gating path.
The fix is mechanical — every awaited-emit handler must return await (or just await) the work it depends on. Pin the contract with a unit
test that mocks queue.add to reject and asserts the publisher’s *Async method rejects.
Multiple Listeners on the Same Topic
emitAsync waits for ALL listeners. If you add a downstream consumer
(e.g., analytics) that does slow IO inside the same handler, every
bootstrap call now blocks on it. Either move slow consumers to a
different topic or fire them on a separate sync emit from inside the
awaited handler so the listener-of-listeners decoupling is explicit.
Wrapping in a TypeORM Transaction Is Usually Not the Answer
A common impulse: “make the DB write + sentinel atomic in a txn.” For
the bootstrap case there’s typically only one DB write before the emit,
so the txn wraps a single statement (already atomic via Postgres
autocommit). The real bug is sequencing (sentinel between writes), not
atomicity. Adding a txn doesn’t fix the gating problem and adds a
long-running transaction holding row locks across an await to a queue
server — bad practice.
When to Use
- One-shot bootstrap paths where a sentinel decides whether to re-fan-out (TTL stamps, “first sync done” flags, materialized-view refresh markers).
- Any flow where the upstream side-effect (DB write) must precede the downstream side-effect (queue enqueue), but failure of the downstream must prevent declaring “done”.
- Multi-listener fan-outs where all listeners must succeed before the caller can claim the operation completed.
When NOT to Use
- Hot-path single-emit fire-and-forget — keep
emitfor low-latency cases where the listener failure is recoverable by other means (e.g., per-row retry from a poll loop). - Cases where the listener’s work is slow and the caller can’t afford to block on it — split into a fast acknowledgement listener + a separate slow worker.
- When you have no idempotent retry path — gating without idempotency just trades one bug (silent stale state) for another (duplicate rows on retry).
Takeaway
Sync emit + post-emit stamp is one of those patterns that looks correct
in code review and survives every happy-path test. The bug only appears
when an external dependency blips at exactly the wrong moment. emitAsync plus listener await plus stamp-after-emit composes a gate that turns
that silent failure into a clean retry — provided your bootstrap path is
idempotent, which it should already be.