All files / apps/api/src app.controller.ts

29.78% Statements 14/47
36.36% Branches 8/22
28.57% Functions 2/7
27.27% Lines 12/44

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 942x 2x 2x     2x   2x 2x       2x         2x         2x                     2x         1x 1x                                                                                                            
import { Controller, Get, Inject, Query } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { connect } from 'nats';
 
@Controller()
export class AppController {
  constructor(
    @Inject('CATALOG') private readonly catalog: ClientProxy,
    @Inject('ANALYSIS') private readonly analysis: ClientProxy,
  ) {}
 
  @Get('/exchanges')
  listExchanges() {
    return this.catalog.send('catalog.exchanges.list', {});
  }
 
  @Get('/symbols')
  listSymbols(@Query('exchangeId') exchangeId: string) {
    return this.catalog.send('catalog.symbols.list', { exchangeId });
  }
 
  @Get('/analysis/symbol')
  analyzeSymbol(
    @Query('exchangeId') exchangeId: string,
    @Query('symbol') symbol: string,
  ) {
    return this.analysis.send('analysis.symbol.requested', {
      exchangeId,
      symbol,
    });
  }
 
  @Get('/ticks')
  async getTicks(
    @Query('exchangeId') exchangeId: string,
    @Query('symbol') symbol: string,
    @Query('limit') limitStr?: string,
  ) {
    Eif (!exchangeId || !symbol)
      return { error: 'exchangeId and symbol are required' };
    const limit = Math.max(1, Math.min(50, Number(limitStr ?? '20')));
    const conn = await connect({
      servers: process.env.NATS_URL || 'nats://localhost:4222',
    });
    const jsm = await conn.jetstreamManager();
    const subject = `marketdata.ticks.${exchangeId}.${symbol}.v1`;
    const stream = await jsm.streams.find(subject);
    // Create or use durable consumer for filtered subject
    const durable = `api_ticks_${exchangeId}_${symbol}`;
    try {
      await jsm.consumers.info(stream, durable);
    } catch {
      await jsm.consumers.add(stream, {
        durable_name: durable,
        filter_subject: subject,
        ack_policy: 'explicit',
      } as any);
    }
    const js = conn.jetstream();
    const res: unknown[] = [];
    // Simplified: fetch last N messages directly from the stream (sequence-based)
    const si = await jsm.streams
      .getMessage(stream, { last_by_subj: subject } as any)
      .catch(() => null);
    if (si && (si as any).data) {
      // direct last message
      try {
        res.push(JSON.parse(new TextDecoder().decode((si as any).data)));
      } catch (_e) {
        /* ignore */
      }
      await conn.drain();
      return res;
    }
    // Fallback to consumer pull (if getMessage not available)
    const _durableName = `api_ticks_${exchangeId}_${symbol}`;
    const sub = await js.pullSubscribe(subject, {
      durable: (name: string) => name,
    } as any);
    await sub.pull({ batch: limit, expires: 1000 });
    for await (const m of sub) {
      try {
        res.push(JSON.parse(new TextDecoder().decode(m.data)));
      } catch (_e) {
        /* ignore malformed */
      }
      m.ack();
      if (res.length >= limit) break;
    }
    await conn.drain();
    return res;
  }
}