| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537 |
- using Application.Abstractions.Cache;
- using Application.Abstractions.Crypto;
- using Application.Abstractions.Data;
- using Microsoft.EntityFrameworkCore;
- using Microsoft.Extensions.DependencyInjection;
- using Microsoft.Extensions.Hosting;
- using Microsoft.Extensions.Logging;
- using System.Buffers;
- using System.Net.WebSockets;
- using System.Text;
- using System.Text.Json;
- namespace Infrastructure.Crypto;
- public sealed class UpbitWebSocketService(
- IServiceScopeFactory scopeFactory,
- ICacheService cache,
- ICryptoHubService hub,
- ILogger<UpbitWebSocketService> logger
- ) : BackgroundService {
/// <summary>Upbit WebSocket endpoint that every session connects to.</summary>
private static readonly Uri _wsUri = new Uri("wss://api.upbit.com/websocket/v1");

/// <summary>Pause inserted by <c>ExecuteAsync</c> after a failed session before the next attempt.</summary>
private static readonly TimeSpan _reconnectDelay = TimeSpan.FromSeconds(5);

/// <summary>Delay between consecutive "PING" text frames sent by <c>PingLoopAsync</c>.</summary>
private static readonly TimeSpan _pingInterval = TimeSpan.FromMinutes(1);
/// <summary>
/// Background loop: waits three seconds, then keeps running WebSocket sessions
/// until <paramref name="ct"/> is cancelled. A session that fails with any
/// exception other than a stop-requested cancellation is logged as a warning
/// and retried after <c>_reconnectDelay</c>.
/// </summary>
/// <param name="ct">Stopping token supplied by the host.</param>
protected override async Task ExecuteAsync(CancellationToken ct)
{
    // Short pause right after startup (the original comment cites DB readiness).
    await Task.Delay(TimeSpan.FromSeconds(3), ct);

    for (; ; )
    {
        if (ct.IsCancellationRequested)
        {
            return;
        }

        try
        {
            await ConnectAndReceiveAsync(ct);
            // Session ended without an exception: start the next one right away.
            continue;
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            return;
        }
        catch (Exception ex)
        {
            logger.LogWarning(ex, "Upbit WebSocket 연결 실패. {Delay}초 후 재연결...", _reconnectDelay.TotalSeconds);
        }

        // Only reached after a logged failure.
        await Task.Delay(_reconnectDelay, ct);
    }
}
/// <summary>
/// Runs a single WebSocket session: loads the market codes, connects, sends the
/// subscription request, starts the ping loop and then dispatches every received
/// message to <c>ProcessMessageAsync</c> until the socket leaves the Open state,
/// the server sends a close frame, or <paramref name="ct"/> is cancelled.
/// </summary>
/// <remarks>
/// When no market codes are found the method waits 30 seconds and returns
/// without connecting. Exceptions from connecting, receiving or message
/// processing propagate to the caller.
/// </remarks>
/// <param name="ct">Token that aborts the session.</param>
private async Task ConnectAndReceiveAsync(CancellationToken ct)
{
    var marketCodes = await GetMarketCodesAsync(ct);
    if (marketCodes.Count == 0)
    {
        logger.LogInformation("활성 거래쌍이 없습니다. 30초 후 재확인...");
        await Task.Delay(TimeSpan.FromSeconds(30), ct);
        return;
    }

    // Number of distinct prefixes before the first '-' in the market codes.
    var uniqueQuotes = marketCodes.Select(x => x.Split('-')[0]).Distinct().Count();

    using var ws = new ClientWebSocket();
    await ws.ConnectAsync(_wsUri, ct);

    // Named placeholders keep the rendered text identical while giving the
    // structured log properties meaningful names (numeric names trip CA2253).
    logger.LogInformation(
        "Upbit WebSocket 연결 완료. 코인 수: {MarketCodeCount}, 마켓 수: {QuoteCount}",
        marketCodes.Count,
        uniqueQuotes);

    // Send the subscription request.
    var subscribeBytes = Encoding.UTF8.GetBytes(BuildSubscribeMessage(marketCodes));
    await ws.SendAsync(new ArraySegment<byte>(subscribeBytes), WebSocketMessageType.Text, true, ct);

    // Latest ticker per market code, shared across all messages of this session.
    var tickers = new Dictionary<string, UpbitTicker>();

    // The ping loop gets its own linked token so it can be stopped when the session ends.
    using var pingCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    var pingTask = PingLoopAsync(ws, pingCts.Token);
    try
    {
        var buffer = ArrayPool<byte>.Shared.Rent(8192);
        try
        {
            // One stream is reused for every message instead of allocating per message.
            using var message = new MemoryStream();
            while (ws.State == WebSocketState.Open && !ct.IsCancellationRequested)
            {
                message.SetLength(0);

                WebSocketReceiveResult result;
                do
                {
                    result = await ws.ReceiveAsync(new ArraySegment<byte>(buffer), ct);
                    if (result.MessageType == WebSocketMessageType.Close)
                    {
                        logger.LogInformation(
                            "Upbit WebSocket 서버가 연결을 종료했습니다. 상태: {CloseStatus}",
                            result.CloseStatus);
                        return;
                    }

                    message.Write(buffer, 0, result.Count);
                } while (!result.EndOfMessage);

                if (result.MessageType is WebSocketMessageType.Text or WebSocketMessageType.Binary)
                {
                    // Decode straight from the stream's backing array; ToArray() would copy it first.
                    var json = Encoding.UTF8.GetString(message.GetBuffer(), 0, (int)message.Length);
                    await ProcessMessageAsync(json, tickers, ct);
                }
            }
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(buffer);
        }
    }
    finally
    {
        await pingCts.CancelAsync();
        try
        {
            await pingTask;
        }
        catch (Exception ex)
        {
            // Best-effort cleanup: a ping failure must not mask the session's own outcome,
            // but it is still recorded instead of being dropped silently.
            logger.LogDebug(ex, "Upbit WebSocket PING 태스크 정리 중 예외 발생");
        }
    }
}
/// <summary>
/// Parses one received message and dispatches it by its <c>ty</c> field:
/// <c>ticker</c>, <c>trade</c>, <c>orderbook</c>, or any value starting with
/// <c>candle</c>. Messages without a <c>ty</c> property are ignored.
/// </summary>
/// <remarks>
/// After a ticker message, the full ticker set is grouped by the part of the
/// market code before the first '-' and each group is written to the cache and
/// pushed to the hub. Hub failures are logged at debug level and swallowed;
/// cache failures propagate. JSON syntax errors are logged at debug level.
/// </remarks>
/// <param name="json">Raw message text.</param>
/// <param name="tickers">Session-wide map of market code to latest ticker; updated by ticker messages.</param>
/// <param name="ct">Cancellation token passed to cache and hub calls.</param>
private async Task ProcessMessageAsync(string json, Dictionary<string, UpbitTicker> tickers, CancellationToken ct)
{
    try
    {
        using var doc = JsonDocument.Parse(json);
        var root = doc.RootElement;

        // TryGetProperty throws on non-object roots; such payloads carry no "ty" anyway.
        if (root.ValueKind != JsonValueKind.Object || !root.TryGetProperty("ty", out var tyProp))
        {
            return;
        }

        var type = tyProp.GetString() ?? "";
        if (type == "ticker")
        {
            await ProcessTickerMessageAsync(root, tickers, ct);

            // Store the complete ticker list per quote group in the cache and push it to the hub.
            if (tickers.Count > 0)
            {
                // Materialize once: GroupBy is deferred, so enumerating it for both the
                // cache pass and the hub pass would regroup the dictionary twice.
                var grouped = tickers.Values
                    .GroupBy(t => t.Market.Split('-')[0])
                    .Select(g => (Quote: g.Key, Items: g.ToList()))
                    .ToList();

                foreach (var (quote, items) in grouped)
                {
                    await cache.SetAsync(CacheKeys.CryptoTickers(quote), items, ct);
                }

                try
                {
                    foreach (var (_, items) in grouped)
                    {
                        var hubTickers = items.Select(t => new CryptoHubData.TickerData(
                            t.Market, t.Market.Split('-')[^1],
                            t.OpeningPrice, t.HighPrice, t.LowPrice, t.TradePrice,
                            t.PrevClosingPrice, t.Change, t.ChangePrice, t.SignedChangePrice,
                            t.ChangeRate, t.SignedChangeRate, t.TradeVolume, t.AccTradeVolume,
                            t.AccTradeVolume24h, t.AccTradePrice, t.AccTradePrice24h,
                            t.TradeDate, t.TradeTime, t.TradeTimestamp, t.AskBid,
                            t.AccAskVolume, t.AccBidVolume, t.Highest52WeekPrice,
                            t.Highest52WeekDate, t.Lowest52WeekPrice, t.Lowest52WeekDate,
                            t.MarketState, t.DelistingDate, t.MarketWarning,
                            t.Timestamp, t.StreamType
                        )).ToList();
                        await hub.SendTickersAsync(hubTickers, ct);
                    }
                }
                catch (Exception ex)
                {
                    logger.LogDebug(ex, "SignalR tickers 전송 실패");
                }
            }
        }
        else if (type == "trade")
        {
            await ProcessTradeMessageAsync(root, ct);
        }
        else if (type == "orderbook")
        {
            await ProcessOrderbookMessageAsync(root, ct);
        }
        else if (type.StartsWith("candle", StringComparison.Ordinal))
        {
            // Ordinal: "ty" is a protocol identifier, not linguistic text.
            await ProcessCandleMessageAsync(root, ct);
        }
    }
    catch (JsonException ex)
    {
        // Only the first 200 characters are logged to keep entries bounded.
        logger.LogDebug(ex, "WebSocket 메시지 파싱 실패: {Json}", json[..Math.Min(json.Length, 200)]);
    }
}
/// <summary>
/// Loads the <c>Market</c> value of every <c>CoinMarket</c> row whose coin is
/// active and not delisted, using a fresh DI scope and no change tracking.
/// </summary>
/// <param name="ct">Cancellation token for the database query.</param>
/// <returns>The matching market codes; empty when nothing matches.</returns>
private async Task<List<string>> GetMarketCodesAsync(CancellationToken ct)
{
    using var scope = scopeFactory.CreateScope();
    var db = scope.ServiceProvider.GetRequiredService<IAppDbContext>();

    var tradableCoins = db.Coin.AsNoTracking().Where(c => c.IsActive && !c.IsDelisted);

    var marketQuery =
        from cm in db.CoinMarket.AsNoTracking()
        join c in tradableCoins on cm.CoinID equals c.ID
        select cm.Market;

    // Awaited here (not returned directly) so the scope outlives the query.
    return await marketQuery.ToListAsync(ct);
}
/// <summary>
/// Builds the subscription request: a JSON array containing the ticket
/// <c>bitforum</c>, one entry each for <c>ticker</c>, <c>trade</c>,
/// <c>orderbook</c> and <c>candle.1m</c> (all with <c>isOnlyRealtime</c> set),
/// and a trailing <c>SIMPLE</c> format entry.
/// </summary>
/// <param name="marketCodes">Market codes to subscribe to in every channel.</param>
/// <returns>The request as compact JSON text.</returns>
private static string BuildSubscribeMessage(List<string> marketCodes)
{
    // Each code is JSON-escaped so a stray quote or backslash cannot break the
    // document; an empty list yields "[]" rather than a one-element [""] array.
    var quotedCodes = marketCodes.Select(code => $"\"{JsonEncodedText.Encode(code).ToString()}\"");
    var codesJson = $"[{string.Join(",", quotedCodes)}]";

    string[] channels = ["ticker", "trade", "orderbook", "candle.1m"];

    var message = new StringBuilder("[{\"ticket\":\"bitforum\"},");
    foreach (var channel in channels)
    {
        message.Append("{\"type\":\"").Append(channel)
               .Append("\",\"codes\":").Append(codesJson)
               .Append(",\"isOnlyRealtime\":true},");
    }

    message.Append("{\"format\":\"SIMPLE\"}]");
    return message.ToString();
}
// ─── Ticker ─────────────────────────────────────────────────────
/// <summary>
/// Converts a ticker message into an <c>UpbitTicker</c>, stores it in
/// <paramref name="tickers"/> under its market code (<c>cd</c>), writes it to
/// the cache and pushes it to the hub. Messages with a missing or empty
/// <c>cd</c> are ignored.
/// </summary>
/// <remarks>
/// Absent numeric properties default to zero. Hub failures are logged at debug
/// level and swallowed; cache failures propagate.
/// </remarks>
/// <param name="root">Root object of the parsed message.</param>
/// <param name="tickers">Session-wide map of market code to latest ticker.</param>
/// <param name="ct">Cancellation token passed to cache and hub calls.</param>
private async Task ProcessTickerMessageAsync(JsonElement root, Dictionary<string, UpbitTicker> tickers, CancellationToken ct)
{
    // Readers that fall back to zero only when the property is absent.
    static decimal ReadDecimal(JsonElement src, string key) =>
        src.TryGetProperty(key, out var value) ? value.GetDecimal() : 0m;

    static long ReadInt64(JsonElement src, string key) =>
        src.TryGetProperty(key, out var value) ? value.GetInt64() : 0L;

    var market = root.TryGetProperty("cd", out var code) ? code.GetString() : null;
    if (string.IsNullOrEmpty(market))
    {
        return;
    }

    var ticker = new UpbitTicker(
        market,
        ReadDecimal(root, "op"),
        ReadDecimal(root, "hp"),
        ReadDecimal(root, "lp"),
        ReadDecimal(root, "tp"),
        ReadDecimal(root, "pcp"),
        GetSafeString(root, "c"),
        ReadDecimal(root, "cp"),
        ReadDecimal(root, "scp"),
        ReadDecimal(root, "cr"),
        ReadDecimal(root, "scr"),
        ReadDecimal(root, "tv"),
        ReadDecimal(root, "atv"),
        ReadDecimal(root, "atv24h"),
        ReadDecimal(root, "atp"),
        ReadDecimal(root, "atp24h"),
        GetSafeString(root, "tdt"),
        GetSafeString(root, "ttm"),
        ReadInt64(root, "ttms"),
        GetSafeString(root, "ab"),
        ReadDecimal(root, "aav"),
        ReadDecimal(root, "abv"),
        ReadDecimal(root, "h52wp"),
        GetSafeString(root, "h52wdt"),
        ReadDecimal(root, "l52wp"),
        GetSafeString(root, "l52wdt"),
        GetSafeString(root, "ms"),
        GetSafeNullableString(root, "dd"),
        GetSafeString(root, "mw"),
        ReadInt64(root, "tms"),
        GetSafeString(root, "st")
    );
    tickers[market] = ticker;

    // Store the individual ticker in the cache, then push it to the hub.
    await cache.SetAsync(CacheKeys.CryptoTicker(market), ticker, ct);

    // Segment after the last '-' of the market code.
    var symbol = market.Split('-')[^1];
    try
    {
        await hub.SendTickerAsync(
            new CryptoHubData.TickerData(
                market, symbol,
                ticker.OpeningPrice, ticker.HighPrice, ticker.LowPrice, ticker.TradePrice,
                ticker.PrevClosingPrice, ticker.Change, ticker.ChangePrice, ticker.SignedChangePrice,
                ticker.ChangeRate, ticker.SignedChangeRate, ticker.TradeVolume, ticker.AccTradeVolume,
                ticker.AccTradeVolume24h, ticker.AccTradePrice, ticker.AccTradePrice24h,
                ticker.TradeDate, ticker.TradeTime, ticker.TradeTimestamp, ticker.AskBid,
                ticker.AccAskVolume, ticker.AccBidVolume, ticker.Highest52WeekPrice,
                ticker.Highest52WeekDate, ticker.Lowest52WeekPrice, ticker.Lowest52WeekDate,
                ticker.MarketState, ticker.DelistingDate, ticker.MarketWarning,
                ticker.Timestamp, ticker.StreamType),
            ct);
    }
    catch (Exception ex)
    {
        logger.LogDebug(ex, "SignalR ticker 전송 실패: {Market}", market);
    }
}
// ─── Trade ──────────────────────────────────────────────────────
// SIMPLE fields: cd, tp, tv, ab, pcp, c, cp, td, ttm, ttms, sid, tms, st, bap, bas, bbp, bbs
/// <summary>
/// Converts a trade message into a <c>LiveTrade</c>, writes it to the cache
/// under the market code (<c>cd</c>) and pushes it to the hub. Messages with a
/// missing or empty <c>cd</c> are ignored.
/// </summary>
/// <remarks>
/// Absent properties default to zero or an empty string. Hub failures are
/// logged at debug level and swallowed; cache failures propagate.
/// </remarks>
/// <param name="root">Root object of the parsed message.</param>
/// <param name="ct">Cancellation token passed to cache and hub calls.</param>
private async Task ProcessTradeMessageAsync(JsonElement root, CancellationToken ct)
{
    static decimal ReadDecimal(JsonElement src, string key) =>
        src.TryGetProperty(key, out var value) ? value.GetDecimal() : 0m;

    static long ReadInt64(JsonElement src, string key) =>
        src.TryGetProperty(key, out var value) ? value.GetInt64() : 0L;

    static string ReadString(JsonElement src, string key) =>
        src.TryGetProperty(key, out var value) ? value.GetString() ?? "" : "";

    var market = ReadString(root, "cd");
    if (market.Length == 0)
    {
        return;
    }

    var trade = new LiveTrade(
        ReadDecimal(root, "tp"),
        ReadDecimal(root, "tv"),
        ReadString(root, "ab"),
        ReadDecimal(root, "pcp"),
        ReadString(root, "c"),
        ReadDecimal(root, "cp"),
        ReadString(root, "td"),
        ReadString(root, "ttm"),
        ReadInt64(root, "ttms"),
        ReadInt64(root, "sid"),
        ReadInt64(root, "tms"),
        ReadString(root, "st"),
        ReadDecimal(root, "bap"),
        ReadDecimal(root, "bas"),
        ReadDecimal(root, "bbp"),
        ReadDecimal(root, "bbs")
    );

    await cache.SetAsync(CacheKeys.CryptoTradeLive(market), trade, ct);

    // Segment after the last '-' of the market code.
    var symbol = market.Split('-')[^1];
    try
    {
        await hub.SendTradeAsync(
            new CryptoHubData.TradeData(
                market, symbol,
                trade.TradePrice, trade.TradeVolume, trade.AskBid,
                trade.PrevClosingPrice, trade.Change, trade.ChangePrice,
                trade.TradeDate, trade.TradeTime, trade.TradeTimestamp,
                trade.SequentialId, trade.Timestamp, trade.StreamType,
                trade.BestAskPrice, trade.BestAskSize, trade.BestBidPrice, trade.BestBidSize),
            ct);
    }
    catch (Exception ex)
    {
        logger.LogDebug(ex, "SignalR trade 전송 실패: {Market}", market);
    }
}
// ─── Orderbook ──────────────────────────────────────────────────
// SIMPLE fields: cd, tas, tbs, obu[{ap, bp, as, bs}], tms, lv, st
/// <summary>
/// Converts an orderbook message into a <c>LiveOrderbook</c>, writes it to the
/// cache under the market code (<c>cd</c>) and pushes it to the hub. Messages
/// with a missing or empty <c>cd</c> are ignored.
/// </summary>
/// <remarks>
/// Absent properties default to zero or an empty string, and a missing or
/// non-array <c>obu</c> yields an empty unit list. Hub failures are logged at
/// debug level and swallowed; cache failures propagate.
/// </remarks>
/// <param name="root">Root object of the parsed message.</param>
/// <param name="ct">Cancellation token passed to cache and hub calls.</param>
private async Task ProcessOrderbookMessageAsync(JsonElement root, CancellationToken ct)
{
    static decimal ReadDecimal(JsonElement src, string key) =>
        src.TryGetProperty(key, out var value) ? value.GetDecimal() : 0m;

    var market = root.TryGetProperty("cd", out var code) ? code.GetString() : null;
    if (string.IsNullOrEmpty(market))
    {
        return;
    }

    var totalAskSize = ReadDecimal(root, "tas");
    var totalBidSize = ReadDecimal(root, "tbs");
    var timestamp = root.TryGetProperty("tms", out var tms) ? tms.GetInt64() : 0L;
    var level = ReadDecimal(root, "lv");
    var streamType = root.TryGetProperty("st", out var st) ? st.GetString() ?? "" : "";

    var hasUnits = root.TryGetProperty("obu", out var obu) && obu.ValueKind == JsonValueKind.Array;
    var units = hasUnits
        ? obu.EnumerateArray()
             .Select(entry => new LiveOrderbookUnit(
                 ReadDecimal(entry, "ap"),
                 ReadDecimal(entry, "bp"),
                 ReadDecimal(entry, "as"),
                 ReadDecimal(entry, "bs")))
             .ToList()
        : new List<LiveOrderbookUnit>();

    var orderbook = new LiveOrderbook(totalAskSize, totalBidSize, units, timestamp, level, streamType);
    await cache.SetAsync(CacheKeys.CryptoOrderbookLive(market), orderbook, ct);

    // Segment after the last '-' of the market code.
    var symbol = market.Split('-')[^1];
    try
    {
        await hub.SendOrderbookAsync(
            new CryptoHubData.OrderbookData(
                market, symbol, orderbook.TotalAskSize, orderbook.TotalBidSize,
                [..orderbook.Units.Select(unit => new CryptoHubData.OrderbookUnitData(unit.AskPrice, unit.BidPrice, unit.AskSize, unit.BidSize))],
                orderbook.Timestamp, orderbook.Level, orderbook.StreamType),
            ct);
    }
    catch (Exception ex)
    {
        logger.LogDebug(ex, "SignalR orderbook 전송 실패: {Market}", market);
    }
}
// ─── Candle ─────────────────────────────────────────────────────
// SIMPLE fields: cd, cdttmu, cdttmk, op, hp, lp, tp, catv, catp, tms, st
/// <summary>
/// Converts a candle message into a <c>LiveCandle</c>, writes it to the cache
/// under the market code (<c>cd</c>) with the fixed unit key <c>m1</c> and
/// pushes it to the hub. Messages with a missing or empty <c>cd</c> are ignored.
/// </summary>
/// <remarks>
/// Absent properties default to zero or an empty string. Hub failures are
/// logged at debug level and swallowed; cache failures propagate.
/// </remarks>
/// <param name="root">Root object of the parsed message.</param>
/// <param name="ct">Cancellation token passed to cache and hub calls.</param>
private async Task ProcessCandleMessageAsync(JsonElement root, CancellationToken ct)
{
    static decimal ReadDecimal(JsonElement src, string key) =>
        src.TryGetProperty(key, out var value) ? value.GetDecimal() : 0m;

    static string ReadString(JsonElement src, string key) =>
        src.TryGetProperty(key, out var value) ? value.GetString() ?? "" : "";

    var market = ReadString(root, "cd");
    if (market.Length == 0)
    {
        return;
    }

    var candle = new LiveCandle(
        ReadString(root, "cdttmu"),
        ReadString(root, "cdttmk"),
        ReadDecimal(root, "op"),
        ReadDecimal(root, "hp"),
        ReadDecimal(root, "lp"),
        ReadDecimal(root, "tp"),
        ReadDecimal(root, "catv"),
        ReadDecimal(root, "catp"),
        root.TryGetProperty("tms", out var tms) ? tms.GetInt64() : 0L,
        ReadString(root, "st")
    );

    await cache.SetAsync(CacheKeys.CryptoCandleLive(market, "m1"), candle, ct);

    // Segment after the last '-' of the market code.
    var symbol = market.Split('-')[^1];
    try
    {
        await hub.SendCandleAsync(
            new CryptoHubData.CandleData(
                market, symbol, candle.CandleDateTimeUtc, candle.CandleDateTimeKst,
                candle.OpeningPrice, candle.HighPrice, candle.LowPrice, candle.TradePrice,
                candle.CandleAccTradeVolume, candle.CandleAccTradePrice,
                candle.Timestamp, candle.StreamType),
            ct);
    }
    catch (Exception ex)
    {
        logger.LogDebug(ex, "SignalR candle 전송 실패: {Market}", market);
    }
}
// ─── PING ───────────────────────────────────────────────────────
/// <summary>
/// Sends the text frame <c>PING</c> every <c>_pingInterval</c> for as long as
/// <paramref name="ws"/> stays open and <paramref name="ct"/> is not cancelled.
/// Cancellation ends the loop quietly; other exceptions propagate to whoever
/// awaits the returned task.
/// </summary>
/// <param name="ws">The connected socket to keep alive.</param>
/// <param name="ct">Token that stops the loop.</param>
private static async Task PingLoopAsync(ClientWebSocket ws, CancellationToken ct)
{
    // The payload never changes, so it is built once up front.
    var pingFrame = new ArraySegment<byte>("PING"u8.ToArray());

    try
    {
        while (!ct.IsCancellationRequested && ws.State == WebSocketState.Open)
        {
            await Task.Delay(_pingInterval, ct);

            // The socket may have closed while we were waiting.
            if (ws.State != WebSocketState.Open)
            {
                break;
            }

            await ws.SendAsync(pingFrame, WebSocketMessageType.Text, true, ct);
        }
    }
    catch (OperationCanceledException)
    {
        // Normal shutdown.
    }
}
// ─── Safe JSON parsing helpers ──────────────────────────────────
/// <summary>
/// Reads <paramref name="prop"/> from <paramref name="root"/> as a string.
/// </summary>
/// <param name="root">JSON object to read from.</param>
/// <param name="prop">Property name to look up.</param>
/// <param name="fallback">Value returned when the property is absent or is not a JSON string.</param>
/// <returns>The string value, or <paramref name="fallback"/>.</returns>
private static string GetSafeString(JsonElement root, string prop, string fallback = "")
{
    var isString = root.TryGetProperty(prop, out var el) && el.ValueKind == JsonValueKind.String;
    if (isString)
    {
        return el.GetString() ?? fallback;
    }

    return fallback;
}
/// <summary>
/// Reads <paramref name="prop"/> from <paramref name="root"/> as an optional string.
/// </summary>
/// <param name="root">JSON object to read from.</param>
/// <param name="prop">Property name to look up.</param>
/// <returns>
/// <c>null</c> when the property is absent or is JSON <c>null</c>; the string
/// value for JSON strings; otherwise the element's text form from
/// <see cref="JsonElement.ToString"/>.
/// </returns>
private static string? GetSafeNullableString(JsonElement root, string prop)
{
    if (!root.TryGetProperty(prop, out var el))
    {
        return null;
    }

    return el.ValueKind switch
    {
        // JsonElement.ToString() turns JSON null into "", which would make an
        // explicitly-null field indistinguishable from an empty string.
        JsonValueKind.Null or JsonValueKind.Undefined => null,
        JsonValueKind.String => el.GetString(),
        _ => el.ToString(),
    };
}
// ─── Records used to store live data in the cache ──────────────
/// <summary>
/// Latest trade for one market, as built by <c>ProcessTradeMessageAsync</c>
/// from a SIMPLE-format trade message. Each value is taken from the JSON
/// property named below.
/// </summary>
/// <param name="TradePrice">Value of <c>tp</c>.</param>
/// <param name="TradeVolume">Value of <c>tv</c>.</param>
/// <param name="AskBid">Value of <c>ab</c>.</param>
/// <param name="PrevClosingPrice">Value of <c>pcp</c>.</param>
/// <param name="Change">Value of <c>c</c>.</param>
/// <param name="ChangePrice">Value of <c>cp</c>.</param>
/// <param name="TradeDate">Value of <c>td</c>.</param>
/// <param name="TradeTime">Value of <c>ttm</c>.</param>
/// <param name="TradeTimestamp">Value of <c>ttms</c>.</param>
/// <param name="SequentialId">Value of <c>sid</c>.</param>
/// <param name="Timestamp">Value of <c>tms</c>.</param>
/// <param name="StreamType">Value of <c>st</c>.</param>
/// <param name="BestAskPrice">Value of <c>bap</c>.</param>
/// <param name="BestAskSize">Value of <c>bas</c>.</param>
/// <param name="BestBidPrice">Value of <c>bbp</c>.</param>
/// <param name="BestBidSize">Value of <c>bbs</c>.</param>
internal sealed record LiveTrade(
    decimal TradePrice,
    decimal TradeVolume,
    string AskBid,
    decimal PrevClosingPrice,
    string Change,
    decimal ChangePrice,
    string TradeDate,
    string TradeTime,
    long TradeTimestamp,
    long SequentialId,
    long Timestamp,
    string StreamType,
    decimal BestAskPrice,
    decimal BestAskSize,
    decimal BestBidPrice,
    decimal BestBidSize
);
/// <summary>
/// Latest orderbook snapshot for one market, as built by
/// <c>ProcessOrderbookMessageAsync</c> from a SIMPLE-format orderbook message.
/// </summary>
/// <param name="TotalAskSize">Value of <c>tas</c>.</param>
/// <param name="TotalBidSize">Value of <c>tbs</c>.</param>
/// <param name="Units">One entry per element of the <c>obu</c> array, in message order.</param>
/// <param name="Timestamp">Value of <c>tms</c>.</param>
/// <param name="Level">Value of <c>lv</c>.</param>
/// <param name="StreamType">Value of <c>st</c>.</param>
internal sealed record LiveOrderbook(
    decimal TotalAskSize,
    decimal TotalBidSize,
    List<LiveOrderbookUnit> Units,
    long Timestamp,
    decimal Level,
    string StreamType
);
/// <summary>
/// One element of an orderbook message's <c>obu</c> array.
/// </summary>
/// <param name="AskPrice">Value of <c>ap</c>.</param>
/// <param name="BidPrice">Value of <c>bp</c>.</param>
/// <param name="AskSize">Value of <c>as</c>.</param>
/// <param name="BidSize">Value of <c>bs</c>.</param>
internal sealed record LiveOrderbookUnit(
    decimal AskPrice,
    decimal BidPrice,
    decimal AskSize,
    decimal BidSize
);
/// <summary>
/// Latest candle for one market, as built by <c>ProcessCandleMessageAsync</c>
/// from a SIMPLE-format candle message.
/// </summary>
/// <param name="CandleDateTimeUtc">Value of <c>cdttmu</c>.</param>
/// <param name="CandleDateTimeKst">Value of <c>cdttmk</c>.</param>
/// <param name="OpeningPrice">Value of <c>op</c>.</param>
/// <param name="HighPrice">Value of <c>hp</c>.</param>
/// <param name="LowPrice">Value of <c>lp</c>.</param>
/// <param name="TradePrice">Value of <c>tp</c>.</param>
/// <param name="CandleAccTradeVolume">Value of <c>catv</c>.</param>
/// <param name="CandleAccTradePrice">Value of <c>catp</c>.</param>
/// <param name="Timestamp">Value of <c>tms</c>.</param>
/// <param name="StreamType">Value of <c>st</c>.</param>
internal sealed record LiveCandle(
    string CandleDateTimeUtc,
    string CandleDateTimeKst,
    decimal OpeningPrice,
    decimal HighPrice,
    decimal LowPrice,
    decimal TradePrice,
    decimal CandleAccTradeVolume,
    decimal CandleAccTradePrice,
    long Timestamp,
    string StreamType
);
- }
|