On this page
멱등 부트스트랩 재시도를 위한 emitAsync 스탬프 게이팅
큐로 sync emit 후 "완료" 스탬프를 찍는 부트스트랩은 Redis가 잠깐 끊기면 다운스트림이 조용히 비어요. emitAsync가 enqueue 승인에 스탬프를 게이트해줘요.
ContactCacheService.refreshFromGoogle이 Postgres에 contacts를 쓰고,
이메일별 이벤트를 큐 리스너로 fire-and-forget 했고, 그 다음에 contactsSyncedAt = now()를 스탬프했어요. 코드가 맞아 보였어요. 그런데
Redis가 잠깐 끊겼을 때 리스너의 queue.add가 허공으로 reject 됐어요 —
emit이 sync fire-and-forget이라 promise가 detached 됐거든요 — 그리고
스탬프는 그대로 떨어졌어요. 다음 호출은 스탬프에서 short-circuit해서
다시 fan-out하지 않았고, 다운스트림 Typesense 인덱스는 비어 있는 채로
남았어요. 사용자들이 검색하면 zero results였는데 어디에도 예외가
표면화되지 않았어요.
조용한 실패가 펼쳐지는 방식
큐로 fan-out하고 그 다음에 “완료” sentinel을 쓰는 부트스트랩 경로는 emit이 sync fire-and-forget일 때 silent failure mode를 가져요:
- 서비스가 DB에 행을 써요.
- 서비스가 행마다
eventEmitter.emit('topic', evt)를 루프 돌려요.emit은 sync고 리스너 return을 await하지 않아요. 리스너는await queue.add(...)하는데 — 그 promise가 detached 돼요. - 서비스가
bootstrapped_at = now()를 써요. - 리스너의
queue.add가 reject (Redis 도달 불가). Promise가 어디로도 reject되지 않고, 서비스는 이미 스탬프를 commit 했어요. - 다음 요청이
bootstrapped_at IS NOT NULL을 보고 “로컬 상태에서 읽기”로 short-circuit해서 다시 fan-out하지 않아요. 다운스트림 시스템(Typesense, 검색 인덱스, 캐시, audit log)이 그 행들에 대해 영구적으로 stale 해져요.
DB 행은 맞아요. Sentinel은 set돼 있어요. 다운스트림은 비어 있어요. 어디에도 예외가 표면화 안 됐어요. 버그는 사용자가 나중에 검색해서 zero results를 받을 때만 보여요 — 그때쯤 원래 Redis 끊김의 로그는 이미 사라진 지 오래예요.
해결: emit을 awaitable로 만들기
EventEmitter2.emitAsync를 쓰고 sentinel 쓰기를 그 resolution에 게이트
해요. 계약이 이렇게 돼요: 모든 리스너의 returned promise가 resolve된
경우에만 스탬프가 떨어져요. 어디서든 reject되면 스탬프가 un-set로 남아서
다음 호출이 부트스트랩 전체를 재시도해요.
1단계: Publisher가 *Async variant를 노출
@Injectable()
export class ContactEventPublisher {
constructor(private readonly eventEmitter: EventEmitter2) {}
// Sync fire-and-forget — 호출자가 신경 쓰지 않는 핫패스에서 OK
publishContactUpserted(userId: number, email: string): void {
this.eventEmitter.emit("contact.upserted", { userId, email });
}
// Awaited — 모든 리스너의 returned promise가 resolve할 때까지 호출자가 block
async publishContactsBulkUpsertedAsync(
userId: number,
emails: string[]
): Promise<void> {
await this.eventEmitter.emitAsync("contact.bulk.upserted", {
userId,
emails
});
}
} emitAsync는 Promise<unknown[]>을 반환해요 — 모든 리스너의 return value의
resolved array. 어떤 리스너든 어디서든 reject하면 전체가 reject 돼요.
2단계: 호출자가 emit을 await하고 그 다음 스탬프
async refreshFromGoogle(integration: Integration): Promise<void> {
const contacts = await this.googlePeople.getContacts(integration.id);
if (contacts.length > 0) {
await this.userContactsService.bulkUpsert(integration.userId, contacts);
// Awaited — 리스너의 queue.add가 reject하면 이게 reject되고, 아래 스탬프가
// 안 떨어지고, integration의 contactsSyncedAt이 NULL로 남아서 다음 호출이
// 부트스트랩 전체를 재시도.
await this.contactEventPublisher.publishContactsBulkUpsertedAsync(
integration.userId,
integration.id,
contacts.map(c => c.email),
);
}
// emit 성공일 때만 떨어짐 — enqueue 승인에 sentinel 게이트
await this.integrationRepo.update(
{ id: integration.id },
{ contactsSyncedAt: new Date() },
);
} 3단계: 리스너가 awaited handler 안에서 일을 함
@OnEvent('contact.bulk.upserted')
async onBulkUpserted(evt: ContactsBulkUpsertedEvent): Promise<void> {
if (evt.emails.length === 0) return;
await this.queue.addBulk(/* N chunked jobs */);
} Promise를 return하거나 (async handler 안에서 await하거나) 하는 게 emitAsync가 실제로 기다리게 만드는 거예요. Promise를 swallow하는 handler
(void return, fire-and-forget inside)는 게이팅을 무력화해요.
핵심 포인트
- 게이팅 계약: sentinel 쓰기는
await emitAsync가 resolve할 때만 떨어져요. 어떤 리스너에서든 reject되면 sentinel이 un-set로 남아 → 다음 호출이 재시도. - 멱등성은 필수. 재시도가 안전해야 해요 —
bulkUpsert는ON CONFLICT DO UPDATE가 필요하고, 다운스트림 upsert는action: 'upsert'가 필요해요. 멱등성 없으면 재시도가 갭을 치유하는 대신 중복을 만들어요. - 네이밍 컨벤션. Publisher 메서드의
*Async접미사는 호출자가await해야 한다고 신호를 줘요. SyncpublishX와 asyncpublishXAsync는 같은 publisher에 공존할 수 있어요 — 다른 호출 사이트는 다른 걸 신경 써요. - 빈 페이로드 엣지 케이스. 빈 입력에는 emit을 스킵하지만(enqueue할 일 없음), 그래도 스탬프할지를 의도적으로 결정해요. 빈 입력에 스탬프하면 source가 정말 데이터 없을 때 무의미한 re-fetch를 피할 수 있고, 스탬프 안 하면 fan-out도 없는 재시도를 강제해요. 하나 골라서 결정을 문서화해요.
리스너 측 swallow가 게이트를 조용히 깨요
어떤 @OnEvent handler가 async로 선언됐지만 body가 queue.add를 await 없이 실행하면, handler는 즉시 undefined를 return하고 emitAsync는
enqueue가 진행 중인데도 resolved listener를 봐요. 더 나쁜 건: post-resolve
rejection이 게이팅 경로에 visible error 없이 unhandled rejection이 돼요.
수정은 mechanical — 모든 awaited-emit handler는 의존하는 작업을 return await (또는 그냥 await)해야 해요. queue.add가 reject하도록 mock하고
publisher의 *Async 메서드가 reject하는 걸 assert하는 unit test로 계약을
못 박아요.
같은 토픽의 여러 리스너
emitAsync는 모든 리스너를 기다려요. 같은 handler 안에서 느린 IO를 하는
다운스트림 컨슈머(예: analytics)를 추가하면, 모든 부트스트랩 호출이 그것에
block돼요. 느린 컨슈머를 다른 토픽으로 옮기거나, awaited handler 안에서
별도 sync emit으로 발사해서 리스너-of-리스너 decoupling을 명시적으로
만들어요.
TypeORM 트랜잭션으로 감싸는 건 보통 답이 아니에요
흔한 충동: “DB 쓰기 + sentinel을 txn에서 atomic하게 만들자.” 부트스트랩
케이스에서는 보통 emit 전에 DB 쓰기가 하나뿐이라 txn이 단일 statement를
감싸요(이미 Postgres autocommit으로 atomic). 진짜 버그는 sequencing(쓰기들
사이의 sentinel)이지 atomicity가 아니에요. Txn 추가는 게이팅 문제를 안
고치고 큐 서버로의 await를 가로질러 row lock을 잡는 long-running
transaction을 추가해요 — bad practice.
언제 사용할까
- Sentinel이 다시 fan-out할지 결정하는 one-shot 부트스트랩 경로 (TTL 스탬프, “first sync done” 플래그, materialized-view refresh marker).
- 업스트림 부작용(DB 쓰기)이 다운스트림 부작용(큐 enqueue)보다 먼저 와야 하지만, 다운스트림의 실패가 “완료” 선언을 막아야 하는 모든 흐름.
- 호출자가 작업 완료를 주장하기 전에 모든 리스너가 성공해야 하는 multi-listener fan-out.
언제 사용하지 말까
- 핫패스 single-emit fire-and-forget — 리스너 실패가 다른 수단(예: 폴링
루프에서 row별 재시도)으로 복구 가능한 저레이턴시 케이스에는
emit을 유지해요. - 리스너 작업이 느리고 호출자가 거기서 block될 여유가 없는 케이스 — 빠른 acknowledgement 리스너 + 별도 느린 worker로 분할해요.
- 멱등 재시도 경로가 없을 때 — 멱등성 없는 게이팅은 한 버그(silent stale 상태)를 다른 버그(재시도시 중복 행)로 바꾸는 것뿐이에요.
정리
Sync emit + post-emit 스탬프는 코드 리뷰에서 맞아 보이고 모든 happy-path
테스트를 통과하는 패턴이에요. 버그는 외부 의존성이 정확히 잘못된 순간에
끊길 때만 나타나요. emitAsync + 리스너 await + emit 후 스탬프가 그
silent failure를 깨끗한 재시도로 바꿔주는 게이트를 구성해요 — 부트스트랩
경로가 멱등하다는 전제 하에요(이미 그래야 하긴 하죠).