| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362 |
- using Application.Abstractions.Cache;
- using Application.Abstractions.Crypto;
- using Application.Abstractions.Data;
- using Microsoft.EntityFrameworkCore;
- using Microsoft.Extensions.DependencyInjection;
- using Microsoft.Extensions.Hosting;
- using Microsoft.Extensions.Logging;
- using System.Buffers;
- using System.Net.WebSockets;
- using System.Text;
- using System.Text.Json;
- namespace Infrastructure.Crypto;
- public sealed class UpbitWebSocketService(
- IServiceScopeFactory scopeFactory,
- ICacheService cache,
- ILogger<UpbitWebSocketService> logger
- ) : BackgroundService
- {
// Endpoint that ConnectAndReceiveAsync connects to on every (re)connection attempt.
private static readonly Uri _wsUri = new Uri("wss://api.upbit.com/websocket/v1");

// Pause used by ExecuteAsync before it tries to connect again.
private static readonly TimeSpan _reconnectDelay = new TimeSpan(hours: 0, minutes: 0, seconds: 5);

// Period between two "PING" text frames sent by PingLoopAsync.
private static readonly TimeSpan _pingInterval = TimeSpan.FromMinutes(1);
/// <summary>
/// Background loop of the hosted service: keeps one Upbit WebSocket session alive
/// and re-establishes it whenever the session ends, until <paramref name="ct"/> is cancelled.
/// </summary>
/// <param name="ct">Host stopping token; cancelling it ends the loop without throwing.</param>
/// <remarks>
/// The reconnect delay is applied after every finished session, not only after failures.
/// <see cref="ConnectAndReceiveAsync"/> returns normally when the server closes the socket,
/// and reconnecting without a pause would turn a server-side close into a tight loop.
/// </remarks>
protected override async Task ExecuteAsync(CancellationToken ct)
{
    try
    {
        // Short grace period right after start-up (the original note says: wait for the DB to be ready).
        await Task.Delay(TimeSpan.FromSeconds(3), ct);

        while (!ct.IsCancellationRequested)
        {
            try
            {
                await ConnectAndReceiveAsync(ct);
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                logger.LogWarning(ex, "Upbit WebSocket 연결 실패. {Delay}초 후 재연결...", _reconnectDelay.TotalSeconds);
            }

            // Runs for both the failure path and the "session ended normally" path.
            await Task.Delay(_reconnectDelay, ct);
        }
    }
    catch (OperationCanceledException) when (ct.IsCancellationRequested)
    {
        // Cancellation while waiting in one of the delays: regular shutdown, nothing to report.
    }
}
/// <summary>
/// Runs one WebSocket session: loads the market codes, connects, sends the subscribe
/// message, starts the ping loop and processes incoming messages until the socket
/// leaves the open state, the server sends a Close frame, or <paramref name="ct"/> is cancelled.
/// </summary>
/// <param name="ct">Token that aborts the connect, send, receive and delay operations.</param>
/// <remarks>
/// When no market codes are found the method waits 30 seconds and returns without connecting.
/// Exceptions from connecting, receiving or message processing propagate to the caller.
/// </remarks>
private async Task ConnectAndReceiveAsync(CancellationToken ct)
{
    var marketCodes = await GetMarketCodesAsync(ct);
    if (marketCodes.Count == 0)
    {
        logger.LogInformation("활성 코인이 없습니다. 30초 후 재확인...");
        await Task.Delay(TimeSpan.FromSeconds(30), ct);
        return;
    }

    using var ws = new ClientWebSocket();
    await ws.ConnectAsync(_wsUri, ct);
    logger.LogInformation("Upbit WebSocket 연결 완료. 코인 수: {Count}", marketCodes.Count);

    // Send the subscribe message as a single text frame.
    var subscribeBytes = Encoding.UTF8.GetBytes(BuildSubscribeMessage(marketCodes));
    await ws.SendAsync(new ArraySegment<byte>(subscribeBytes), WebSocketMessageType.Text, true, ct);

    // Latest ticker per market code, kept for the lifetime of this session.
    var tickers = new Dictionary<string, UpbitTicker>();

    // The ping loop gets its own linked token so it can be stopped when the receive loop ends.
    using var pingCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    var pingTask = PingLoopAsync(ws, pingCts.Token);
    try
    {
        await ReceiveLoopAsync(ws, tickers, ct);
    }
    finally
    {
        await pingCts.CancelAsync();
        try
        {
            await pingTask;
        }
        catch (Exception ex)
        {
            // Best-effort cleanup: a failed ping must not mask the outcome of the receive loop,
            // but it is recorded instead of being dropped silently.
            logger.LogDebug(ex, "Upbit WebSocket PING 루프가 오류로 종료되었습니다.");
        }
    }

    // Acknowledge a server-initiated close. This is done only after the ping loop has stopped,
    // so no other send can be in flight on the socket.
    if (ws.State == WebSocketState.CloseReceived)
    {
        try
        {
            await ws.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, ct);
        }
        catch (WebSocketException ex)
        {
            logger.LogDebug(ex, "Upbit WebSocket 종료 핸드셰이크 실패.");
        }
    }
}

/// <summary>
/// Reads complete messages from <paramref name="ws"/> and hands each one to
/// <see cref="ProcessMessageAsync"/> as a UTF-8 decoded string.
/// Returns when a Close frame arrives, the socket is no longer open, or cancellation is requested.
/// </summary>
private async Task ReceiveLoopAsync(ClientWebSocket ws, Dictionary<string, UpbitTicker> tickers, CancellationToken ct)
{
    var buffer = ArrayPool<byte>.Shared.Rent(8192);
    try
    {
        // One stream is reused for every message; SetLength(0) empties it between messages.
        using var message = new MemoryStream();
        while (ws.State == WebSocketState.Open && !ct.IsCancellationRequested)
        {
            message.SetLength(0);

            WebSocketReceiveResult result;
            do
            {
                result = await ws.ReceiveAsync(new ArraySegment<byte>(buffer), ct);
                if (result.MessageType == WebSocketMessageType.Close)
                {
                    logger.LogInformation(
                        "Upbit WebSocket 서버가 연결을 종료했습니다. 상태: {Status}, 사유: {Description}",
                        result.CloseStatus,
                        result.CloseStatusDescription);
                    return;
                }

                message.Write(buffer, 0, result.Count);
            } while (!result.EndOfMessage);

            // Close frames return above, so only Text or Binary payloads reach this point.
            // Decode straight from the stream's buffer to avoid an extra array copy.
            var json = Encoding.UTF8.GetString(message.GetBuffer(), 0, (int)message.Length);
            await ProcessMessageAsync(json, tickers, ct);
        }
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(buffer);
    }
}
/// <summary>
/// Parses one received message and dispatches it on its <c>ty</c> field:
/// <c>ticker</c>, <c>trade</c>, <c>orderbook</c>, or any value starting with <c>candle</c>.
/// Messages that are not JSON objects, or have no string <c>ty</c> field, are ignored.
/// </summary>
/// <param name="json">Raw message text.</param>
/// <param name="tickers">Per-session ticker map, updated by ticker messages.</param>
/// <param name="ct">Token passed to the cache writes.</param>
/// <remarks>
/// A message whose fields have an unexpected JSON kind or a value out of range is logged
/// and skipped, so one malformed message does not end the whole session.
/// </remarks>
private async Task ProcessMessageAsync(string json, Dictionary<string, UpbitTicker> tickers, CancellationToken ct)
{
    try
    {
        using var doc = JsonDocument.Parse(json);
        var root = doc.RootElement;

        // TryGetProperty throws on non-object roots and GetString throws on non-string values,
        // so both kinds are checked up front.
        if (root.ValueKind != JsonValueKind.Object
            || !root.TryGetProperty("ty", out var tyProp)
            || tyProp.ValueKind != JsonValueKind.String)
        {
            return;
        }

        var type = tyProp.GetString() ?? "";
        switch (type)
        {
            case "ticker":
                ProcessTickerMessage(root, tickers);
                // Store the complete ticker list as well.
                if (tickers.Count > 0)
                {
                    await cache.SetAsync(CacheKeys.CryptoTickers, tickers.Values.ToList(), ct);
                }
                break;

            case "trade":
                await ProcessTradeMessageAsync(root, ct);
                break;

            case "orderbook":
                await ProcessOrderbookMessageAsync(root, ct);
                break;

            default:
                // Ordinal comparison: "ty" is a protocol identifier, not user-facing text.
                if (type.StartsWith("candle", StringComparison.Ordinal))
                {
                    await ProcessCandleMessageAsync(root, ct);
                }
                break;
        }
    }
    catch (JsonException ex)
    {
        logger.LogDebug(ex, "WebSocket 메시지 파싱 실패: {Json}", json[..Math.Min(json.Length, 200)]);
    }
    catch (Exception ex) when (ex is InvalidOperationException or FormatException)
    {
        // Raised by the JsonElement getters for a wrong value kind or an out-of-range number.
        // Logged at warning level because the same exception types could also come from a handler.
        logger.LogWarning(ex, "WebSocket 메시지 처리 실패: {Json}", json[..Math.Min(json.Length, 200)]);
    }
}
/// <summary>
/// Loads the symbols of all coins that are active and not delisted and returns them
/// as market codes of the form <c>KRW-SYMBOL</c> (symbol upper-cased).
/// </summary>
/// <param name="ct">Token that cancels the database query.</param>
/// <returns>The market codes; empty when no coin matches.</returns>
private async Task<List<string>> GetMarketCodesAsync(CancellationToken ct)
{
    // Async scope so scoped services are disposed asynchronously inside this async method.
    await using var scope = scopeFactory.CreateAsyncScope();
    var db = scope.ServiceProvider.GetRequiredService<IAppDbContext>();

    var symbols = await db.Coin
        .AsNoTracking()
        .Where(c => c.IsActive && !c.IsDelisted)
        .Select(c => c.Symbol)
        .ToListAsync(ct);

    // Invariant casing: market codes are identifiers and must not depend on the process culture.
    return [.. symbols.Select(s => $"KRW-{s.ToUpperInvariant()}")];
}
/// <summary>
/// Builds the subscribe message: a JSON array with the ticket, one entry each for
/// <c>ticker</c>, <c>trade</c>, <c>orderbook</c> and <c>candle.1m</c> (all with the same codes
/// and <c>isOnlyRealtime</c> set to true), and the <c>SIMPLE</c> format entry.
/// </summary>
/// <param name="marketCodes">Market codes to subscribe to.</param>
/// <returns>The subscribe message as compact JSON text.</returns>
/// <exception cref="ArgumentNullException"><paramref name="marketCodes"/> is null.</exception>
private static string BuildSubscribeMessage(List<string> marketCodes)
{
    ArgumentNullException.ThrowIfNull(marketCodes);

    // Serialising the list escapes every code properly and yields "[]" for an empty list.
    var codesJson = JsonSerializer.Serialize(marketCodes);

    string[] streamTypes = ["ticker", "trade", "orderbook", "candle.1m"];

    var message = new StringBuilder("[{\"ticket\":\"bitforum\"},");
    foreach (var streamType in streamTypes)
    {
        message.Append("{\"type\":\"")
            .Append(streamType)
            .Append("\",\"codes\":")
            .Append(codesJson)
            .Append(",\"isOnlyRealtime\":true},");
    }

    message.Append("{\"format\":\"SIMPLE\"}]");
    return message.ToString();
}
// ─── Ticker ─────────────────────────────────────────────────────
/// <summary>
/// Reads a ticker message (fields <c>cd</c>, <c>tp</c>, <c>c</c>, <c>scp</c>, <c>scr</c>,
/// <c>atp24h</c>), stores it in <paramref name="tickers"/> under its market code and
/// starts a cache write of the single ticker without waiting for it.
/// Messages without a non-empty <c>cd</c> are ignored; missing numeric fields default to 0.
/// </summary>
/// <param name="root">Root object of the parsed message.</param>
/// <param name="tickers">Per-session ticker map to update.</param>
private void ProcessTickerMessage(JsonElement root, Dictionary<string, UpbitTicker> tickers)
{
    var market = root.TryGetProperty("cd", out var cd) ? cd.GetString() ?? "" : "";
    if (string.IsNullOrEmpty(market))
    {
        return;
    }

    var tradePrice = root.TryGetProperty("tp", out var tp) ? tp.GetDecimal() : 0m;
    var change = root.TryGetProperty("c", out var c) ? c.GetString() ?? "" : "";
    var signedChangePrice = root.TryGetProperty("scp", out var scp) ? scp.GetDecimal() : 0m;
    var signedChangeRate = root.TryGetProperty("scr", out var scr) ? scr.GetDecimal() : 0m;
    var accTradePrice24h = root.TryGetProperty("atp24h", out var atp) ? atp.GetDecimal() : 0m;

    var ticker = new UpbitTicker(market, tradePrice, change, signedChangePrice, signedChangeRate, accTradePrice24h);
    tickers[market] = ticker;

    // The single ticker is cached too. The write stays fire-and-forget so this method
    // remains synchronous, but a failure is logged instead of being left unobserved.
    var symbol = market.Replace("KRW-", "");
    _ = StoreTickerAsync(symbol, ticker);

    async Task StoreTickerAsync(string tickerSymbol, UpbitTicker value)
    {
        try
        {
            await cache.SetAsync(CacheKeys.CryptoTicker(tickerSymbol), value);
        }
        catch (Exception ex)
        {
            logger.LogWarning(ex, "개별 Ticker 캐시 저장 실패: {Symbol}", tickerSymbol);
        }
    }
}
// ─── Trade ──────────────────────────────────────────────────────
// SIMPLE fields: cd, tp, tv, ab, pcp, td, ttm, ttms, sid, tms, st
/// <summary>
/// Caches the trade carried by <paramref name="root"/> under the live-trade key of its symbol
/// (the <c>cd</c> value with "KRW-" removed). Reads <c>tp</c>, <c>tv</c>, <c>ab</c> and <c>ttms</c>;
/// a missing field becomes 0 or an empty string. Does nothing when <c>cd</c> is missing or empty.
/// </summary>
/// <param name="root">Root object of the parsed message.</param>
/// <param name="ct">Token passed to the cache write.</param>
private async Task ProcessTradeMessageAsync(JsonElement root, CancellationToken ct)
{
    if (!root.TryGetProperty("cd", out var codeElement))
    {
        return;
    }

    var marketCode = codeElement.GetString();
    if (string.IsNullOrEmpty(marketCode))
    {
        return;
    }

    var cacheSymbol = marketCode.Replace("KRW-", "");

    var price = ReadDecimal("tp");
    var volume = ReadDecimal("tv");
    var side = root.TryGetProperty("ab", out var sideElement) ? sideElement.GetString() ?? "" : "";
    var tradedAt = root.TryGetProperty("ttms", out var timeElement) ? timeElement.GetInt64() : 0L;

    var liveTrade = new LiveTrade(tradedAt, price, volume, side);
    await cache.SetAsync(CacheKeys.CryptoTradeLive(cacheSymbol), liveTrade, ct);

    // Numeric field of the message, or 0 when the field is absent.
    decimal ReadDecimal(string fieldName) =>
        root.TryGetProperty(fieldName, out var value) ? value.GetDecimal() : 0m;
}
// ─── Orderbook ──────────────────────────────────────────────────
// SIMPLE fields: cd, tas, tbs, obu[{ap, bp, as, bs}], tms, st
/// <summary>
/// Caches the order book carried by <paramref name="root"/> under the live-orderbook key of its
/// symbol (the <c>cd</c> value with "KRW-" removed). Reads <c>tas</c>, <c>tbs</c>, <c>tms</c> and,
/// when <c>obu</c> is an array, one unit per element from <c>ap</c>, <c>bp</c>, <c>as</c>, <c>bs</c>.
/// Missing numeric fields become 0. Does nothing when <c>cd</c> is missing or empty.
/// </summary>
/// <param name="root">Root object of the parsed message.</param>
/// <param name="ct">Token passed to the cache write.</param>
private async Task ProcessOrderbookMessageAsync(JsonElement root, CancellationToken ct)
{
    if (!root.TryGetProperty("cd", out var codeElement))
    {
        return;
    }

    var marketCode = codeElement.GetString();
    if (string.IsNullOrEmpty(marketCode))
    {
        return;
    }

    var cacheSymbol = marketCode.Replace("KRW-", "");

    var askTotal = ReadDecimal(root, "tas");
    var bidTotal = ReadDecimal(root, "tbs");
    var updatedAt = root.TryGetProperty("tms", out var timeElement) ? timeElement.GetInt64() : 0L;

    List<LiveOrderbookUnit> levels = [];
    if (root.TryGetProperty("obu", out var unitsElement) && unitsElement.ValueKind == JsonValueKind.Array)
    {
        levels.AddRange(unitsElement.EnumerateArray().Select(level => new LiveOrderbookUnit(
            ReadDecimal(level, "ap"),
            ReadDecimal(level, "bp"),
            ReadDecimal(level, "as"),
            ReadDecimal(level, "bs"))));
    }

    var snapshot = new LiveOrderbook(askTotal, bidTotal, levels, updatedAt);
    await cache.SetAsync(CacheKeys.CryptoOrderbookLive(cacheSymbol), snapshot, ct);

    // Numeric field of the given object, or 0 when the field is absent.
    static decimal ReadDecimal(JsonElement owner, string fieldName) =>
        owner.TryGetProperty(fieldName, out var value) ? value.GetDecimal() : 0m;
}
// ─── Candle ─────────────────────────────────────────────────────
// SIMPLE fields: cd, op, hp, lp, tp, catv, catp, tms, st
/// <summary>
/// Caches the candle carried by <paramref name="root"/> under the live-candle key of its symbol
/// (the <c>cd</c> value with "KRW-" removed) and the interval name "m1". Reads <c>op</c>, <c>hp</c>,
/// <c>lp</c>, <c>tp</c>, <c>catv</c>, <c>catp</c> and <c>tms</c>; missing fields become 0.
/// Does nothing when <c>cd</c> is missing or empty.
/// </summary>
/// <param name="root">Root object of the parsed message.</param>
/// <param name="ct">Token passed to the cache write.</param>
private async Task ProcessCandleMessageAsync(JsonElement root, CancellationToken ct)
{
    if (!root.TryGetProperty("cd", out var codeElement))
    {
        return;
    }

    var marketCode = codeElement.GetString();
    if (string.IsNullOrEmpty(marketCode))
    {
        return;
    }

    var cacheSymbol = marketCode.Replace("KRW-", "");

    var openPrice = ReadDecimal("op");
    var highPrice = ReadDecimal("hp");
    var lowPrice = ReadDecimal("lp");
    var closePrice = ReadDecimal("tp");
    var accumulatedVolume = ReadDecimal("catv");
    var accumulatedPrice = ReadDecimal("catp");
    var timestampMs = root.TryGetProperty("tms", out var timeElement) ? timeElement.GetInt64() : 0L;

    // tms is in milliseconds; the candle Time field is kept in seconds.
    var liveCandle = new LiveCandle(
        timestampMs / 1000,
        openPrice,
        highPrice,
        lowPrice,
        closePrice,
        accumulatedVolume,
        accumulatedPrice);

    await cache.SetAsync(CacheKeys.CryptoCandleLive(cacheSymbol, "m1"), liveCandle, ct);

    // Numeric field of the message, or 0 when the field is absent.
    decimal ReadDecimal(string fieldName) =>
        root.TryGetProperty(fieldName, out var value) ? value.GetDecimal() : 0m;
}
// ─── PING ───────────────────────────────────────────────────────
/// <summary>
/// Sends the text frame "PING" on <paramref name="ws"/> once per <c>_pingInterval</c> for as long
/// as the socket is open and <paramref name="ct"/> is not cancelled.
/// Cancellation ends the loop silently; other exceptions propagate to whoever awaits the task.
/// </summary>
/// <param name="ws">Connected socket to send on.</param>
/// <param name="ct">Token that stops the loop.</param>
private static async Task PingLoopAsync(ClientWebSocket ws, CancellationToken ct)
{
    try
    {
        while (true)
        {
            if (ct.IsCancellationRequested || ws.State != WebSocketState.Open)
            {
                break;
            }

            await Task.Delay(_pingInterval, ct);

            // The socket may have left the open state while waiting; the check at the top of
            // the loop then ends it.
            if (ws.State != WebSocketState.Open)
            {
                continue;
            }

            var payload = new ArraySegment<byte>("PING"u8.ToArray());
            await ws.SendAsync(payload, WebSocketMessageType.Text, true, ct);
        }
    }
    catch (OperationCanceledException)
    {
        // Regular shutdown.
    }
}
// ─── Records used to store live data in the cache ──────────────
/// <summary>Trade snapshot written to the cache by <c>ProcessTradeMessageAsync</c>.</summary>
/// <param name="Timestamp">Value of the message's <c>ttms</c> field (0 when absent).</param>
/// <param name="TradePrice">Value of the message's <c>tp</c> field (0 when absent).</param>
/// <param name="TradeVolume">Value of the message's <c>tv</c> field (0 when absent).</param>
/// <param name="AskBid">Value of the message's <c>ab</c> field (empty when absent).</param>
internal sealed record LiveTrade(long Timestamp, decimal TradePrice, decimal TradeVolume, string AskBid);
/// <summary>Order book snapshot written to the cache by <c>ProcessOrderbookMessageAsync</c>.</summary>
/// <param name="TotalAskSize">Value of the message's <c>tas</c> field (0 when absent).</param>
/// <param name="TotalBidSize">Value of the message's <c>tbs</c> field (0 when absent).</param>
/// <param name="Units">One entry per element of the message's <c>obu</c> array; empty when there is none.</param>
/// <param name="Timestamp">Value of the message's <c>tms</c> field (0 when absent).</param>
internal sealed record LiveOrderbook(decimal TotalAskSize, decimal TotalBidSize, List<LiveOrderbookUnit> Units, long Timestamp);
/// <summary>One element of an order book's <c>obu</c> array.</summary>
/// <param name="AskPrice">Value of the element's <c>ap</c> field (0 when absent).</param>
/// <param name="BidPrice">Value of the element's <c>bp</c> field (0 when absent).</param>
/// <param name="AskSize">Value of the element's <c>as</c> field (0 when absent).</param>
/// <param name="BidSize">Value of the element's <c>bs</c> field (0 when absent).</param>
internal sealed record LiveOrderbookUnit(decimal AskPrice, decimal BidPrice, decimal AskSize, decimal BidSize);
/// <summary>Candle snapshot written to the cache by <c>ProcessCandleMessageAsync</c>.</summary>
/// <param name="Time">The message's <c>tms</c> value divided by 1000 (milliseconds to seconds).</param>
/// <param name="Open">Value of the message's <c>op</c> field (0 when absent).</param>
/// <param name="High">Value of the message's <c>hp</c> field (0 when absent).</param>
/// <param name="Low">Value of the message's <c>lp</c> field (0 when absent).</param>
/// <param name="Close">Value of the message's <c>tp</c> field (0 when absent).</param>
/// <param name="Volume">Value of the message's <c>catv</c> field (0 when absent).</param>
/// <param name="TradePrice">Value of the message's <c>catp</c> field (0 when absent).</param>
internal sealed record LiveCandle(long Time, decimal Open, decimal High, decimal Low, decimal Close, decimal Volume, decimal TradePrice);
- }
|