using Application.Abstractions.Cache;
using Application.Abstractions.Crypto;
using Application.Abstractions.Data;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Buffers;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;

namespace Infrastructure.Crypto;

/// <summary>
/// Background service that keeps a WebSocket connection to Upbit open, subscribes to the
/// ticker, trade, orderbook and 1-minute candle streams for every active coin, and writes
/// each received snapshot to <see cref="ICacheService"/>.
/// </summary>
/// <remarks>
/// NOTE(review): the generic type arguments in this file were reconstructed from usage.
/// <c>ILogger&lt;UpbitWebSocketService&gt;</c> is assumed for the logger parameter — confirm
/// against the DI registration.
/// </remarks>
public sealed class UpbitWebSocketService(
    IServiceScopeFactory scopeFactory,
    ICacheService cache,
    ILogger<UpbitWebSocketService> logger
) : BackgroundService
{
    private const string MarketPrefix = "KRW-";
    private const int ReceiveBufferSize = 8192;

    private static readonly Uri _wsUri = new("wss://api.upbit.com/websocket/v1");
    private static readonly TimeSpan _startupDelay = TimeSpan.FromSeconds(3);
    private static readonly TimeSpan _reconnectDelay = TimeSpan.FromSeconds(5);
    private static readonly TimeSpan _noMarketsDelay = TimeSpan.FromSeconds(30);
    private static readonly TimeSpan _pingInterval = TimeSpan.FromSeconds(60);

    // Allocated once; the same payload is sent on every keep-alive.
    private static readonly byte[] _pingPayload = "PING"u8.ToArray();

    /// <summary>
    /// Runs the connect / receive / reconnect loop until the host requests shutdown.
    /// </summary>
    /// <param name="ct">Token signalled when the host is stopping.</param>
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        try
        {
            // Short pause right after start-up (the original comment says: wait for the DB to be ready).
            await Task.Delay(_startupDelay, ct);

            while (!ct.IsCancellationRequested)
            {
                try
                {
                    await ConnectAndReceiveAsync(ct);
                }
                catch (Exception ex) when (ex is not OperationCanceledException || !ct.IsCancellationRequested)
                {
                    logger.LogWarning(ex, "Upbit WebSocket 연결 실패. {Delay}초 후 재연결...", _reconnectDelay.TotalSeconds);
                    await Task.Delay(_reconnectDelay, ct);
                }
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Shutdown was requested while connecting, receiving or waiting: exit quietly.
        }
    }

    /// <summary>
    /// Opens one WebSocket session: loads the market codes, connects, subscribes, and pumps
    /// messages until the socket closes. Returns normally when there is nothing to subscribe
    /// to or when the session ends; throws on connection or processing failures.
    /// </summary>
    private async Task ConnectAndReceiveAsync(CancellationToken ct)
    {
        var marketCodes = await GetMarketCodesAsync(ct);
        if (marketCodes.Count == 0)
        {
            logger.LogInformation("활성 코인이 없습니다. 30초 후 재확인...");
            await Task.Delay(_noMarketsDelay, ct);
            return;
        }

        using (var ws = new ClientWebSocket())
        {
            await ws.ConnectAsync(_wsUri, ct);
            logger.LogInformation("Upbit WebSocket 연결 완료. 코인 수: {Count}", marketCodes.Count);

            // Send the subscription request.
            var subscribeBytes = Encoding.UTF8.GetBytes(BuildSubscribeMessage(marketCodes));
            await ws.SendAsync(new ArraySegment<byte>(subscribeBytes), WebSocketMessageType.Text, true, ct);

            // Keep-alive loop runs alongside the receive loop and is cancelled when receiving ends.
            using var pingCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            var pingTask = PingLoopAsync(ws, pingCts.Token);

            try
            {
                await ReceiveLoopAsync(ws, ct);
            }
            finally
            {
                await pingCts.CancelAsync();
                try
                {
                    await pingTask;
                }
                catch (Exception ex)
                {
                    // The ping loop handles its own cancellation; anything arriving here is a
                    // send failure on a dying socket. Record it instead of discarding it.
                    logger.LogDebug(ex, "Upbit WebSocket PING 태스크가 예외로 종료되었습니다.");
                }
            }
        }

        // The session ended without an exception (close frame or socket no longer open).
        // Pause before the caller reconnects so a server that keeps closing the connection
        // does not cause a tight reconnect loop.
        logger.LogInformation("Upbit WebSocket 연결이 종료되었습니다. {Delay}초 후 재연결...", _reconnectDelay.TotalSeconds);
        await Task.Delay(_reconnectDelay, ct);
    }

    /// <summary>
    /// Reads complete messages from <paramref name="ws"/> and dispatches each one until a
    /// close frame arrives, the socket leaves the Open state, or <paramref name="ct"/> fires.
    /// </summary>
    private async Task ReceiveLoopAsync(ClientWebSocket ws, CancellationToken ct)
    {
        // Latest ticker per market code for this session.
        var tickers = new Dictionary<string, UpbitTicker>();

        var buffer = ArrayPool<byte>.Shared.Rent(ReceiveBufferSize);
        try
        {
            // One stream reused for every message; it is truncated before each read.
            using var message = new MemoryStream();

            while (ws.State == WebSocketState.Open && !ct.IsCancellationRequested)
            {
                message.SetLength(0);

                WebSocketReceiveResult result;
                do
                {
                    result = await ws.ReceiveAsync(new ArraySegment<byte>(buffer), ct);
                    if (result.MessageType == WebSocketMessageType.Close)
                    {
                        return;
                    }

                    message.Write(buffer, 0, result.Count);
                }
                while (!result.EndOfMessage);

                // Only Text and Binary frames reach this point; both are decoded as UTF-8 JSON.
                var json = Encoding.UTF8.GetString(message.GetBuffer(), 0, (int)message.Length);
                await ProcessMessageAsync(json, tickers, ct);
            }
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(buffer);
        }
    }

    /// <summary>
    /// Parses one message and routes it by its <c>ty</c> field. Messages that are not JSON
    /// objects, or that have no string <c>ty</c> field, are ignored.
    /// </summary>
    private async Task ProcessMessageAsync(string json, Dictionary<string, UpbitTicker> tickers, CancellationToken ct)
    {
        try
        {
            using var doc = JsonDocument.Parse(json);
            var root = doc.RootElement;

            var type = ReadString(root, "ty");
            switch (type)
            {
                case "ticker":
                    await ProcessTickerMessageAsync(root, tickers, ct);
                    break;
                case "trade":
                    await ProcessTradeMessageAsync(root, ct);
                    break;
                case "orderbook":
                    await ProcessOrderbookMessageAsync(root, ct);
                    break;
                case not null when type.StartsWith("candle", StringComparison.Ordinal):
                    await ProcessCandleMessageAsync(root, ct);
                    break;
            }
        }
        catch (JsonException ex)
        {
            logger.LogDebug(ex, "WebSocket 메시지 파싱 실패: {Json}", json[..Math.Min(json.Length, 200)]);
        }
    }

    /// <summary>
    /// Loads the symbols of coins that are active and not delisted, and returns them as
    /// upper-cased <c>KRW-SYMBOL</c> market codes.
    /// </summary>
    private async Task<List<string>> GetMarketCodesAsync(CancellationToken ct)
    {
        using var scope = scopeFactory.CreateScope();

        // NOTE(review): the service type argument was not recoverable from the reviewed text;
        // IApplicationDbContext is assumed — confirm against Application.Abstractions.Data.
        var db = scope.ServiceProvider.GetRequiredService<IApplicationDbContext>();

        var symbols = await db.Coin
            .AsNoTracking()
            .Where(c => c.IsActive && !c.IsDelisted)
            .Select(c => c.Symbol)
            .ToListAsync(ct);

        // Invariant casing: market codes are identifiers, not user-facing text.
        return [.. symbols.Select(s => $"{MarketPrefix}{s.ToUpperInvariant()}")];
    }

    /// <summary>
    /// Builds the subscription request (ticker + trade + orderbook + candle.1m, SIMPLE format).
    /// </summary>
    private static string BuildSubscribeMessage(List<string> marketCodes)
    {
        // Serialize the code list so unusual characters in a symbol are escaped properly.
        var codesJson = JsonSerializer.Serialize(marketCodes);

        return $"[{{\"ticket\":\"bitforum\"}}," +
               $"{{\"type\":\"ticker\",\"codes\":{codesJson},\"isOnlyRealtime\":true}}," +
               $"{{\"type\":\"trade\",\"codes\":{codesJson},\"isOnlyRealtime\":true}}," +
               $"{{\"type\":\"orderbook\",\"codes\":{codesJson},\"isOnlyRealtime\":true}}," +
               $"{{\"type\":\"candle.1m\",\"codes\":{codesJson},\"isOnlyRealtime\":true}}," +
               $"{{\"format\":\"SIMPLE\"}}]";
    }

    // ─── Ticker ─────────────────────────────────────────────────────

    /// <summary>
    /// Updates the in-memory ticker map and writes both the individual ticker and the full
    /// ticker list to the cache. Messages without a market code are ignored.
    /// </summary>
    private async Task ProcessTickerMessageAsync(JsonElement root, Dictionary<string, UpbitTicker> tickers, CancellationToken ct)
    {
        var market = ReadString(root, "cd");
        if (market.Length == 0)
        {
            return;
        }

        var ticker = new UpbitTicker(
            market,
            ReadDecimal(root, "tp"),
            ReadString(root, "c"),
            ReadDecimal(root, "scp"),
            ReadDecimal(root, "scr"),
            ReadDecimal(root, "atp24h"));

        tickers[market] = ticker;

        // Awaited (not fire-and-forget) so failures surface like every other cache write here.
        await cache.SetAsync(CacheKeys.CryptoTicker(ToSymbol(market)), ticker, ct);

        // NOTE(review): the whole list is rewritten on every ticker message; consider
        // throttling if the message rate makes this expensive.
        await cache.SetAsync(CacheKeys.CryptoTickers, tickers.Values.ToList(), ct);
    }

    // ─── Trade ──────────────────────────────────────────────────────
    // SIMPLE fields: cd, tp, tv, ab, pcp, td, ttm, ttms, sid, tms, st

    /// <summary>Writes the latest trade for the message's market to the cache.</summary>
    private async Task ProcessTradeMessageAsync(JsonElement root, CancellationToken ct)
    {
        var market = ReadString(root, "cd");
        if (market.Length == 0)
        {
            return;
        }

        var trade = new LiveTrade(
            ReadInt64(root, "ttms"),
            ReadDecimal(root, "tp"),
            ReadDecimal(root, "tv"),
            ReadString(root, "ab"));

        await cache.SetAsync(CacheKeys.CryptoTradeLive(ToSymbol(market)), trade, ct);
    }

    // ─── Orderbook ──────────────────────────────────────────────────
    // SIMPLE fields: cd, tas, tbs, obu[{ap, bp, as, bs}], tms, st

    /// <summary>Writes the latest orderbook snapshot for the message's market to the cache.</summary>
    private async Task ProcessOrderbookMessageAsync(JsonElement root, CancellationToken ct)
    {
        var market = ReadString(root, "cd");
        if (market.Length == 0)
        {
            return;
        }

        var units = new List<LiveOrderbookUnit>();
        if (root.TryGetProperty("obu", out var obu) && obu.ValueKind == JsonValueKind.Array)
        {
            foreach (var unit in obu.EnumerateArray())
            {
                units.Add(new LiveOrderbookUnit(
                    ReadDecimal(unit, "ap"),
                    ReadDecimal(unit, "bp"),
                    ReadDecimal(unit, "as"),
                    ReadDecimal(unit, "bs")));
            }
        }

        var orderbook = new LiveOrderbook(
            ReadDecimal(root, "tas"),
            ReadDecimal(root, "tbs"),
            units,
            ReadInt64(root, "tms"));

        await cache.SetAsync(CacheKeys.CryptoOrderbookLive(ToSymbol(market)), orderbook, ct);
    }

    // ─── Candle ─────────────────────────────────────────────────────
    // SIMPLE fields: cd, op, hp, lp, tp, catv, catp, tms, st

    /// <summary>Writes the latest 1-minute candle for the message's market to the cache.</summary>
    private async Task ProcessCandleMessageAsync(JsonElement root, CancellationToken ct)
    {
        var market = ReadString(root, "cd");
        if (market.Length == 0)
        {
            return;
        }

        // tms is divided by 1000 (per the original comment: milliseconds to seconds, to match
        // the candle Time field).
        var timeSec = ReadInt64(root, "tms") / 1000;

        var candle = new LiveCandle(
            timeSec,
            ReadDecimal(root, "op"),
            ReadDecimal(root, "hp"),
            ReadDecimal(root, "lp"),
            ReadDecimal(root, "tp"),
            ReadDecimal(root, "catv"),
            ReadDecimal(root, "catp"));

        await cache.SetAsync(CacheKeys.CryptoCandleLive(ToSymbol(market), "m1"), candle, ct);
    }

    // ─── PING ───────────────────────────────────────────────────────

    /// <summary>
    /// Sends a text "PING" frame every <see cref="_pingInterval"/> while the socket is open.
    /// Ends silently on cancellation; send failures propagate to the awaiting caller.
    /// </summary>
    private static async Task PingLoopAsync(ClientWebSocket ws, CancellationToken ct)
    {
        try
        {
            while (!ct.IsCancellationRequested && ws.State == WebSocketState.Open)
            {
                await Task.Delay(_pingInterval, ct);

                if (ws.State == WebSocketState.Open)
                {
                    await ws.SendAsync(new ArraySegment<byte>(_pingPayload), WebSocketMessageType.Text, true, ct);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown of the ping loop.
        }
    }

    // ─── JSON helpers ───────────────────────────────────────────────
    // Each helper returns the same default the code uses for a missing field when the field
    // is absent, null, or of an unexpected type, so one odd message cannot tear down the
    // whole connection.

    /// <summary>Strips the market prefix from a market code (e.g. "KRW-BTC" gives "BTC").</summary>
    private static string ToSymbol(string market) => market.Replace(MarketPrefix, "");

    /// <summary>Returns the named string property, or an empty string if unavailable.</summary>
    private static string ReadString(JsonElement element, string name) =>
        element.ValueKind == JsonValueKind.Object
        && element.TryGetProperty(name, out var value)
        && value.ValueKind == JsonValueKind.String
            ? value.GetString() ?? ""
            : "";

    /// <summary>Returns the named numeric property as a decimal, or 0 if unavailable.</summary>
    private static decimal ReadDecimal(JsonElement element, string name) =>
        element.ValueKind == JsonValueKind.Object
        && element.TryGetProperty(name, out var value)
        && value.ValueKind == JsonValueKind.Number
        && value.TryGetDecimal(out var number)
            ? number
            : 0m;

    /// <summary>Returns the named integer property as a long, or 0 if unavailable.</summary>
    private static long ReadInt64(JsonElement element, string name) =>
        element.ValueKind == JsonValueKind.Object
        && element.TryGetProperty(name, out var value)
        && value.ValueKind == JsonValueKind.Number
        && value.TryGetInt64(out var number)
            ? number
            : 0L;

    // ─── Records used to store live data in the cache ───────────────

    internal sealed record LiveTrade(
        long Timestamp,
        decimal TradePrice,
        decimal TradeVolume,
        string AskBid
    );

    internal sealed record LiveOrderbook(
        decimal TotalAskSize,
        decimal TotalBidSize,
        List<LiveOrderbookUnit> Units,
        long Timestamp
    );

    internal sealed record LiveOrderbookUnit(
        decimal AskPrice,
        decimal BidPrice,
        decimal AskSize,
        decimal BidSize
    );

    internal sealed record LiveCandle(
        long Time,
        decimal Open,
        decimal High,
        decimal Low,
        decimal Close,
        decimal Volume,
        decimal TradePrice
    );
}