Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | import { Injectable, Logger } from '@nestjs/common'; import { connect, NatsConnection, StringCodec, JetStreamClient } from 'nats'; import { subjects, TickIngestedV1 } from '@connector/contracts'; @Injectable() export class NatsPublisher { private readonly logger = new Logger(NatsPublisher.name); private conn: NatsConnection | null = null; private js: JetStreamClient | null = null; private readonly sc = StringCodec(); async ensure(natsUrl: string) { if (this.conn) return; this.conn = await connect({ servers: natsUrl, name: 'marketdata-publisher', }); this.js = this.conn.jetstream(); this.logger.log('Connected to NATS for publishing'); // Auto-provision stream for ticks try { const jsm = await this.conn.jetstreamManager(); const subjects = ['marketdata.ticks.*.*.v1']; try { await jsm.streams.info('marketdata_ticks'); } catch { await jsm.streams.add({ name: 'marketdata_ticks', subjects, storage: 'file', retention: 'limits', max_age: 0, // no TTL by default; adjust as needed } as any); this.logger.log('Created JetStream stream marketdata_ticks'); } } catch (err) { this.logger.warn(`JetStream provisioning error: ${String(err)}`); } } async publishTick(evt: TickIngestedV1) { if (!this.js) throw new Error('JetStream not initialized'); const subject = subjects.ticksIngested(evt.exchangeId, evt.symbol); const payload = this.sc.encode(JSON.stringify(evt)); await this.js.publish(subject, payload); } async close() { await this.conn?.drain(); this.conn = null; this.js = null; } } |