Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 | 2x 2x 2x 2x 2x 2x 2x 2x 2x 2x 1x 1x | import { Controller, Get, Inject, Query } from '@nestjs/common'; import { ClientProxy } from '@nestjs/microservices'; import { connect } from 'nats'; @Controller() export class AppController { constructor( @Inject('CATALOG') private readonly catalog: ClientProxy, @Inject('ANALYSIS') private readonly analysis: ClientProxy, ) {} @Get('/exchanges') listExchanges() { return this.catalog.send('catalog.exchanges.list', {}); } @Get('/symbols') listSymbols(@Query('exchangeId') exchangeId: string) { return this.catalog.send('catalog.symbols.list', { exchangeId }); } @Get('/analysis/symbol') analyzeSymbol( @Query('exchangeId') exchangeId: string, @Query('symbol') symbol: string, ) { return this.analysis.send('analysis.symbol.requested', { exchangeId, symbol, }); } @Get('/ticks') async getTicks( @Query('exchangeId') exchangeId: string, @Query('symbol') symbol: string, @Query('limit') limitStr?: string, ) { Eif (!exchangeId || !symbol) return { error: 'exchangeId and symbol are required' }; const limit = Math.max(1, Math.min(50, Number(limitStr ?? '20'))); const conn = await connect({ servers: process.env.NATS_URL || 'nats://localhost:4222', }); const jsm = await conn.jetstreamManager(); const subject = `marketdata.ticks.${exchangeId}.${symbol}.v1`; const stream = await jsm.streams.find(subject); // Create or use durable consumer for filtered subject const durable = `api_ticks_${exchangeId}_${symbol}`; try { await jsm.consumers.info(stream, durable); } catch { await jsm.consumers.add(stream, { durable_name: durable, filter_subject: subject, ack_policy: 'explicit', } as any); } const js = conn.jetstream(); const res: unknown[] = []; // Simplified: fetch last N messages directly from the stream (sequence-based) const si = await jsm.streams .getMessage(stream, { last_by_subj: subject } as any) .catch(() => null); if (si && (si as any).data) { // direct last message try { res.push(JSON.parse(new TextDecoder().decode((si as any).data))); } catch (_e) { /* ignore */ } await conn.drain(); return res; } // Fallback to consumer pull (if getMessage not available) const _durableName = `api_ticks_${exchangeId}_${symbol}`; const sub = await js.pullSubscribe(subject, { durable: (name: string) => name, } as any); await sub.pull({ batch: limit, expires: 1000 }); for await (const m of sub) { try { res.push(JSON.parse(new TextDecoder().decode(m.data))); } catch (_e) { /* ignore malformed */ } m.ack(); if (res.length >= limit) break; } await conn.drain(); return res; } } |