
08 — Event Bus

As of 2026-05-28.

TL;DR

NATS is the platform's only event bus. Not Kafka — despite what docs/architecture.md shows. Subjects follow <domain>.<action>[.<discriminator>] (e.g. trades.executed, orders.cancelled, custody.deposit.confirmed). RPC (request-reply) and pub/sub use the same broker; idempotency is enforced downstream on ledger_entries.transactionId UNIQUE, not at the bus.

Why NATS, not Kafka

The original delivery-plan diagram (01-architecture.md; docs/architecture.md) shows Kafka/Redpanda. The platform shipped with NATS instead. Three reasons:

  1. Single protocol covers both call shapes. The trading hot path needs RPC (api-gateway → order-router → ledger). NATS's request/reply is first-class — no separate gRPC mesh or RabbitMQ pattern needed. Kafka has no native request-reply.
  2. Simpler ops at our scale. No ZooKeeper / KRaft cluster, no broker rebalancing, no compacted-topic surprises. A single nats container in docker-compose.yml.
  3. JetStream is available if we need persistence later. The @quantatrade/nats package already exports a jetstream.ts (packages/nats/src/jetstream.ts) — durable consumers can be added subject-by-subject without changing publishers. Today, nothing on main uses JetStream; everything is core NATS (at-most-once if the consumer is down, at-least-once if the publisher retries).

If we hit Kafka-shaped requirements (ordered partitions per-symbol for replay, multi-day retention with seek), we revisit. Nothing today needs that.

Subject naming convention

Documented in packages/nats/src/subjects.ts:1-10:

Use dots for hierarchy. Wildcards: * matches one token, > matches multiple trailing tokens.
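The wildcard rules above can be checked offline with a small matcher. This is a minimal sketch of the documented semantics: `*` matches exactly one token, and `>` matches one or more trailing tokens and is only valid as the last token. It is useful for unit-testing subscription patterns without a broker. `subjectMatches` is a hypothetical helper, not part of @quantatrade/nats; in production the broker does this matching itself.

```typescript
// Hypothetical helper: mirrors NATS wildcard semantics for offline tests.
// "*" matches exactly one token; ">" matches one or more trailing tokens
// and is only valid as the final token of a pattern.
function subjectMatches(pattern: string, subject: string): boolean {
  const p = pattern.split(".");
  const s = subject.split(".");
  for (let i = 0; i < p.length; i++) {
    const tok = p[i];
    if (tok === ">") {
      // ">" must be last and needs at least one remaining subject token.
      return i === p.length - 1 && s.length > i;
    }
    if (i >= s.length) return false; // subject ran out of tokens
    if (tok !== "*" && tok !== s[i]) return false;
  }
  // Without ">", token counts must match exactly.
  return p.length === s.length;
}

console.log(subjectMatches("venue.*.trade.*", "venue.binance.trade.BTC-USDT")); // true
console.log(subjectMatches("orders.created.>", "orders.created.usr_abc123"));   // true
console.log(subjectMatches("orders.created.>", "orders.created"));              // false
console.log(subjectMatches("marketdata.trade.*", "trades.executed.BTC-USDT"));  // false
```

The last line shows the same mismatch that keeps order-router trades away from ws-gateway: the pattern and subject differ in their first token, so no wildcard can bridge them.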

The actual pattern in use is:

<domain>.<action>[.<discriminator>]
| Pattern | Example | Discriminator |
| --- | --- | --- |
| <domain>.<action> | trades.executed, ledger.credit, risk.check.failed | none (broadcast / RPC default) |
| <domain>.<action>.<symbol> | trades.executed.BTC-USDT, marketdata.ticker.ETH-USD | trading pair |
| <domain>.<action>.<userId> | balances.updated.usr_abc123, orders.created.usr_abc123 | per-user fan-out |
| venue.<venue>.<action>.<symbol> | venue.binance.bookTicker.BTC-USDT, venue.binance.trade.BTC-USDT | external venue source |
| <domain>.<workflow>.<step> | custody.deposit.confirmed, custody.withdrawal.signed, custody.withdrawal.broadcast | workflow step |

Discriminator suffixes let subscribers filter at the broker (subscribe('trades.executed.BTC-USDT') instead of subscribing to everything and filtering in app code).

The canonical builders live in packages/nats/src/subjects.ts:

  • Subjects.TRADES.byMarket(symbol) → trades.executed.<symbol> (line 28)
  • Subjects.BALANCES.byUser(userId) → balances.updated.<userId> (line 45)
  • Subjects.ORDERS.byMarket(event, symbol) → orders.<event>.<symbol> (line 21)
  • Subjects.MARKETDATA.byMarket(type, symbol) → marketdata.<type>.<symbol> (line 38)
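The builders' bodies are not reproduced in this doc. Assuming they are plain string templates, they amount to something like the sketch below. The object shape mirrors the names listed above. The `assertToken` guard is an illustrative addition and is not known to exist in subjects.ts.

```typescript
// Illustrative re-creation of the subject builders listed above.
// Assumption: each builder is a simple template; the real subjects.ts may differ.

// Hypothetical guard: reject tokens that would add a hierarchy level or act as wildcards.
function assertToken(token: string): string {
  if (token.length === 0 || /[.\s*>]/.test(token)) {
    throw new Error(`invalid subject token: "${token}"`);
  }
  return token;
}

const Subjects = {
  TRADES: {
    byMarket: (symbol: string) => `trades.executed.${assertToken(symbol)}`,
  },
  BALANCES: {
    byUser: (userId: string) => `balances.updated.${assertToken(userId)}`,
  },
  ORDERS: {
    byMarket: (event: string, symbol: string) =>
      `orders.${assertToken(event)}.${assertToken(symbol)}`,
  },
  MARKETDATA: {
    byMarket: (type: string, symbol: string) =>
      `marketdata.${assertToken(type)}.${assertToken(symbol)}`,
  },
} as const;

console.log(Subjects.TRADES.byMarket("BTC-USDT"));              // trades.executed.BTC-USDT
console.log(Subjects.BALANCES.byUser("usr_abc123"));            // balances.updated.usr_abc123
console.log(Subjects.MARKETDATA.byMarket("ticker", "ETH-USD")); // marketdata.ticker.ETH-USD
```

The guard earns its place because a symbol or user ID containing a dot would otherwise create an extra token. That token silently falls out of `*` subscriptions while still matching `>` ones.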

Inventory

Subjects observed in code on platform/main (plus PR#6's venue subjects, marked separately). All subjects are JSON-encoded via JSONCodec (packages/nats/src/publisher.ts:4). Idempotency-key column shows the field consumers should use to dedupe; "n/a" means the message is intrinsically idempotent (state-event broadcast) or no dedup discipline exists today.

Trading core

| Subject | Publisher | Subscribers | Mode | Payload (key fields) | Idempotency key |
| --- | --- | --- | --- | --- | --- |
| orders.submit | api-gateway orders.service.ts:61 | order-router main.ts:531 (queue order-router) | RPC | {userId, order: NewOrderRequest} → {success, order?, error?} | none (client-driven; clientOrderId if present) |
| orders.cancel | api-gateway orders.service.ts:161,230 | order-router main.ts:661 (queue order-router) | RPC | {userId, orderId, symbol} → {success, error?} | n/a (operation is idempotent server-side: cancelling a missing order returns {success:false, error:'Order not found'}) |
| orders.created | order-router main.ts:621 | (ws-gateway expects orders.created.> main.ts:49; see gap below) | pub/sub | {type:'order.created', order, timestamp} | n/a |
| orders.cancelled | order-router main.ts:693,981 | none on main | pub/sub | {type:'order.cancelled', orderId, userId?, timestamp} | n/a |
| orders.filled | order-router main.ts:947 | none on main | pub/sub | {type:'order.filled', orderId, timestamp} | n/a |
| orders.rejected | order-router main.ts:627,1008 | none on main | pub/sub | {type:'order.rejected', order?, reason, rejectedVolume?, timestamp} | n/a |
| orders.liquidation | risk-service liquidation-engine.ts:224 | none on main (intended: order-router) | pub/sub | liquidation order spec | n/a |
| trades.executed (consolidated) | order-router main.ts:920 | ledger-service main.ts:170; risk-service main.ts:88 | pub/sub | {id, buyerId, sellerId, symbol, price, quantity, buyerFee, sellerFee} | id (trade ID); ledger uses it inside the db.$transaction to keep settlement atomic |
| trades.executed.<SYM> | order-router main.ts:899,906 | (ws-gateway expects marketdata.trade.*, not this; see gap below) | pub/sub, per side | {type:'trade.executed', trade, timestamp} | n/a |
| trades.settled | ledger-service activities/trade-settlement-activities.ts:54 | none on main (informational) | pub/sub | {tradeId, settledAt} | n/a |
| risk.check.failed | order-router main.ts:543 | none on main | pub/sub | {userId, order, reason} | n/a |

Ledger

| Subject | Publisher | Subscribers | Mode | Payload | Idempotency key |
| --- | --- | --- | --- | --- | --- |
| ledger.balances.get | order-router risk-checker.ts:93 | ledger-service main.ts:59 (queue ledger-service) | RPC | {userId, asset?} → {balances: [{asset, available, locked}]} | n/a (read) |
| ledger.credit | order-router main.ts:753 | ledger-service main.ts:84 (queue ledger-service) | RPC | {transactionId, userId, asset, amount, entryType, referenceType, referenceId} → {success, newBalance?, error?} | transactionId, enforced by ledger_entries.transactionId UNIQUE |
| ledger.debit | (no caller on main) | ledger-service main.ts:121 | RPC | same shape as credit | transactionId |
| ledger.lock | (no caller on main) | ledger-service main.ts:151 | RPC | {userId, asset, amount, referenceId} → {success, error?} | referenceId (advisory; not unique-constrained) |
| ledger.unlock | (no caller on main) | ledger-service main.ts:163 | RPC | same as lock | referenceId |
| balances.updated.<userId> | ledger-service main.ts:97,134 | ws-gateway main.ts:58 | pub/sub | {asset, available} | n/a |

Custody (BitGo workflows)

Subjects defined in packages/nats/src/subjects.ts:48-78. Today most have no in-repo publisher — the BitGo-side custody-service is in a separate repo. The order-router is the only platform-side subscriber.

| Subject | Publisher (intended) | Subscriber | Mode | Payload | Idempotency key |
| --- | --- | --- | --- | --- | --- |
| custody.deposit.detected | custody-service (external) | none on main | pub/sub | per-tx | txHash |
| custody.deposit.confirmed | custody-service (external) | order-router main.ts:725 | pub/sub | {txHash, userId, asset, amount} | txHash; order-router uses deposit:${txHash} as the ledger transactionId (main.ts:754) |
| custody.deposit.reorged | custody-service | none | pub/sub | per-tx | txHash |
| custody.withdrawal.{initiated,policy_checked,approval_required,approved,signed,broadcast,confirmed,fee_bumped,submitted,completed,failed,reorged} | custody-service | none on main | pub/sub | workflow event | withdrawalId |
| custody.withdrawal.requested | api-gateway (intended) | order-router main.ts:785 | pub/sub | {userId, asset, amount} | none (no dedup today; gap) |
| custody.user.addresses.assign | api-gateway custody.service.ts:100 | custody-service (external) | RPC | {userId} → AddressAssignmentResult | userId (one-shot per user) |
| custody.user.addresses.get | api-gateway custody.service.ts:139 | custody-service | RPC | {userId} → GetUserAddressesResult | n/a (read) |
| custody.address.get | api-gateway custody.service.ts:178 | custody-service | RPC | {userId, asset, network?} → GetDepositAddressResult | n/a (read) |
| custody.tracker.alert, custody.reorg.detected, custody.alerts, alerts.critical | custody-service | (alerting sink; external) | pub/sub | alert payload | n/a |

Market data

| Subject | Publisher | Subscribers | Mode | Payload | Idempotency key |
| --- | --- | --- | --- | --- | --- |
| marketdata.ticker.<SYM> | (no publisher on main; gap) | ws-gateway main.ts:21 | pub/sub | ticker snapshot | n/a |
| marketdata.orderbook.<SYM> | (no publisher on main; gap) | ws-gateway main.ts:30 | pub/sub | L2 snapshot/diff | n/a |
| marketdata.trade.<SYM> | (no publisher on main; gap) | ws-gateway main.ts:39 | pub/sub | per-trade | n/a |
| marketdata.candle.<SYM> | (defined, unused) | none | pub/sub | OHLCV | n/a |

Venue adapter (PR#6, branch feat/venue-adapter-binance-scaffold)

Subjects published in quantatrade-binance/services/venue-adapter-binance/src/main.ts:19-25. The order-router does not yet subscribe on main — that's the next PR.

| Subject | Publisher | Subscribers (planned) | Mode | Payload | Idempotency key |
| --- | --- | --- | --- | --- | --- |
| venue.binance.bookTicker.<SYM> | venue-adapter-binance main.ts:101 | order-router → RiskChecker (planned) | pub/sub | {venue, symbol, bidPrice, bidQty, askPrice, askQty, timestamp} | n/a |
| venue.binance.trade.<SYM> | venue-adapter-binance main.ts:114 | order-router → RiskChecker.setVenuePrice (planned) | pub/sub | {venue, symbol, price, quantity, tradeId, timestamp, buyerIsMaker} | tradeId (Binance-side) |

Wildcard pattern for the order-router subscribe will be venue.*.trade.* — see the Venue adapter subscription model section below.

Margin / liquidation (M8 prep)

| Subject | Publisher | Subscribers | Mode | Payload | Idempotency key |
| --- | --- | --- | --- | --- | --- |
| margin.position.opened | (no caller on main) | none | pub/sub | position state | positionId |
| margin.position.updated | (no caller on main) | risk-service main.ts:55 | pub/sub | position state with liquidationPrice | n/a |
| margin.position.closed | risk-service liquidation-engine.ts:88 | none on main | pub/sub | close event | positionId |
| margin.liquidation.warning | risk-service main.ts:113,293 | none on main | pub/sub | {userId, marginAccountId, marginLevel, equity, debt, ...} | n/a |
| margin.liquidation.triggered | (defined, no caller) | none | pub/sub | liquidation summary | positionId |
| margin.loan.{created,repaid} | (defined, no caller) | none | pub/sub | loan event | loanId |

Misc

| Subject | Publisher | Subscriber | Mode | Notes |
| --- | --- | --- | --- | --- |
| commands.order.cancel | risk-service liquidation-engine.ts:59 | (intended: order-router) | pub/sub | imperative command, not an event |
| commands.margin.liquidate | (defined, no caller) | none | pub/sub | imperative |
| notifications.send | risk-service liquidation-engine.ts:102 | (intended: notification sink; external) | pub/sub | {userId, type, payload} |
| staking.{staked,unstake.requested,unstaked,rewards.distributed} | (defined, no caller) | none | pub/sub | M5 staking events |
| kyc.{verification.started,verification.completed,level.upgraded} | (defined, no caller; commented out at kyc.service.ts:352) | none | pub/sub | M4 KYC events |
| fiat.{deposit,withdrawal}.* | (defined, no caller) | none | pub/sub | FitBank/Pix integration scaffold |

RPC pattern

```typescript
// packages/nats/src/publisher.ts:107
async request<TReq, TRes>(subject: string, data: TReq, timeoutMs = 5000): Promise<TRes>
```

Default timeout: 5000 ms. Every call site that doesn't override it gets 5 s. Observed overrides:

  • risk-checker.ts:93 → ledger.balances.get with an explicit 5 s, fail-open on error.
  • All api-gateway → backend RPCs use the default.

Behaviour on timeout: throws NatsError (timeout). Four failure-handling strategies are in use across the codebase:

| Call site | On timeout | Rationale |
| --- | --- | --- |
| risk-checker.ts:87-104 (ledger.balances.get) | fail-open: log warning, return null, skip the balance check. | Matching engine enforces balance authoritatively; fail-closed would block trading whenever the ledger blinks. |
| order-router/main.ts:742-775 (ledger.credit after deposit) | log and continue: the deposit has already settled in the engine. Alert fires. | Engine is source of truth; refusing the deposit in the engine because the audit mirror is slow is worse. |
| api-gateway/orders.service.ts:57 (orders.submit) | fail-closed: propagates to the client as an error. | Client must know whether the order was placed. |
| api-gateway/custody.service.ts:96-180 (custody RPCs) | fail-soft: log + return empty/error. | Custody is a side channel for address assignment; user can retry. |
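The strategies in the table differ only in what the catch block does. Here is a minimal sketch of the two extremes. It assumes a request can be wrapped in a thunk. `failOpen` and `failClosed` are hypothetical helper names, and `timesOut` is a fake standing in for a publisher.request call that times out.

```typescript
// Fail-open: on any error (timeout, breaker open), log and return null so the
// caller can skip an optional check. Mirrors the risk-checker row above.
async function failOpen<T>(label: string, call: () => Promise<T>): Promise<T | null> {
  try {
    return await call();
  } catch (err) {
    console.warn(`[fail-open] ${label}: ${(err as Error).message}`);
    return null;
  }
}

// Fail-closed: surface the error so the client learns the outcome is unknown.
async function failClosed<T>(label: string, call: () => Promise<T>): Promise<T> {
  try {
    return await call();
  } catch (err) {
    throw new Error(`${label} failed: ${(err as Error).message}`);
  }
}

// Fake request that always times out, for demonstration only.
const timesOut = (): Promise<never> =>
  new Promise<never>((_, reject) => setTimeout(() => reject(new Error("TIMEOUT")), 10));

async function demo(): Promise<void> {
  const balances = await failOpen("ledger.balances.get", timesOut);
  console.log(balances); // null: balance check skipped, engine still enforces
  try {
    await failClosed("orders.submit", timesOut);
  } catch (err) {
    console.log((err as Error).message); // orders.submit failed: TIMEOUT
  }
}
demo();
```

Fail-soft, as in the custody row, is fail-open with an empty or error result in place of null. Which helper a call site uses should follow from whether some other component still enforces the invariant.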

Circuit breaker (packages/nats/src/publisher.ts:14-76) trips after 5 failures within 30s, blocks new requests for 30s, then half-opens for one probe. Throws CircuitBreakerOpenError while open. Status visible via publisher.getCircuitBreakerStatus() — not currently exposed as a metric (gap).
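The breaker's documented behaviour maps onto a small state machine. Below is a minimal sketch of a breaker with that behaviour. It is not the code in publisher.ts. Two details are assumptions: the clock is injectable for testing, and only a successful half-open probe clears the failure window.

```typescript
// Sketch: 5 failures within 30 s trip the breaker, it blocks requests for 30 s,
// then half-opens for a single probe. Not the actual publisher.ts implementation.
class CircuitBreakerOpenError extends Error {
  constructor() {
    super("circuit breaker open");
    this.name = "CircuitBreakerOpenError";
  }
}

type BreakerState = "closed" | "open" | "half-open";

class CircuitBreaker {
  private failures: number[] = []; // timestamps of failures inside the window
  private state: BreakerState = "closed";
  private openedAt = 0;
  private probeInFlight = false;
  private readonly threshold: number;
  private readonly windowMs: number;
  private readonly openMs: number;
  private readonly now: () => number;

  constructor(threshold = 5, windowMs = 30_000, openMs = 30_000, now: () => number = Date.now) {
    this.threshold = threshold;
    this.windowMs = windowMs;
    this.openMs = openMs;
    this.now = now;
  }

  status(): BreakerState {
    return this.state;
  }

  async exec<T>(call: () => Promise<T>): Promise<T> {
    if (this.state === "open") {
      if (this.now() - this.openedAt < this.openMs) throw new CircuitBreakerOpenError();
      this.state = "half-open"; // cool-down elapsed: allow one probe
    }
    if (this.state === "half-open") {
      if (this.probeInFlight) throw new CircuitBreakerOpenError(); // only one probe at a time
      this.probeInFlight = true;
    }
    try {
      const result = await call();
      if (this.state === "half-open") {
        // Probe succeeded: close and forget old failures.
        this.state = "closed";
        this.failures = [];
        this.probeInFlight = false;
      }
      return result;
    } catch (err) {
      this.recordFailure();
      throw err;
    }
  }

  private recordFailure(): void {
    const t = this.now();
    if (this.state === "half-open") {
      // Probe failed: reopen for another full cool-down.
      this.state = "open";
      this.openedAt = t;
      this.probeInFlight = false;
      return;
    }
    if (this.state === "open") return; // late failure from a call started before tripping
    this.failures = this.failures.filter((ts) => t - ts < this.windowMs); // slide the window
    this.failures.push(t);
    if (this.failures.length >= this.threshold) {
      this.state = "open";
      this.openedAt = t;
    }
  }
}
```

The `status()` accessor plays the role of getCircuitBreakerStatus(). It is the value a /metrics gauge would need to export.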

Idempotency at the bus

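Core NATS delivers at-most-once, for queue-group and plain subscribers alike. A queue group only load-balances; it does not redeliver. Duplicates still happen: a publisher or RPC caller that retries after a timeout may resend a message that was in fact processed. Nothing on the bus is exactly-once, so consumers must dedupe.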

The reference pattern: ledger_entries.transactionId UNIQUE

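```typescript
// services/ledger-service/src/ledger.ts:85-90
// Replay short-circuit: an entry with this transactionId means the credit already happened.
const existing = await db.ledgerEntry.findFirst({ where: { transactionId } });
if (existing) {
  return { success: true, newBalance: existing.balance.toString() };
}
```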

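The transactionId field is UNIQUE in the Prisma schema. The check-then-insert runs inside a db.$transaction. At Postgres's default isolation level, the transaction alone does not close the time-of-check/time-of-use race, because two concurrent writers can both see no existing row. The UNIQUE constraint is what resolves it. The second writer's insert fails with Prisma's unique-constraint error (P2002), which is caught, and the retry then takes the existing-entry branch.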
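The pattern can be modelled without a database. In this minimal in-memory sketch, a Map keyed by transactionId plays the role of the UNIQUE constraint on ledger_entries.transactionId. Several things are illustrative rather than taken from ledger.ts: `InMemoryLedger`, `UniqueViolation`, the entry shape, and the use of bigint minor units for amounts.

```typescript
// In-memory model of an idempotent credit. The entries Map stands in for
// UNIQUE(ledger_entries.transactionId); names and shapes are hypothetical.
interface LedgerEntry {
  transactionId: string;
  userId: string;
  asset: string;
  amount: bigint;  // minor units (assumption)
  balance: bigint; // balance after this entry
}

class UniqueViolation extends Error {}

class InMemoryLedger {
  private readonly entries = new Map<string, LedgerEntry>(); // UNIQUE(transactionId)
  private readonly balances = new Map<string, bigint>();     // key: `${userId}:${asset}`

  private insert(entry: LedgerEntry): void {
    if (this.entries.has(entry.transactionId)) throw new UniqueViolation(entry.transactionId);
    this.entries.set(entry.transactionId, entry);
  }

  credit(transactionId: string, userId: string, asset: string, amount: bigint) {
    // Check first: a replayed message returns the original result with no side effects.
    const existing = this.entries.get(transactionId);
    if (existing) return { success: true, newBalance: existing.balance.toString() };

    const key = `${userId}:${asset}`;
    const balance = (this.balances.get(key) ?? 0n) + amount;
    try {
      this.insert({ transactionId, userId, asset, amount, balance });
    } catch (err) {
      // Mirrors the database path where a concurrent writer won the race.
      // Unreachable in this single-threaded model, kept to show the fallback.
      if (err instanceof UniqueViolation) {
        return { success: true, newBalance: this.entries.get(transactionId)!.balance.toString() };
      }
      throw err;
    }
    this.balances.set(key, balance);
    return { success: true, newBalance: balance.toString() };
  }
}

const ledger = new InMemoryLedger();
console.log(ledger.credit("deposit:0xabc", "usr_abc123", "USDT", 100n)); // { success: true, newBalance: '100' }
console.log(ledger.credit("deposit:0xabc", "usr_abc123", "USDT", 100n)); // replay: { success: true, newBalance: '100' }
```

A deterministic key such as deposit:${txHash} is what makes the replay harmless. A randomly generated ID per delivery would defeat the check.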

This is the production-grade idempotency mechanism in the platform. Every cross-service write that mutates balances goes through it:

| Caller | transactionId shape | Source |
| --- | --- | --- |
| Deposit mirror | deposit:${txHash} | order-router main.ts:754 |
| Trade settlement | trade ID from order-router | ledger settleTrade() generates internally via generateTransactionId() |
| Future: withdrawal mirror | withdrawal:${withdrawalId} | not yet on main |

What's NOT deduped at the bus

  • custody.withdrawal.requested has no idempotency key on the payload. A duplicate publish could double-reserve. Withdrawal flows are still gated behind admin approval today so this isn't yet causing harm — but it's a real gap once auto-withdrawal lands.
  • orders.submit uses the matching engine's internal orderId (numeric, monotonic via Redis INCR) for uniqueness. If clientOrderId is supplied by the user, it is stored but not enforced unique (no UNIQUE constraint on orders.clientOrderId per-user).
  • The per-market trades.executed.<SYM> subject fires twice per fill, once per side (taker and maker; order-router main.ts:899,906). The consolidated trades.executed fires once per trade (main.ts:920). Subscribers must be designed for this, which is why the ledger acts only on the consolidated subject.
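The gap fix listed below is "add a withdrawalId and dedupe at order-router". It could start with a consumer-side guard like this sketch. Several pieces are hypothetical: the withdrawalId field does not exist on the payload today, and `TtlDedupe`, `onWithdrawalRequested` and the 10-minute window are illustrative names and values. The guard is in-memory only. It does not survive restarts or span replicas, so a durable UNIQUE key, the pattern the ledger uses, remains the real fix.

```typescript
// Hypothetical dedupe guard for custody.withdrawal.requested.
interface WithdrawalRequested {
  withdrawalId: string; // assumed future field; not on the payload today
  userId: string;
  asset: string;
  amount: string;
}

class TtlDedupe {
  private readonly seen = new Map<string, number>(); // id -> expiry (epoch ms)
  private readonly ttlMs: number;
  private readonly now: () => number;

  constructor(ttlMs: number, now: () => number = Date.now) {
    this.ttlMs = ttlMs;
    this.now = now;
  }

  // True the first time an id is seen within the TTL, false for repeats.
  firstSeen(id: string): boolean {
    const t = this.now();
    for (const [key, expiry] of this.seen) {
      if (expiry <= t) this.seen.delete(key); // evict expired ids (O(n); fine for a sketch)
    }
    if (this.seen.has(id)) return false;
    this.seen.set(id, t + this.ttlMs);
    return true;
  }
}

const withdrawalDedupe = new TtlDedupe(10 * 60_000); // 10-minute window, arbitrary

function onWithdrawalRequested(msg: WithdrawalRequested): "reserved" | "duplicate" {
  if (!withdrawalDedupe.firstSeen(msg.withdrawalId)) return "duplicate";
  // ...reserve funds in the matching engine here...
  return "reserved";
}
```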

Venue adapter subscription model (PR#6)

Publisher: venue-adapter-binance emits two subjects per symbol — venue.binance.bookTicker.<SYM> and venue.binance.trade.<SYM>.

Planned subscriber: order-router will subscribe to venue.*.trade.* (wildcard across venues and symbols) and call riskChecker.setVenuePrice(venue, symbol, price). This is not yet wired on main: risk-checker.ts only has setLastPrice (internal-trade-driven). The PR#6 follow-up adds the wildcard subscription in order-router/main.ts and the venue-price store in risk-checker.ts.
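The planned handler mostly needs to recover venue and symbol from the subject and pass a price on. Here is a minimal sketch. `VenuePriceSink` stands in for RiskChecker.setVenuePrice, which is not on main yet. Two choices are assumptions: accepting the payload price as either a string or a number, and treating the subject tokens, not the payload, as the source of venue and symbol.

```typescript
// Sketch of the planned venue.*.trade.* handler in order-router (hypothetical names).
interface VenuePriceSink {
  setVenuePrice(venue: string, symbol: string, price: number): void;
}

interface VenueTradePayload {
  price: string | number; // wire type is an assumption
}

// "venue.binance.trade.BTC-USDT" -> { venue: "binance", symbol: "BTC-USDT" }
function parseVenueTradeSubject(subject: string): { venue: string; symbol: string } | null {
  const [root, venue, kind, symbol, ...rest] = subject.split(".");
  if (root !== "venue" || kind !== "trade" || !venue || !symbol || rest.length > 0) return null;
  return { venue, symbol };
}

function handleVenueTrade(subject: string, payload: VenueTradePayload, sink: VenuePriceSink): boolean {
  const parsed = parseVenueTradeSubject(subject);
  const price = Number(payload.price);
  if (!parsed || !Number.isFinite(price) || price <= 0) return false; // drop malformed input
  sink.setVenuePrice(parsed.venue, parsed.symbol, price);
  return true;
}
```

Rejecting non-finite or non-positive prices at this boundary keeps a bad venue tick from becoming the reference price for the fat-finger check.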

Internal-wins precedence (design, to be implemented in the follow-up):

```text
effective_reference_price(symbol) =
  internal_last_trade_price(symbol)            if exists and < N seconds old
  else venue_last_trade_price(symbol, venue)   for first venue with fresh data
  else null                                     → reject market orders, skip fat-finger check
```

The rationale: our own book is authoritative for our pricing once it has activity; external venues are seed data and fallback. If the internal book hasn't traded recently, Binance fills the gap so a brand-new market can still accept market orders with a slippage guard.
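The precedence rule translates into a small price store. This is a minimal sketch, not the planned risk-checker.ts code. Several points are assumptions: `ReferencePrices` is a hypothetical class, `freshMs` stands in for the unspecified "N seconds", the same window applies to venue data, and venues are tried in a fixed configured order.

```typescript
// Sketch of internal-wins precedence for the reference price (hypothetical names).
interface PricePoint {
  price: number;
  at: number; // epoch ms when recorded
}

class ReferencePrices {
  private readonly internal = new Map<string, PricePoint>();            // symbol -> last internal trade
  private readonly venues = new Map<string, Map<string, PricePoint>>(); // venue -> symbol -> last trade
  private readonly freshMs: number;
  private readonly venueOrder: string[];
  private readonly now: () => number;

  constructor(freshMs: number, venueOrder: string[], now: () => number = Date.now) {
    this.freshMs = freshMs;
    this.venueOrder = venueOrder;
    this.now = now;
  }

  setLastPrice(symbol: string, price: number): void {
    this.internal.set(symbol, { price, at: this.now() });
  }

  setVenuePrice(venue: string, symbol: string, price: number): void {
    let bySymbol = this.venues.get(venue);
    if (!bySymbol) {
      bySymbol = new Map();
      this.venues.set(venue, bySymbol);
    }
    bySymbol.set(symbol, { price, at: this.now() });
  }

  // null means: reject market orders and skip the fat-finger check.
  effective(symbol: string): number | null {
    const t = this.now();
    const own = this.internal.get(symbol);
    if (own && t - own.at < this.freshMs) return own.price; // internal wins when fresh
    for (const venue of this.venueOrder) {
      const p = this.venues.get(venue)?.get(symbol);
      if (p && t - p.at < this.freshMs) return p.price; // first venue with fresh data
    }
    return null;
  }
}
```

Keeping the clock injectable makes the freshness boundary testable. Note that a stale internal price falls through to the venue check rather than being returned.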

Cross-link: 04-risk-controls.md covers the RiskChecker semantics this feeds into.

What looks like an event but isn't on NATS

  • Matching-engine internal events — the Java engine uses LMAX Disruptor's ring buffer internally for order-book mutation events. Those never leave the engine; what crosses the boundary is the gRPC response on placeOrder/cancelOrder and the WebSocket /ws/events stream the order-router consumes. The Disruptor events are invisible to the platform.
  • WS-gateway → client streams — WebSocket frames over ws://.../?token=..., not NATS. ws-gateway is a NATS subscriber that fans out to its in-memory channel map (services/ws-gateway/src/connections.ts) and writes to client sockets. The channel name (ticker:BTC-USD) is a ws-gateway construct, not a subject.
  • Temporal workflows — ledger-service runs a Temporal worker (services/ledger-service/src/worker.ts) for retry-safe trade settlement. Workflows talk to Temporal, not NATS; activities can still publish NATS events as a side effect (trades.settled is one — activities/trade-settlement-activities.ts:54).
  • Engine REST + gRPC — order-router fetches symbol specs via REST (MATCHING_ENGINE_URL/api/v1/markets) at startup, and places orders via gRPC. Neither is on the bus.
  • api-gateway HTTP → matching engine — accounts/balances/markets read paths sometimes hit the engine REST directly using shared-secret headers (x-api-key, x-api-secret, x-participant-type).

Known gaps and future-hardening

| Gap | Impact | Fix |
| --- | --- | --- |
| No dead-letter queue | A consumer that throws on a poison message logs and moves on (packages/nats/src/subscriber.ts:43). Core NATS has no acknowledgement or redelivery, so the message is effectively dropped. | JetStream durable consumer + manual ack, or a *.dlq mirror subject. |
| No schema registry | Payloads are TypeScript types in @quantatrade/types. A publisher and subscriber on different versions can silently disagree. | Protobuf or JSON Schema in @quantatrade/types/contracts/. |
| Subject mismatch between order-router output and ws-gateway expectations | ws-gateway listens on marketdata.trade.*, order-router publishes on trades.executed.<SYM>. End-user real-time trade fan-out doesn't reach the WS today. | Either map in ws-gateway, or have order-router also publish on marketdata.trade.<SYM>. |
| No producer for marketdata.{ticker,orderbook}.* | ws-gateway has subscribers but no engine-side publisher on main. | Wire engine WS events that aren't trades to marketdata.*, or publish from order-router. |
| No idempotency on custody.withdrawal.requested | Possible double-reserve in matching engine if the message is replayed. | Add a withdrawalId field and dedupe at order-router. |
| Circuit breaker state not in /metrics | Hard to alert on a tripped breaker. | Export publisher.getCircuitBreakerStatus() via @quantatrade/metrics. |
| Publisher/subscriber singletons (getPublisher/getSubscriber) ignore reconnect state | If NATS drops and reconnects, the existing publisher reference is still valid, but in-flight requests during the drop fail with timeout. | Acceptable today (the underlying nats client handles reconnect), but worth documenting per call site. |
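The "*.dlq mirror subject" option amounts to wrapping each handler. Here is a minimal sketch. `Publish` stands in for the publisher's publish method. The handler signature, `withDeadLetter` and the envelope fields are all hypothetical, not the subscriber.ts API.

```typescript
// Sketch: on a handler throw, republish the message to `${subject}.dlq`
// instead of losing it. Hypothetical names throughout.
type Publish = (subject: string, data: unknown) => void;
type Handler<T> = (data: T, subject: string) => Promise<void> | void;

interface DlqEnvelope {
  originalSubject: string;
  error: string;
  failedAt: string; // ISO timestamp
  data: unknown;    // original payload, untouched
}

function withDeadLetter<T>(handler: Handler<T>, publish: Publish): Handler<T> {
  return async (data, subject) => {
    try {
      await handler(data, subject);
    } catch (err) {
      const envelope: DlqEnvelope = {
        originalSubject: subject,
        error: err instanceof Error ? err.message : String(err),
        failedAt: new Date().toISOString(),
        data,
      };
      publish(`${subject}.dlq`, envelope);
    }
  };
}
```

Two caveats apply. First, on core NATS the DLQ publish is itself at-most-once, so the durable option remains JetStream. Second, a suffix is matched by trailing `>` wildcards: a subscriber on orders.created.> would also receive orders.created.<userId>.dlq. A prefix such as dlq.<subject> avoids that collision.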

Reading the inventory

Conventions used in the tables above:

  • Publisher: file:line of the call that emits the message. If "external" or "no caller on main", the subject is defined in subjects.ts but has no in-repo publisher yet (typically because the producing service lives in another repo, or the feature is M-future).
  • Subscribers: every consumer of the subject as of platform/main. "Queue X" means the subscriber joined queue group X, so multiple replicas share load; without a queue, every subscriber gets every message.
  • Mode: RPC for request/reply (one consumer wins, response returns), pub/sub for fire-and-forget broadcast.
  • Payload: typed in @quantatrade/types or inferred from the publisher call site. There is no schema registry; the contract is whatever both sides agree on in TypeScript.

A handy grep recipe for finding all subjects exercised in code:

```shell
grep -rnE "subscribe\(|subscribeService\(|publish\(|publisher\.request" services/ packages/ \
  | grep -v ".test." | grep -v node_modules
```

This is how the inventory tables were built. Subjects that appear only in packages/nats/src/subjects.ts and nowhere else in the code are listed in the inventory as "no caller on main" — they're reserved namespaces, not active channels.

Operational notes

  • Single NATS broker in dev/prod compose. No cluster, no HA. Acceptable for current scale; document upgrade path before going multi-AZ.
  • All subjects are subject to NATS's 1MB default max_payload. Trade-event payloads are <1KB; no current concern.
  • Queue groups in use: order-router (orders.submit, orders.cancel), ledger-service (all five ledger.* RPCs), risk-service (all three risk.* RPCs). This allows horizontal scaling — multiple order-router instances would share the queue and each message goes to exactly one.

Lifecycle: a deposit, traced through the bus

To make the patterns above concrete, here's a deposit-confirmed event walking through the system end-to-end.

```mermaid
%%{init: {'theme':'base','themeVariables':{'background':'#ffffff','primaryColor':'#ddf4ff','primaryBorderColor':'#0969da','primaryTextColor':'#0a0a0a','lineColor':'#1f2328','secondaryColor':'#fff8c5','tertiaryColor':'#dafbe1','clusterBkg':'#f6f8fa','clusterBorder':'#d0d7de'}}}%%
sequenceDiagram
    autonumber
    participant BG as BitGo<br/>(external)
    participant CUST as custody-service<br/>(out-of-repo)
    participant OR as order-router
    participant ME as exchange-core<br/>(Java engine)
    participant LED as ledger-service
    participant PG as Postgres
    participant WSGW as ws-gateway
    participant CLI as Client

    BG->>CUST: webhook deposit confirmed
    CUST->>OR: NATS pub custody.deposit.confirmed<br/>{txHash, userId, asset, amount}
    OR->>ME: gRPC deposit(currency, scaled-amount)
    ME-->>OR: ok
    OR->>LED: NATS RPC ledger.credit<br/>{transactionId: "deposit:<txHash>", ...}
    Note over LED: idempotent on transactionId<br/>UNIQUE in ledger_entries
    LED->>PG: db.$transaction(account++, ledger_entry insert)
    LED-->>OR: {success: true, newBalance}
    LED->>WSGW: NATS pub balances.updated.<userId>
    WSGW->>CLI: WS push balance.updated
```

Three things to notice:

  1. The engine is updated before the ledger. If the ledger call fails, the engine has already credited the user. That is by design: the engine is the authority and the ledger is the audit mirror.
  2. The transactionId is deterministic: deposit:${txHash}. Replaying the same NATS message produces zero side-effects in the ledger. The existing-entry check at ledger.ts:85-90 short-circuits the replay, with the UNIQUE constraint as the backstop.
  3. The downstream balances.updated.<userId> is a side-effect of a successful credit (ledger-service/main.ts:97). If the user is connected to ws-gateway, they see the balance change in <100ms; if not, the next GET /accounts/balances returns the new value.

A trade settlement follows the same pattern with one structural difference: the transactionId is generated inside ledger.settleTrade() (ledger.ts:339 via generateTransactionId()), not supplied by the caller, because the consolidated trades.executed subject has multiple settlements per trade ID (buyer + seller + fees in six legs). The whole settlement is one Postgres transaction, so the six legs share fate.

When to update this doc

  • Adding a new subject in packages/nats/src/subjects.ts → row in the relevant section, file:line citation, idempotency note.
  • Adding JetStream usage → new section on durable consumers.
  • Removing or renaming a subject → mark it deprecated for at least one release before deletion; update consumer code first.
  • A new service starts publishing → update both this doc and the service's section in 05-services-reference.md.
  • A consumer changes its dedup strategy → update the Idempotency key column.