Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | 2x 2x 2x 2x 2x 2x | import { SubscribeMessage, WebSocketGateway, WebSocketServer, OnGatewayConnection, } from '@nestjs/websockets'; import { Server, WebSocket } from 'ws'; import { connect, StringCodec } from 'nats'; type Filters = { exchange?: string; symbol?: string }; @WebSocketGateway({ path: '/ws', transports: [], cors: { origin: '*' } }) export class ApiWsGateway implements OnGatewayConnection { @WebSocketServer() server!: Server; private sc = StringCodec(); private filters = new WeakMap<WebSocket, Filters>(); // This method runs on gateway init if needed (simplified) async afterInit() { // Bridge NATS ticks to WS clients (basic fan-out) const conn = await connect({ servers: process.env.NATS_URL || 'nats://localhost:4222', }); const sub = await conn.subscribe('marketdata.ticks.>'); (async () => { for await (const m of sub) { const json = this.sc.decode(m.data); this.server.clients.forEach((client: any) => { if (client.readyState !== 1) return; const f = this.filters.get(client as WebSocket); if (!f) { client.send(json); return; } try { const obj = JSON.parse(json) as { exchangeId: string; symbol: string; }; if (f.exchange && f.exchange !== obj.exchangeId) return; if (f.symbol && f.symbol !== obj.symbol) return; client.send(json); } catch { client.send(json); } }); } })().catch(() => undefined); } @SubscribeMessage('ping') handlePing(): string { return 'pong'; } handleConnection(client: WebSocket, req: any) { try { const url = new URL(req.url, `http://${req.headers.host}`); const exchange = url.searchParams.get('exchange') || undefined; const symbol = url.searchParams.get('symbol') || undefined; if (exchange || symbol) this.filters.set(client, { exchange, symbol }); } catch { // ignore parsing errors, no filters applied } } } |