All files / apps/marketdata/src nats-publisher.ts

0% Statements 0/31
0% Branches 0/4
0% Functions 0/3
0% Lines 0/27

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54                                                                                                           
import { Injectable, Logger } from '@nestjs/common';
import { connect, NatsConnection, StringCodec, JetStreamClient } from 'nats';
import { subjects, TickIngestedV1 } from '@connector/contracts';
 
/**
 * Publishes market-data tick events to NATS JetStream.
 *
 * The connection is opened lazily through {@link NatsPublisher.ensure}; the
 * first successful call also tries to provision the `marketdata_ticks`
 * stream. {@link NatsPublisher.publishTick} throws until `ensure` has
 * established a connection.
 */
@Injectable()
export class NatsPublisher {
  private readonly logger = new Logger(NatsPublisher.name);
  private conn: NatsConnection | null = null;
  private js: JetStreamClient | null = null;
  private readonly sc = StringCodec();
  /** In-flight connect/provision attempt, shared by concurrent `ensure` callers. */
  private connecting: Promise<void> | null = null;

  /**
   * Connects to NATS (once) and makes sure the ticks stream exists.
   *
   * Concurrent callers share a single connection attempt, so no connection is
   * opened twice and leaked. If the attempt fails, the error propagates to
   * every waiting caller and a later call may retry.
   *
   * @param natsUrl Server URL passed to `connect` as `servers`.
   * @throws Whatever `connect` rejects with when the connection cannot be made.
   */
  async ensure(natsUrl: string): Promise<void> {
    if (this.connecting) {
      await this.connecting;
      return;
    }
    if (this.conn) return;

    const attempt = this.connectAndProvision(natsUrl);
    this.connecting = attempt;
    try {
      await attempt;
    } finally {
      // Only clear our own attempt; allows a retry after a failure.
      if (this.connecting === attempt) this.connecting = null;
    }
  }

  /**
   * Serializes the event as JSON and publishes it through JetStream on the
   * subject derived from the event's exchange id and symbol.
   *
   * @param evt Tick event to publish.
   * @throws Error if {@link NatsPublisher.ensure} has not connected yet, or
   *   whatever the JetStream publish call rejects with.
   */
  async publishTick(evt: TickIngestedV1): Promise<void> {
    if (!this.js) throw new Error('JetStream not initialized');
    const subject = subjects.ticksIngested(evt.exchangeId, evt.symbol);
    const payload = this.sc.encode(JSON.stringify(evt));
    await this.js.publish(subject, payload);
  }

  /**
   * Drains the connection (if any) and resets the publisher so that
   * {@link NatsPublisher.ensure} can reconnect later.
   *
   * State is cleared before draining so a failed drain cannot leave the
   * publisher holding a dead connection.
   *
   * @throws Whatever `drain` rejects with.
   */
  async close(): Promise<void> {
    if (this.connecting) {
      // Let an in-flight connect settle so its connection is not leaked;
      // its failure is already reported to the `ensure` caller.
      try {
        await this.connecting;
      } catch {
        // ignored on purpose, see above
      }
    }
    const conn = this.conn;
    this.conn = null;
    this.js = null;
    await conn?.drain();
  }

  /** Opens the connection, creates the JetStream client and provisions the stream. */
  private async connectAndProvision(natsUrl: string): Promise<void> {
    const conn = await connect({
      servers: natsUrl,
      name: 'marketdata-publisher',
    });
    this.conn = conn;
    this.js = conn.jetstream();
    this.logger.log('Connected to NATS for publishing');
    await this.provisionTicksStream(conn);
  }

  /**
   * Best-effort creation of the `marketdata_ticks` stream. Failures are
   * logged as warnings and never propagated.
   */
  private async provisionTicksStream(conn: NatsConnection): Promise<void> {
    try {
      const jsm = await conn.jetstreamManager();
      const streamSubjects = ['marketdata.ticks.*.*.v1'];
      try {
        await jsm.streams.info('marketdata_ticks');
      } catch {
        // NOTE(review): any lookup failure is treated as "stream missing";
        // if it failed for another reason the add below is expected to fail
        // too and gets logged by the outer catch.
        await jsm.streams.add({
          name: 'marketdata_ticks',
          subjects: streamSubjects,
          storage: 'file',
          retention: 'limits',
          max_age: 0, // no TTL by default; adjust as needed
          // String literals are used instead of the library enums, hence the cast.
          // eslint-disable-next-line @typescript-eslint/no-explicit-any
        } as any);
        this.logger.log('Created JetStream stream marketdata_ticks');
      }
    } catch (err) {
      this.logger.warn(`JetStream provisioning error: ${String(err)}`);
    }
  }
}