08 — Event Bus¶
As of 2026-05-28.
TL;DR¶
NATS is the platform's only event bus. Not Kafka — despite what docs/architecture.md shows. Subjects follow <domain>.<action>[.<discriminator>] (e.g. trades.executed, orders.cancelled, custody.deposit.confirmed). RPC (request-reply) and pub/sub use the same broker; idempotency is enforced downstream on ledger_entries.transactionId UNIQUE, not at the bus.
Why NATS, not Kafka¶
The original delivery-plan diagram (01-architecture.md; docs/architecture.md) shows Kafka/Redpanda. The platform shipped with NATS instead. Three reasons:
- Single protocol covers both call shapes. The trading hot path needs RPC (api-gateway → order-router → ledger). NATS's request/reply is first-class, so no separate gRPC mesh or RabbitMQ pattern is needed. Kafka has no native request-reply.
- Simpler ops at our scale. No ZooKeeper / KRaft cluster, no broker rebalancing, no compacted-topic surprises. A single nats container in docker-compose.yml.
- JetStream is available if we need persistence later. The @quantatrade/nats package already exports a jetstream.ts (packages/nats/src/jetstream.ts), so durable consumers can be added subject-by-subject without changing publishers. Today, nothing on main uses JetStream; everything is core NATS (at-most-once if the consumer is down, at-least-once if the publisher retries).
If we hit Kafka-shaped requirements (ordered partitions per-symbol for replay, multi-day retention with seek), we revisit. Nothing today needs that.
Subject naming convention¶
Documented in packages/nats/src/subjects.ts:1-10:
Use dots for hierarchy. Wildcards: * matches one token, > matches multiple trailing tokens.
The actual pattern in use is:
| Pattern | Example | Discriminator |
|---|---|---|
| <domain>.<action> | trades.executed, ledger.credit, risk.check.failed | none (broadcast / RPC default) |
| <domain>.<action>.<symbol> | trades.executed.BTC-USDT, marketdata.ticker.ETH-USD | trading pair |
| <domain>.<action>.<userId> | balances.updated.usr_abc123, orders.created.usr_abc123 | per-user fan-out |
| venue.<venue>.<action>.<symbol> | venue.binance.bookTicker.BTC-USDT, venue.binance.trade.BTC-USDT | external venue source |
| <domain>.<workflow>.<step> | custody.deposit.confirmed, custody.withdrawal.signed, custody.withdrawal.broadcast | workflow step |
Discriminator suffixes let subscribers filter at the broker (subscribe('trades.executed.BTC-USDT') instead of subscribing to everything and filtering in app code).
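The wildcard rules quoted above can be checked without a broker. The following is a minimal sketch of standard NATS subject matching; `subjectMatches` is a hypothetical helper written for this doc, not something exported by @quantatrade/nats.

```typescript
// Hypothetical helper for illustration; not part of @quantatrade/nats.
// Returns true when `subject` would be delivered to a subscription on `pattern`.
function subjectMatches(pattern: string, subject: string): boolean {
  const p = pattern.split(".");
  const s = subject.split(".");
  for (let i = 0; i < p.length; i++) {
    if (p[i] === ">") {
      // ">" is only valid as the last token and needs at least one token to consume.
      return i === p.length - 1 && s.length > i;
    }
    if (i >= s.length) return false; // subject has fewer tokens than the pattern
    if (p[i] !== "*" && p[i] !== s[i]) return false; // literal token mismatch
  }
  return p.length === s.length; // reject subjects with extra trailing tokens
}

const examples: Array<[string, string]> = [
  ["trades.executed.BTC-USDT", "trades.executed.BTC-USDT"],
  ["trades.executed.*", "trades.executed.BTC-USDT"],
  ["trades.executed.*", "trades.executed"],
  ["orders.created.>", "orders.created"],
  ["venue.*.trade.*", "venue.binance.trade.BTC-USDT"],
];
for (const [pattern, subject] of examples) {
  console.log(`${pattern} vs ${subject}: ${subjectMatches(pattern, subject)}`);
}
```

The third and fourth cases are the ones that tend to surprise: neither `*` nor `>` matches zero tokens, so a subscription on orders.created.> would not receive a message published on the bare orders.created subject.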
The canonical builders live in packages/nats/src/subjects.ts:
- Subjects.TRADES.byMarket(symbol) → trades.executed.<symbol> (line 28)
- Subjects.BALANCES.byUser(userId) → balances.updated.<userId> (line 45)
- Subjects.ORDERS.byMarket(event, symbol) → orders.<event>.<symbol> (line 21)
- Subjects.MARKETDATA.byMarket(type, symbol) → marketdata.<type>.<symbol> (line 38)
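As a rough illustration of the builder shapes listed above, the sketch below re-creates them in isolation. It is not a copy of packages/nats/src/subjects.ts, and the `token` guard is a hypothetical addition: it assumes we would want to reject discriminators that contain a dot, a wildcard, or whitespace, since those would change the subject hierarchy.

```typescript
// Illustrative re-creation of the builder shapes listed above. The real
// definitions live in packages/nats/src/subjects.ts and may differ in detail.

// Hypothetical guard: a token must be non-empty and must not contain ".", "*",
// ">" or whitespace, otherwise it would alter the subject's hierarchy.
function token(value: string): string {
  if (!/^[^.*>\s]+$/.test(value)) {
    throw new Error(`invalid subject token: "${value}"`);
  }
  return value;
}

const Subjects = {
  TRADES: { byMarket: (symbol: string) => `trades.executed.${token(symbol)}` },
  BALANCES: { byUser: (userId: string) => `balances.updated.${token(userId)}` },
  ORDERS: {
    byMarket: (event: string, symbol: string) => `orders.${token(event)}.${token(symbol)}`,
  },
  MARKETDATA: {
    byMarket: (type: string, symbol: string) => `marketdata.${token(type)}.${token(symbol)}`,
  },
};

console.log(Subjects.TRADES.byMarket("BTC-USDT")); // trades.executed.BTC-USDT
console.log(Subjects.ORDERS.byMarket("created", "ETH-USD")); // orders.created.ETH-USD
```

Routing every discriminator through one guard keeps a malformed symbol such as "BTC.USDT" from silently adding a hierarchy level that no subscriber listens on.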
Inventory¶
Subjects observed in code on platform/main (plus PR#6's venue subjects, marked separately). All subjects are JSON-encoded via JSONCodec (packages/nats/src/publisher.ts:4). Idempotency-key column shows the field consumers should use to dedupe; "n/a" means the message is intrinsically idempotent (state-event broadcast) or no dedup discipline exists today.
Trading core¶
| Subject | Publisher | Subscribers | Mode | Payload (key fields) | Idempotency key |
|---|---|---|---|---|---|
| orders.submit | api-gateway orders.service.ts:61 | order-router main.ts:531 (queue order-router) | RPC | {userId, order: NewOrderRequest} → {success, order?, error?} | none (client-driven; clientOrderId if present) |
| orders.cancel | api-gateway orders.service.ts:161,230 | order-router main.ts:661 (queue order-router) | RPC | {userId, orderId, symbol} → {success, error?} | n/a (operation is idempotent server-side: cancelling a missing order returns {success:false, error:'Order not found'}) |
| orders.created | order-router main.ts:621 | (ws-gateway expects orders.created.> main.ts:49; see gap below) | pub/sub | {type:'order.created', order, timestamp} | n/a |
| orders.cancelled | order-router main.ts:693,981 | none on main | pub/sub | {type:'order.cancelled', orderId, userId?, timestamp} | n/a |
| orders.filled | order-router main.ts:947 | none on main | pub/sub | {type:'order.filled', orderId, timestamp} | n/a |
| orders.rejected | order-router main.ts:627,1008 | none on main | pub/sub | {type:'order.rejected', order?, reason, rejectedVolume?, timestamp} | n/a |
| orders.liquidation | risk-service liquidation-engine.ts:224 | none on main (intended: order-router) | pub/sub | liquidation order spec | n/a |
| trades.executed (consolidated) | order-router main.ts:920 | ledger-service main.ts:170; risk-service main.ts:88 | pub/sub | {id, buyerId, sellerId, symbol, price, quantity, buyerFee, sellerFee} | id (trade ID); ledger uses it inside the db.$transaction to keep settlement atomic |
| trades.executed.<SYM> | order-router main.ts:899,906 | (ws-gateway expects marketdata.trade.*, not this; see gap below) | pub/sub | per-side {type:'trade.executed', trade, timestamp} | n/a |
| trades.settled | ledger-service activities/trade-settlement-activities.ts:54 | none on main (informational) | pub/sub | {tradeId, settledAt} | n/a |
| risk.check.failed | order-router main.ts:543 | none on main | pub/sub | {userId, order, reason} | n/a |
Ledger¶
| Subject | Publisher | Subscribers | Mode | Payload | Idempotency key |
|---|---|---|---|---|---|
| ledger.balances.get | order-router risk-checker.ts:93 | ledger-service main.ts:59 (queue ledger-service) | RPC | {userId, asset?} → {balances: [{asset, available, locked}]} | n/a (read) |
| ledger.credit | order-router main.ts:753 | ledger-service main.ts:84 (queue ledger-service) | RPC | {transactionId, userId, asset, amount, entryType, referenceType, referenceId} → {success, newBalance?, error?} | transactionId; enforced by ledger_entries.transactionId UNIQUE |
| ledger.debit | (no caller on main) | ledger-service main.ts:121 | RPC | same shape as credit | transactionId |
| ledger.lock | (no caller on main) | ledger-service main.ts:151 | RPC | {userId, asset, amount, referenceId} → {success, error?} | referenceId (advisory; not unique-constrained) |
| ledger.unlock | (no caller on main) | ledger-service main.ts:163 | RPC | same as lock | referenceId |
| balances.updated.<userId> | ledger-service main.ts:97,134 | ws-gateway main.ts:58 | pub/sub | {asset, available} | n/a |
Custody (BitGo workflows)¶
Subjects defined in packages/nats/src/subjects.ts:48-78. Today most have no in-repo publisher — the BitGo-side custody-service is in a separate repo. The order-router is the only platform-side subscriber.
| Subject | Publisher (intended) | Subscriber | Mode | Payload | Idempotency key |
|---|---|---|---|---|---|
| custody.deposit.detected | custody-service (external) | none on main | pub/sub | per-tx | txHash |
| custody.deposit.confirmed | custody-service (external) | order-router main.ts:725 | pub/sub | {txHash, userId, asset, amount} | txHash; order-router uses deposit:${txHash} as the ledger transactionId (main.ts:754) |
| custody.deposit.reorged | custody-service | none | pub/sub | per-tx | txHash |
| custody.withdrawal.{initiated,policy_checked,approval_required,approved,signed,broadcast,confirmed,fee_bumped,submitted,completed,failed,reorged} | custody-service | none on main | pub/sub | workflow event | withdrawalId |
| custody.withdrawal.requested | api-gateway (intended) | order-router main.ts:785 | pub/sub | {userId, asset, amount} | none (no dedup today; gap) |
| custody.user.addresses.assign | api-gateway custody.service.ts:100 | custody-service (external) | RPC | {userId} → AddressAssignmentResult | userId (one-shot per user) |
| custody.user.addresses.get | api-gateway custody.service.ts:139 | custody-service | RPC | {userId} → GetUserAddressesResult | n/a (read) |
| custody.address.get | api-gateway custody.service.ts:178 | custody-service | RPC | {userId, asset, network?} → GetDepositAddressResult | n/a (read) |
| custody.tracker.alert, custody.reorg.detected, custody.alerts, alerts.critical | custody-service | (alerting sink, external) | pub/sub | alert payload | n/a |
Market data¶
| Subject | Publisher | Subscribers | Mode | Payload | Idempotency key |
|---|---|---|---|---|---|
| marketdata.ticker.<SYM> | (no publisher on main; gap) | ws-gateway main.ts:21 | pub/sub | ticker snapshot | n/a |
| marketdata.orderbook.<SYM> | (no publisher on main; gap) | ws-gateway main.ts:30 | pub/sub | L2 snapshot/diff | n/a |
| marketdata.trade.<SYM> | (no publisher on main; gap) | ws-gateway main.ts:39 | pub/sub | per-trade | n/a |
| marketdata.candle.<SYM> | (defined, unused) | none | pub/sub | OHLCV | n/a |
Venue adapter (PR#6, branch feat/venue-adapter-binance-scaffold)¶
Subjects published in quantatrade-binance/services/venue-adapter-binance/src/main.ts:19-25. The order-router does not yet subscribe on main — that's the next PR.
| Subject | Publisher | Subscribers (planned) | Mode | Payload | Idempotency key |
|---|---|---|---|---|---|
| venue.binance.bookTicker.<SYM> | venue-adapter-binance main.ts:101 | order-router → RiskChecker (planned) | pub/sub | {venue, symbol, bidPrice, bidQty, askPrice, askQty, timestamp} | n/a |
| venue.binance.trade.<SYM> | venue-adapter-binance main.ts:114 | order-router → RiskChecker.setVenuePrice (planned) | pub/sub | {venue, symbol, price, quantity, tradeId, timestamp, buyerIsMaker} | tradeId (Binance-side) |
Wildcard pattern for the order-router subscribe will be venue.*.trade.* — see the Venue adapter subscription model section below.
Margin / liquidation (M8 prep)¶
| Subject | Publisher | Subscribers | Mode | Payload | Idempotency key |
|---|---|---|---|---|---|
| margin.position.opened | (no caller on main) | none | pub/sub | position state | positionId |
| margin.position.updated | (no caller on main) | risk-service main.ts:55 | pub/sub | position state with liquidationPrice | n/a |
| margin.position.closed | risk-service liquidation-engine.ts:88 | none on main | pub/sub | close event | positionId |
| margin.liquidation.warning | risk-service main.ts:113,293 | none on main | pub/sub | {userId, marginAccountId, marginLevel, equity, debt, ...} | n/a |
| margin.liquidation.triggered | (defined, no caller) | none | pub/sub | liquidation summary | positionId |
| margin.loan.{created,repaid} | (defined, no caller) | none | pub/sub | loan event | loanId |
Misc¶
| Subject | Publisher | Subscriber | Mode | Notes |
|---|---|---|---|---|
| commands.order.cancel | risk-service liquidation-engine.ts:59 | (intended: order-router) | pub/sub | imperative command, not an event |
| commands.margin.liquidate | (defined, no caller) | none | pub/sub | imperative |
| notifications.send | risk-service liquidation-engine.ts:102 | (intended: notification sink, external) | pub/sub | {userId, type, payload} |
| staking.{staked,unstake.requested,unstaked,rewards.distributed} | (defined, no caller) | none | pub/sub | M5 staking events |
| kyc.{verification.started,verification.completed,level.upgraded} | (defined, no caller; commented out at kyc.service.ts:352) | none | pub/sub | M4 KYC events |
| fiat.{deposit,withdrawal}.* | (defined, no caller) | none | pub/sub | FitBank/Pix integration scaffold |
RPC pattern¶
// packages/nats/src/publisher.ts:107
async request<TReq, TRes>(subject: string, data: TReq, timeoutMs = 5000): Promise<TRes>
Default timeout: 5000ms. Every call site that doesn't override gets 5s. Observed overrides:
- risk-checker.ts:93 — ledger.balances.get with explicit 5s, fail-open on error.
- All api-gateway → backend RPCs use the default.
Behaviour on timeout: throws NatsError (timeout). Four policies for handling that error are in use across the codebase:
| Call site | On timeout | Rationale |
|---|---|---|
| risk-checker.ts:87-104 (ledger.balances.get) | fail-open: log warning, return null, skip the balance check. | Matching engine enforces balance authoritatively; fail-closed would block trading whenever the ledger blinks. |
| order-router/main.ts:742-775 (ledger.credit after deposit) | log and continue: deposit has already settled in the engine. Alert fires. | Engine is source of truth; refusing the deposit in the engine because the audit mirror is slow is worse. |
| api-gateway/orders.service.ts:57 (orders.submit) | fail-closed: propagates to the client as an error. | Client must know whether the order was placed. |
| api-gateway/custody.service.ts:96-180 (custody RPCs) | fail-soft: log + return empty/error. | Custody is a side channel for address assignment; user can retry. |
Circuit breaker (packages/nats/src/publisher.ts:14-76) trips after 5 failures within 30s, blocks new requests for 30s, then half-opens for one probe. Throws CircuitBreakerOpenError while open. Status visible via publisher.getCircuitBreakerStatus() — not currently exposed as a metric (gap).
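To make the breaker's state machine concrete, here is a minimal sketch using the thresholds stated above (5 failures within 30s, 30s open, one half-open probe). It is not the code in packages/nats/src/publisher.ts: the class layout, the `acquire`/`onSuccess`/`onFailure` method names, and the injectable clock are assumptions made for illustration.

```typescript
class CircuitBreakerOpenError extends Error {
  constructor() {
    super("circuit breaker is open");
    this.name = "CircuitBreakerOpenError";
  }
}

type BreakerState = "closed" | "open" | "half-open";

// Hypothetical sketch; method names and the clock parameter are assumptions.
class CircuitBreaker {
  private failures: number[] = []; // timestamps (ms) of failures inside the window
  private state: BreakerState = "closed";
  private openedAt = 0;
  private probeInFlight = false;

  constructor(
    private readonly threshold = 5,
    private readonly windowMs = 30000,
    private readonly openMs = 30000,
    private readonly now: () => number = Date.now,
  ) {}

  /** Call before sending a request; throws while the breaker is open. */
  acquire(): void {
    if (this.state === "open" && this.now() - this.openedAt >= this.openMs) {
      this.state = "half-open"; // cool-down elapsed: allow a single probe
      this.probeInFlight = false;
    }
    if (this.state === "open") throw new CircuitBreakerOpenError();
    if (this.state === "half-open") {
      if (this.probeInFlight) throw new CircuitBreakerOpenError();
      this.probeInFlight = true;
    }
  }

  onSuccess(): void {
    if (this.state === "half-open") {
      this.state = "closed"; // successful probe closes the breaker
      this.failures = [];
      this.probeInFlight = false;
    }
  }

  onFailure(): void {
    const t = this.now();
    if (this.state === "half-open") {
      this.trip(t); // failed probe re-opens immediately
      return;
    }
    this.failures = this.failures.filter((f) => t - f < this.windowMs);
    this.failures.push(t);
    if (this.failures.length >= this.threshold) this.trip(t);
  }

  getState(): BreakerState {
    return this.state;
  }

  private trip(t: number): void {
    this.state = "open";
    this.openedAt = t;
    this.failures = [];
    this.probeInFlight = false;
  }
}
```

A caller would wrap each request as `acquire()`, then `onSuccess()` or `onFailure()` depending on the outcome. Injecting the clock keeps the 30s windows testable without real waiting.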
Idempotency at the bus¶
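Core NATS delivery is at-most-once for plain subscribers and queue groups alike; a queue group only changes distribution (each message goes to one member of the group). Duplicates still occur when a publisher retries, for example an RPC caller re-sending after a timeout. Neither case is exactly-once, so consumers must dedupe.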
The reference pattern: ledger_entries.transactionId UNIQUE¶
// services/ledger-service/src/ledger.ts:85-90
const existing = await db.ledgerEntry.findFirst({ where: { transactionId } });
if (existing) {
return { success: true, newBalance: existing.balance.toString() };
}
The transactionId field is UNIQUE in Prisma. The check-then-insert runs inside a db.$transaction, and the UNIQUE constraint closes the remaining TOCTOU race: if two writers both pass the findFirst check, the second insert violates the constraint and Prisma throws. That error is caught, and on retry the call takes the existing branch.
This is the production-grade idempotency mechanism in the platform. Every cross-service write that mutates balances goes through it:
| Caller | transactionId shape | Source |
|---|---|---|
| Deposit mirror | deposit:${txHash} | order-router main.ts:754 |
| Trade settlement | trade ID from order-router | ledger settleTrade() generates internally via generateTransactionId() |
| Future: withdrawal mirror | withdrawal:${withdrawalId} | not yet on main |
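The effect of keying writes on a deterministic transactionId can be shown with an in-memory stand-in for the ledger table. This is a sketch under stated assumptions, not the ledger-service code: `InMemoryLedger`, `depositTransactionId`, the integer amounts, and the sample txHash are all hypothetical, and a Map key plays the role of the UNIQUE column.

```typescript
interface CreditRequest {
  transactionId: string;
  userId: string;
  asset: string;
  amount: number; // assumption: integer amount in the asset's smallest unit
}

interface CreditResult {
  success: boolean;
  newBalance?: string;
  error?: string;
}

class InMemoryLedger {
  // Stands in for ledger_entries; the Map key plays the role of the UNIQUE column.
  private entries = new Map<string, { balance: number }>();
  private balances = new Map<string, number>(); // key: `${userId}:${asset}`

  credit(req: CreditRequest): CreditResult {
    const existing = this.entries.get(req.transactionId);
    if (existing) {
      // Replay: report the balance stored with the original entry, write nothing.
      return { success: true, newBalance: existing.balance.toString() };
    }
    const key = `${req.userId}:${req.asset}`;
    const balance = (this.balances.get(key) ?? 0) + req.amount;
    this.balances.set(key, balance);
    this.entries.set(req.transactionId, { balance });
    return { success: true, newBalance: balance.toString() };
  }
}

// Deterministic key, as in the deposit mirror: the same txHash always maps to the same id.
const depositTransactionId = (txHash: string): string => `deposit:${txHash}`;

const demoLedger = new InMemoryLedger();
const demoCredit: CreditRequest = {
  transactionId: depositTransactionId("0xabc"), // hypothetical txHash
  userId: "usr_abc123",
  asset: "BTC",
  amount: 100,
};
console.log(demoLedger.credit(demoCredit).newBalance); // 100
console.log(demoLedger.credit(demoCredit).newBalance); // 100 (replayed message, no second credit)
```

Because the key is derived from the message rather than generated per delivery, a retried or duplicated message maps onto the same row and the second call becomes a read.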
What's NOT deduped at the bus¶
- custody.withdrawal.requested has no idempotency key on the payload. A duplicate publish could double-reserve. Withdrawal flows are still gated behind admin approval today, so this isn't yet causing harm, but it's a real gap once auto-withdrawal lands.
- orders.submit uses the matching engine's internal orderId (numeric, monotonic via Redis INCR) for uniqueness. If clientOrderId is supplied by the user, it is stored but not enforced unique (no UNIQUE constraint on orders.clientOrderId per-user).
- trades.executed per-market and consolidated subjects fire twice (taker + maker) per fill. Subscribers must be designed for both; the ledger only acts on the consolidated trades.executed subject for this reason.
Venue adapter subscription model (PR#6)¶
Publisher: venue-adapter-binance emits two subjects per symbol — venue.binance.bookTicker.<SYM> and venue.binance.trade.<SYM>.
Planned subscriber: order-router will subscribe to venue.*.trade.* (wildcard across venues and symbols) and call riskChecker.setVenuePrice(venue, symbol, price). This is not yet wired on main — risk-checker.ts only has setLastPrice (internal-trade-driven). The PR#6 follow-up adds the wildcard subscription in order-router/main.ts and the venue-price store in risk-checker.ts.
Internal-wins precedence (design, to be implemented in the follow-up):
effective_reference_price(symbol) =
internal_last_trade_price(symbol) if exists and < N seconds old
else venue_last_trade_price(symbol, venue) for first venue with fresh data
else null → reject market orders, skip fat-finger check
The rationale: our own book is authoritative for our pricing once it has activity; external venues are seed data and fallback. If the internal book hasn't traded recently, Binance fills the gap so a brand-new market can still accept market orders with a slippage guard.
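The precedence rule above translates fairly directly into code. The sketch below is one possible shape for the follow-up, not the implementation: `PricePoint`, the function signature, and the choice to apply the same freshness window (the N seconds above) to venue prices are assumptions.

```typescript
interface PricePoint {
  price: number;
  timestampMs: number;
}

// Hypothetical signature. `venues` is ordered by priority; the first fresh entry wins.
function effectiveReferencePrice(
  internal: PricePoint | undefined,
  venues: ReadonlyArray<PricePoint | undefined>,
  nowMs: number,
  maxAgeMs: number, // the "N seconds" freshness window, in milliseconds
): number | null {
  const fresh = (p: PricePoint | undefined): p is PricePoint =>
    p !== undefined && nowMs - p.timestampMs < maxAgeMs;

  if (fresh(internal)) return internal.price; // internal book wins when it has recent activity
  for (const v of venues) {
    if (fresh(v)) return v.price; // otherwise fall back to the first venue with fresh data
  }
  return null; // caller rejects market orders and skips the fat-finger check
}

const nowMs = 100000;
console.log(effectiveReferencePrice({ price: 50000, timestampMs: 99000 }, [{ price: 50100, timestampMs: 99500 }], nowMs, 5000)); // 50000
console.log(effectiveReferencePrice({ price: 50000, timestampMs: 10000 }, [{ price: 50100, timestampMs: 99500 }], nowMs, 5000)); // 50100
console.log(effectiveReferencePrice(undefined, [], nowMs, 5000)); // null
```

Returning null rather than a stale price keeps the "no reference" case explicit, so the caller has to decide what to do instead of silently guarding against an outdated number.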
Cross-link: 04-risk-controls.md covers the RiskChecker semantics this feeds into.
What looks like an event but isn't on NATS¶
- Matching-engine internal events: the Java engine uses LMAX Disruptor's ring buffer internally for order-book mutation events. Those never leave the engine; what crosses the boundary is the gRPC response on placeOrder/cancelOrder and the WebSocket /ws/events stream the order-router consumes. The Disruptor events are invisible to the platform.
- WS-gateway → client streams: WebSocket frames over ws://.../?token=..., not NATS. ws-gateway is a NATS subscriber that fans out to its in-memory channel map (services/ws-gateway/src/connections.ts) and writes to client sockets. The channel name (ticker:BTC-USD) is a ws-gateway construct, not a subject.
- Temporal workflows: ledger-service runs a Temporal worker (services/ledger-service/src/worker.ts) for retry-safe trade settlement. Workflows talk to Temporal, not NATS; activities can still publish NATS events as a side effect (trades.settled is one, at activities/trade-settlement-activities.ts:54).
- Engine REST + gRPC: order-router fetches symbol specs via REST (MATCHING_ENGINE_URL/api/v1/markets) at startup, and places orders via gRPC. Neither is on the bus.
- api-gateway HTTP → matching engine: accounts/balances/markets read paths sometimes hit the engine REST directly using shared-secret headers (x-api-key, x-api-secret, x-participant-type).
Known gaps and future-hardening¶
| Gap | Impact | Fix |
|---|---|---|
| No dead-letter queue | A consumer that throws on a poison message logs and moves on (packages/nats/src/subscriber.ts:43). The message is acked-by-default. | JetStream durable consumer + manual ack, or a *.dlq mirror subject. |
| No schema registry | Payloads are TypeScript types in @quantatrade/types. A publisher and subscriber on different versions can silently disagree. | Protobuf or JSON Schema in @quantatrade/types/contracts/. |
| Subject-mismatch between order-router output and ws-gateway expectations | ws-gateway listens on marketdata.trade.*, order-router publishes on trades.executed.<SYM>. End-user real-time trade fan-out doesn't reach the WS today. | Either map in ws-gateway, or have order-router also publish on marketdata.trade.<SYM>. |
| No producer for marketdata.{ticker,orderbook}.* | ws-gateway has subscribers but no engine-side publisher on main. | Wire engine WS events that aren't trades to marketdata.*, or publish from order-router. |
| No idempotency on custody.withdrawal.requested | Possible double-reserve in matching engine if the message is replayed. | Add a withdrawalId field and dedupe at order-router. |
| Circuit breaker state not in /metrics | Hard to alert on a tripped breaker. | Export publisher.getCircuitBreakerStatus() via @quantatrade/metrics. |
| Publisher/subscriber singletons (getPublisher/getSubscriber) ignore reconnect state | If NATS drops and reconnects, the existing publisher reference is still valid but in-flight requests during the drop fail with timeout. | Acceptable today (the underlying nats client handles reconnect), but worth documenting per call site. |
Reading the inventory¶
Conventions used in the tables above:
- Publisher: file:line of the call that emits the message. If "external" or "no caller on main", the subject is defined in subjects.ts but has no in-repo publisher yet (typically because the producing service lives in another repo, or the feature is M-future).
- Subscribers: every consumer of the subject as of platform/main. "queue X" means the subscriber joined queue group X, so multiple replicas share load; without a queue, every subscriber gets every message.
- Mode: RPC for request/reply (one consumer wins, response returns), pub/sub for fire-and-forget broadcast.
- Payload: typed in @quantatrade/types or inferred from the publisher call site. There is no schema registry; the contract is whatever both sides agree on in TypeScript.
A handy grep recipe for finding all subjects exercised in code:
grep -rnE "subscribe\(|subscribeService\(|publish\(|publisher\.request" services/ packages/ \
| grep -v ".test." | grep -v node_modules
This is how the inventory tables were built. Subjects that appear only in packages/nats/src/subjects.ts and nowhere else in the code are listed in the inventory as "no caller on main" — they're reserved namespaces, not active channels.
Operational notes¶
- Single NATS broker in dev/prod compose. No cluster, no HA. Acceptable for current scale; document upgrade path before going multi-AZ.
- All messages are subject to NATS's 1MB default max_payload. Trade-event payloads are <1KB; no current concern.
- Queue groups in use: order-router (orders.submit, orders.cancel), ledger-service (all five ledger.* RPCs), risk-service (all three risk.* RPCs). This allows horizontal scaling: multiple order-router instances would share the queue, and each message goes to exactly one of them.
Lifecycle: a deposit, traced through the bus¶
To make the patterns above concrete, here's a deposit-confirmed event walking through the system end-to-end.
%%{init: {'theme':'base','themeVariables':{'background':'#ffffff','primaryColor':'#ddf4ff','primaryBorderColor':'#0969da','primaryTextColor':'#0a0a0a','lineColor':'#1f2328','secondaryColor':'#fff8c5','tertiaryColor':'#dafbe1','clusterBkg':'#f6f8fa','clusterBorder':'#d0d7de'}}}%%
sequenceDiagram
autonumber
participant BG as BitGo<br/>(external)
participant CUST as custody-service<br/>(out-of-repo)
participant OR as order-router
participant ME as exchange-core<br/>(Java engine)
participant LED as ledger-service
participant PG as Postgres
participant WSGW as ws-gateway
participant CLI as Client
BG->>CUST: webhook deposit confirmed
CUST->>OR: NATS pub custody.deposit.confirmed<br/>{txHash, userId, asset, amount}
OR->>ME: gRPC deposit(currency, scaled-amount)
ME-->>OR: ok
OR->>LED: NATS RPC ledger.credit<br/>{transactionId: "deposit:<txHash>", ...}
Note over LED: idempotent on transactionId<br/>UNIQUE in ledger_entries
LED->>PG: db.$transaction(account++, ledger_entry insert)
LED-->>OR: {success: true, newBalance}
LED->>WSGW: NATS pub balances.updated.<userId>
WSGW->>CLI: WS push balance.updated
Three things to notice:
1. The engine is updated before the ledger. If the ledger call fails, the engine has already credited the user — that's by design (engine is the authority; ledger is the audit mirror).
2. The transactionId is deterministic: deposit:${txHash}. Replaying the same NATS message produces zero side-effects in the ledger because the UNIQUE constraint short-circuits at ledger.ts:85-90.
3. The downstream balances.updated.<userId> is a side-effect of a successful credit (ledger-service/main.ts:97). If the user is connected to ws-gateway, they see the balance change in <100ms; if not, the next GET /accounts/balances returns the new value.
A trade settlement follows the same pattern with one structural difference: the transactionId is generated inside ledger.settleTrade() (ledger.ts:339 via generateTransactionId()), not supplied by the caller, because the consolidated trades.executed subject has multiple settlements per trade ID (buyer + seller + fees in six legs). The whole settlement is one Postgres transaction, so the six legs share fate.
When to update this doc¶
- Adding a new subject in packages/nats/src/subjects.ts → row in the relevant section, file:line citation, idempotency note.
- Adding JetStream usage → new section on durable consumers.
- Removing or renaming a subject → mark it deprecated for at least one release before deletion; update consumer code first.
- A new service starts publishing → update both this doc and the service's section in 05-services-reference.md.
- A consumer changes its dedup strategy → update the Idempotency key column.