using Application.Abstractions.Cache; using Application.Abstractions.Crypto; using Application.Abstractions.Data; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System.Buffers; using System.Net.WebSockets; using System.Text; using System.Text.Json; namespace Infrastructure.Crypto; public sealed class UpbitWebSocketService( IServiceScopeFactory scopeFactory, ICacheService cache, ICryptoHubService hub, ILogger logger ) : BackgroundService { private static readonly Uri _wsUri = new("wss://api.upbit.com/websocket/v1"); private static readonly TimeSpan _reconnectDelay = TimeSpan.FromSeconds(5); private static readonly TimeSpan _pingInterval = TimeSpan.FromSeconds(60); protected override async Task ExecuteAsync(CancellationToken ct) { // 서비스 시작 직후 잠시 대기 (DB 준비) await Task.Delay(3000, ct); while (!ct.IsCancellationRequested) { try { await ConnectAndReceiveAsync(ct); } catch (OperationCanceledException) when (ct.IsCancellationRequested) { break; } catch (Exception ex) { logger.LogWarning(ex, "Upbit WebSocket 연결 실패. {Delay}초 후 재연결...", _reconnectDelay.TotalSeconds); await Task.Delay(_reconnectDelay, ct); } } } // 연결 진행 private async Task ConnectAndReceiveAsync(CancellationToken ct) { var marketCodes = await GetMarketCodesAsync(ct); if (marketCodes.Count == 0) { logger.LogInformation("활성 거래쌍이 없습니다. 30초 후 재확인..."); await Task.Delay(TimeSpan.FromSeconds(30), ct); return; } var uniqueQuotes = marketCodes.Select(x => x.Split('-')[0]).Distinct().Count(); using var ws = new ClientWebSocket(); await ws.ConnectAsync(_wsUri, ct); logger.LogInformation("Upbit WebSocket 연결 완료. 코인 수: {0}, 마켓 수: {1}", marketCodes.Count, uniqueQuotes); // 구독 메시지 전송 var subscribeMsg = BuildSubscribeMessage(marketCodes); var msgBytes = Encoding.UTF8.GetBytes(subscribeMsg); await ws.SendAsync(new ArraySegment(msgBytes), WebSocketMessageType.Text, true, ct); // Ticker 수집 딕셔너리 var tickers = new Dictionary(); // PING 타이머 using var pingCts = CancellationTokenSource.CreateLinkedTokenSource(ct); var pingTask = PingLoopAsync(ws, pingCts.Token); try { var buffer = ArrayPool.Shared.Rent(8192); try { while (ws.State == WebSocketState.Open && !ct.IsCancellationRequested) { using var ms = new MemoryStream(); WebSocketReceiveResult result; do { result = await ws.ReceiveAsync(new ArraySegment(buffer), ct); if (result.MessageType == WebSocketMessageType.Close) { return; } ms.Write(buffer, 0, result.Count); } while (!result.EndOfMessage); if (result.MessageType is WebSocketMessageType.Text or WebSocketMessageType.Binary) { var json = Encoding.UTF8.GetString(ms.ToArray()); await ProcessMessageAsync(json, tickers, ct); } } } finally { ArrayPool.Shared.Return(buffer); } } finally { await pingCts.CancelAsync(); try { await pingTask; } catch { /* ping 태스크 정리 */ } } } // 수신 메시지를 ty 필드로 분기 처리 private async Task ProcessMessageAsync(string json, Dictionary tickers, CancellationToken ct) { try { using var doc = JsonDocument.Parse(json); var root = doc.RootElement; if (!root.TryGetProperty("ty", out var tyProp)) { return; } var type = tyProp.GetString() ?? ""; if (type == "ticker") { await ProcessTickerMessageAsync(root, tickers, ct); // 전체 Ticker 목록을 quote별로 Redis 저장 + SignalR 전송 if (tickers.Count > 0) { var grouped = tickers.Values.GroupBy(t => t.Market.Split('-')[0]); foreach (var group in grouped) { await cache.SetAsync(CacheKeys.CryptoTickers(group.Key), group.ToList(), ct); } try { foreach (var group in grouped) { var hubTickers = group.Select(t => new CryptoHubData.TickerData( t.Market, t.Market.Split('-')[^1], t.OpeningPrice, t.HighPrice, t.LowPrice, t.TradePrice, t.PrevClosingPrice, t.Change, t.ChangePrice, t.SignedChangePrice, t.ChangeRate, t.SignedChangeRate, t.TradeVolume, t.AccTradeVolume, t.AccTradeVolume24h, t.AccTradePrice, t.AccTradePrice24h, t.TradeDate, t.TradeTime, t.TradeTimestamp, t.AskBid, t.AccAskVolume, t.AccBidVolume, t.Highest52WeekPrice, t.Highest52WeekDate, t.Lowest52WeekPrice, t.Lowest52WeekDate, t.MarketState, t.DelistingDate, t.MarketWarning, t.Timestamp, t.StreamType )).ToList(); await hub.SendTickersAsync(hubTickers, ct); } } catch (Exception ex) { logger.LogDebug(ex, "SignalR tickers 전송 실패"); } } } else if (type == "trade") { await ProcessTradeMessageAsync(root, ct); } else if (type == "orderbook") { await ProcessOrderbookMessageAsync(root, ct); } else if (type.StartsWith("candle")) { await ProcessCandleMessageAsync(root, ct); } } catch (JsonException ex) { logger.LogDebug(ex, "WebSocket 메시지 파싱 실패: {Json}", json[..Math.Min(json.Length, 200)]); } } // DB에서 활성 코인의 전체 마켓 코드를 CoinMarket 테이블에서 조회 private async Task> GetMarketCodesAsync(CancellationToken ct) { using var scope = scopeFactory.CreateScope(); var db = scope.ServiceProvider.GetRequiredService(); return await db.CoinMarket.AsNoTracking().Join(db.Coin.AsNoTracking().Where(c => c.IsActive && !c.IsDelisted), cm => cm.CoinID, c => c.ID, (cm, c) => cm.Market).ToListAsync(ct); } // 구독 메시지 생성 (Ticker + Trade + Orderbook + Candle.1m) private static string BuildSubscribeMessage(List marketCodes) { var codes = string.Join("\",\"", marketCodes); var codesJson = $"[\"{codes}\"]"; return $"[{{\"ticket\":\"bitforum\"}}," + $"{{\"type\":\"ticker\",\"codes\":{codesJson},\"isOnlyRealtime\":true}}," + $"{{\"type\":\"trade\",\"codes\":{codesJson},\"isOnlyRealtime\":true}}," + $"{{\"type\":\"orderbook\",\"codes\":{codesJson},\"isOnlyRealtime\":true}}," + $"{{\"type\":\"candle.1m\",\"codes\":{codesJson},\"isOnlyRealtime\":true}}," + $"{{\"format\":\"SIMPLE\"}}]"; } // ─── Ticker ───────────────────────────────────────────────────── private async Task ProcessTickerMessageAsync(JsonElement root, Dictionary tickers, CancellationToken ct) { var market = root.TryGetProperty("cd", out var cd) ? cd.GetString() ?? "" : ""; if (string.IsNullOrEmpty(market)) { return; } var ticker = new UpbitTicker( market, root.TryGetProperty("op", out var op) ? op.GetDecimal() : 0m, root.TryGetProperty("hp", out var hp) ? hp.GetDecimal() : 0m, root.TryGetProperty("lp", out var lp) ? lp.GetDecimal() : 0m, root.TryGetProperty("tp", out var tp) ? tp.GetDecimal() : 0m, root.TryGetProperty("pcp", out var pcp) ? pcp.GetDecimal() : 0m, GetSafeString(root, "c"), root.TryGetProperty("cp", out var cpVal) ? cpVal.GetDecimal() : 0m, root.TryGetProperty("scp", out var scp) ? scp.GetDecimal() : 0m, root.TryGetProperty("cr", out var cr) ? cr.GetDecimal() : 0m, root.TryGetProperty("scr", out var scr) ? scr.GetDecimal() : 0m, root.TryGetProperty("tv", out var tv) ? tv.GetDecimal() : 0m, root.TryGetProperty("atv", out var atv) ? atv.GetDecimal() : 0m, root.TryGetProperty("atv24h", out var atv24h) ? atv24h.GetDecimal() : 0m, root.TryGetProperty("atp", out var atp) ? atp.GetDecimal() : 0m, root.TryGetProperty("atp24h", out var atp24h) ? atp24h.GetDecimal() : 0m, GetSafeString(root, "tdt"), GetSafeString(root, "ttm"), root.TryGetProperty("ttms", out var ttms) ? ttms.GetInt64() : 0L, GetSafeString(root, "ab"), root.TryGetProperty("aav", out var aav) ? aav.GetDecimal() : 0m, root.TryGetProperty("abv", out var abv) ? abv.GetDecimal() : 0m, root.TryGetProperty("h52wp", out var h52wp) ? h52wp.GetDecimal() : 0m, GetSafeString(root, "h52wdt"), root.TryGetProperty("l52wp", out var l52wp) ? l52wp.GetDecimal() : 0m, GetSafeString(root, "l52wdt"), GetSafeString(root, "ms"), GetSafeNullableString(root, "dd"), GetSafeString(root, "mw"), root.TryGetProperty("tms", out var tms) ? tms.GetInt64() : 0L, GetSafeString(root, "st") ); tickers[market] = ticker; // 개별 Ticker Redis 저장 + SignalR 전송 var symbol = market.Split('-')[^1]; await cache.SetAsync(CacheKeys.CryptoTicker(market), ticker, ct); try { await hub.SendTickerAsync(new CryptoHubData.TickerData( market, symbol, ticker.OpeningPrice, ticker.HighPrice, ticker.LowPrice, ticker.TradePrice, ticker.PrevClosingPrice, ticker.Change, ticker.ChangePrice, ticker.SignedChangePrice, ticker.ChangeRate, ticker.SignedChangeRate, ticker.TradeVolume, ticker.AccTradeVolume, ticker.AccTradeVolume24h, ticker.AccTradePrice, ticker.AccTradePrice24h, ticker.TradeDate, ticker.TradeTime, ticker.TradeTimestamp, ticker.AskBid, ticker.AccAskVolume, ticker.AccBidVolume, ticker.Highest52WeekPrice, ticker.Highest52WeekDate, ticker.Lowest52WeekPrice, ticker.Lowest52WeekDate, ticker.MarketState, ticker.DelistingDate, ticker.MarketWarning, ticker.Timestamp, ticker.StreamType ), ct); } catch (Exception ex) { logger.LogDebug(ex, "SignalR ticker 전송 실패: {Market}", market); } } // ─── Trade ────────────────────────────────────────────────────── // SIMPLE 필드: cd, tp, tv, ab, pcp, c, cp, td, ttm, ttms, sid, tms, st, bap, bas, bbp, bbs private async Task ProcessTradeMessageAsync(JsonElement root, CancellationToken ct) { var market = root.TryGetProperty("cd", out var cd) ? cd.GetString() ?? "" : ""; if (string.IsNullOrEmpty(market)) { return; } var symbol = market.Split('-')[^1]; var trade = new LiveTrade( root.TryGetProperty("tp", out var tp) ? tp.GetDecimal() : 0m, root.TryGetProperty("tv", out var tv) ? tv.GetDecimal() : 0m, root.TryGetProperty("ab", out var ab) ? ab.GetString() ?? "" : "", root.TryGetProperty("pcp", out var pcp) ? pcp.GetDecimal() : 0m, root.TryGetProperty("c", out var c) ? c.GetString() ?? "" : "", root.TryGetProperty("cp", out var cp) ? cp.GetDecimal() : 0m, root.TryGetProperty("td", out var td) ? td.GetString() ?? "" : "", root.TryGetProperty("ttm", out var ttm) ? ttm.GetString() ?? "" : "", root.TryGetProperty("ttms", out var ttms) ? ttms.GetInt64() : 0L, root.TryGetProperty("sid", out var sid) ? sid.GetInt64() : 0L, root.TryGetProperty("tms", out var tms) ? tms.GetInt64() : 0L, root.TryGetProperty("st", out var st) ? st.GetString() ?? "" : "", root.TryGetProperty("bap", out var bap) ? bap.GetDecimal() : 0m, root.TryGetProperty("bas", out var bas) ? bas.GetDecimal() : 0m, root.TryGetProperty("bbp", out var bbp) ? bbp.GetDecimal() : 0m, root.TryGetProperty("bbs", out var bbs) ? bbs.GetDecimal() : 0m ); await cache.SetAsync(CacheKeys.CryptoTradeLive(market), trade, ct); try { await hub.SendTradeAsync(new CryptoHubData.TradeData( market, symbol, trade.TradePrice, trade.TradeVolume, trade.AskBid, trade.PrevClosingPrice, trade.Change, trade.ChangePrice, trade.TradeDate, trade.TradeTime, trade.TradeTimestamp, trade.SequentialId, trade.Timestamp, trade.StreamType, trade.BestAskPrice, trade.BestAskSize, trade.BestBidPrice, trade.BestBidSize ), ct); } catch (Exception ex) { logger.LogDebug(ex, "SignalR trade 전송 실패: {Market}", market); } } // ─── Orderbook ────────────────────────────────────────────────── // SIMPLE 필드: cd, tas, tbs, obu[{ap, bp, as, bs}], tms, lv, st private async Task ProcessOrderbookMessageAsync(JsonElement root, CancellationToken ct) { var market = root.TryGetProperty("cd", out var cd) ? cd.GetString() ?? "" : ""; if (string.IsNullOrEmpty(market)) { return; } var symbol = market.Split('-')[^1]; var totalAskSize = root.TryGetProperty("tas", out var tas) ? tas.GetDecimal() : 0m; var totalBidSize = root.TryGetProperty("tbs", out var tbs) ? tbs.GetDecimal() : 0m; var timestamp = root.TryGetProperty("tms", out var tms) ? tms.GetInt64() : 0L; var level = root.TryGetProperty("lv", out var lv) ? lv.GetDecimal() : 0m; var streamType = root.TryGetProperty("st", out var st) ? st.GetString() ?? "" : ""; var units = new List(); if (root.TryGetProperty("obu", out var obu) && obu.ValueKind == JsonValueKind.Array) { foreach (var u in obu.EnumerateArray()) { var askPrice = u.TryGetProperty("ap", out var ap) ? ap.GetDecimal() : 0m; var bidPrice = u.TryGetProperty("bp", out var bp) ? bp.GetDecimal() : 0m; var askSize = u.TryGetProperty("as", out var asVal) ? asVal.GetDecimal() : 0m; var bidSize = u.TryGetProperty("bs", out var bs) ? bs.GetDecimal() : 0m; units.Add(new LiveOrderbookUnit(askPrice, bidPrice, askSize, bidSize)); } } var orderbook = new LiveOrderbook(totalAskSize, totalBidSize, units, timestamp, level, streamType); await cache.SetAsync(CacheKeys.CryptoOrderbookLive(market), orderbook, ct); try { await hub.SendOrderbookAsync(new CryptoHubData.OrderbookData( market, symbol, orderbook.TotalAskSize, orderbook.TotalBidSize, [..orderbook.Units.Select(u => new CryptoHubData.OrderbookUnitData(u.AskPrice, u.BidPrice, u.AskSize, u.BidSize))], orderbook.Timestamp, orderbook.Level, orderbook.StreamType ), ct); } catch (Exception ex) { logger.LogDebug(ex, "SignalR orderbook 전송 실패: {Market}", market); } } // ─── Candle ───────────────────────────────────────────────────── // SIMPLE 필드: cd, cdttmu, cdttmk, op, hp, lp, tp, catv, catp, tms, st private async Task ProcessCandleMessageAsync(JsonElement root, CancellationToken ct) { var market = root.TryGetProperty("cd", out var cd) ? cd.GetString() ?? "" : ""; if (string.IsNullOrEmpty(market)) { return; } var symbol = market.Split('-')[^1]; var candle = new LiveCandle( root.TryGetProperty("cdttmu", out var cdttmu) ? cdttmu.GetString() ?? "" : "", root.TryGetProperty("cdttmk", out var cdttmk) ? cdttmk.GetString() ?? "" : "", root.TryGetProperty("op", out var op) ? op.GetDecimal() : 0m, root.TryGetProperty("hp", out var hp) ? hp.GetDecimal() : 0m, root.TryGetProperty("lp", out var lp) ? lp.GetDecimal() : 0m, root.TryGetProperty("tp", out var tp) ? tp.GetDecimal() : 0m, root.TryGetProperty("catv", out var catv) ? catv.GetDecimal() : 0m, root.TryGetProperty("catp", out var catp) ? catp.GetDecimal() : 0m, root.TryGetProperty("tms", out var tms) ? tms.GetInt64() : 0L, root.TryGetProperty("st", out var st) ? st.GetString() ?? "" : "" ); await cache.SetAsync(CacheKeys.CryptoCandleLive(market, "m1"), candle, ct); try { await hub.SendCandleAsync(new CryptoHubData.CandleData( market, symbol, candle.CandleDateTimeUtc, candle.CandleDateTimeKst, candle.OpeningPrice, candle.HighPrice, candle.LowPrice, candle.TradePrice, candle.CandleAccTradeVolume, candle.CandleAccTradePrice, candle.Timestamp, candle.StreamType ), ct); } catch (Exception ex) { logger.LogDebug(ex, "SignalR candle 전송 실패: {Market}", market); } } // ─── PING ─────────────────────────────────────────────────────── private static async Task PingLoopAsync(ClientWebSocket ws, CancellationToken ct) { try { while (!ct.IsCancellationRequested && ws.State == WebSocketState.Open) { await Task.Delay(_pingInterval, ct); if (ws.State == WebSocketState.Open) { var pingBytes = "PING"u8.ToArray(); await ws.SendAsync(new ArraySegment(pingBytes), WebSocketMessageType.Text, true, ct); } } } catch (OperationCanceledException) { // 정상 종료 } } // ─── JSON 안전 파싱 헬퍼 ──────────────────────────────────────── private static string GetSafeString(JsonElement root, string prop, string fallback = "") { if (!root.TryGetProperty(prop, out var el)) { return fallback; } return el.ValueKind == JsonValueKind.String ? el.GetString() ?? fallback : fallback; } private static string? GetSafeNullableString(JsonElement root, string prop) { if (!root.TryGetProperty(prop, out var el)) { return null; } return el.ValueKind == JsonValueKind.String ? el.GetString() : el.ToString(); } // ─── Live 데이터 Redis 저장용 Records ────────────────────────── internal sealed record LiveTrade( decimal TradePrice, decimal TradeVolume, string AskBid, decimal PrevClosingPrice, string Change, decimal ChangePrice, string TradeDate, string TradeTime, long TradeTimestamp, long SequentialId, long Timestamp, string StreamType, decimal BestAskPrice, decimal BestAskSize, decimal BestBidPrice, decimal BestBidSize ); internal sealed record LiveOrderbook( decimal TotalAskSize, decimal TotalBidSize, List Units, long Timestamp, decimal Level, string StreamType ); internal sealed record LiveOrderbookUnit( decimal AskPrice, decimal BidPrice, decimal AskSize, decimal BidSize ); internal sealed record LiveCandle( string CandleDateTimeUtc, string CandleDateTimeKst, decimal OpeningPrice, decimal HighPrice, decimal LowPrice, decimal TradePrice, decimal CandleAccTradeVolume, decimal CandleAccTradePrice, long Timestamp, string StreamType ); }