UpbitWebSocketService.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. using Application.Abstractions.Cache;
  2. using Application.Abstractions.Crypto;
  3. using Application.Abstractions.Data;
  4. using Microsoft.EntityFrameworkCore;
  5. using Microsoft.Extensions.DependencyInjection;
  6. using Microsoft.Extensions.Hosting;
  7. using Microsoft.Extensions.Logging;
  8. using System.Buffers;
  9. using System.Net.WebSockets;
  10. using System.Text;
  11. using System.Text.Json;
  12. namespace Infrastructure.Crypto;
  13. public sealed class UpbitWebSocketService(
  14. IServiceScopeFactory scopeFactory,
  15. ICacheService cache,
  16. ILogger<UpbitWebSocketService> logger
  17. ) : BackgroundService
  18. {
  19. private static readonly Uri _wsUri = new("wss://api.upbit.com/websocket/v1");
  20. private static readonly TimeSpan _reconnectDelay = TimeSpan.FromSeconds(5);
  21. private static readonly TimeSpan _pingInterval = TimeSpan.FromSeconds(60);
  22. protected override async Task ExecuteAsync(CancellationToken ct)
  23. {
  24. // 서비스 시작 직후 잠시 대기 (DB 준비)
  25. await Task.Delay(3000, ct);
  26. while (!ct.IsCancellationRequested)
  27. {
  28. try
  29. {
  30. await ConnectAndReceiveAsync(ct);
  31. }
  32. catch (OperationCanceledException) when (ct.IsCancellationRequested)
  33. {
  34. break;
  35. }
  36. catch (Exception ex)
  37. {
  38. logger.LogWarning(ex, "Upbit WebSocket 연결 실패. {Delay}초 후 재연결...", _reconnectDelay.TotalSeconds);
  39. await Task.Delay(_reconnectDelay, ct);
  40. }
  41. }
  42. }
  43. // 연결 진행
  44. private async Task ConnectAndReceiveAsync(CancellationToken ct)
  45. {
  46. var marketCodes = await GetMarketCodesAsync(ct);
  47. if (marketCodes.Count == 0)
  48. {
  49. logger.LogInformation("활성 코인이 없습니다. 30초 후 재확인...");
  50. await Task.Delay(TimeSpan.FromSeconds(30), ct);
  51. return;
  52. }
  53. using var ws = new ClientWebSocket();
  54. await ws.ConnectAsync(_wsUri, ct);
  55. logger.LogInformation("Upbit WebSocket 연결 완료. 코인 수: {Count}", marketCodes.Count);
  56. // 구독 메시지 전송
  57. var subscribeMsg = BuildSubscribeMessage(marketCodes);
  58. var msgBytes = Encoding.UTF8.GetBytes(subscribeMsg);
  59. await ws.SendAsync(new ArraySegment<byte>(msgBytes), WebSocketMessageType.Text, true, ct);
  60. // Ticker 수집 딕셔너리
  61. var tickers = new Dictionary<string, UpbitTicker>();
  62. // PING 타이머
  63. using var pingCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
  64. var pingTask = PingLoopAsync(ws, pingCts.Token);
  65. try
  66. {
  67. var buffer = ArrayPool<byte>.Shared.Rent(8192);
  68. try
  69. {
  70. while (ws.State == WebSocketState.Open && !ct.IsCancellationRequested)
  71. {
  72. using var ms = new MemoryStream();
  73. WebSocketReceiveResult result;
  74. do
  75. {
  76. result = await ws.ReceiveAsync(new ArraySegment<byte>(buffer), ct);
  77. if (result.MessageType == WebSocketMessageType.Close)
  78. {
  79. return;
  80. }
  81. ms.Write(buffer, 0, result.Count);
  82. } while (!result.EndOfMessage);
  83. if (result.MessageType is WebSocketMessageType.Text or WebSocketMessageType.Binary)
  84. {
  85. var json = Encoding.UTF8.GetString(ms.ToArray());
  86. await ProcessMessageAsync(json, tickers, ct);
  87. }
  88. }
  89. }
  90. finally
  91. {
  92. ArrayPool<byte>.Shared.Return(buffer);
  93. }
  94. }
  95. finally
  96. {
  97. await pingCts.CancelAsync();
  98. try
  99. {
  100. await pingTask;
  101. }
  102. catch
  103. {
  104. /* ping 태스크 정리 */
  105. }
  106. }
  107. }
  108. // 수신 메시지를 ty 필드로 분기 처리
  109. private async Task ProcessMessageAsync(string json, Dictionary<string, UpbitTicker> tickers, CancellationToken ct)
  110. {
  111. try
  112. {
  113. using var doc = JsonDocument.Parse(json);
  114. var root = doc.RootElement;
  115. if (!root.TryGetProperty("ty", out var tyProp))
  116. {
  117. return;
  118. }
  119. var type = tyProp.GetString() ?? "";
  120. if (type == "ticker")
  121. {
  122. ProcessTickerMessage(root, tickers);
  123. // 전체 Ticker 목록을 Redis에 저장
  124. if (tickers.Count > 0)
  125. {
  126. await cache.SetAsync(CacheKeys.CryptoTickers, tickers.Values.ToList(), ct);
  127. }
  128. }
  129. else if (type == "trade")
  130. {
  131. await ProcessTradeMessageAsync(root, ct);
  132. }
  133. else if (type == "orderbook")
  134. {
  135. await ProcessOrderbookMessageAsync(root, ct);
  136. }
  137. else if (type.StartsWith("candle"))
  138. {
  139. await ProcessCandleMessageAsync(root, ct);
  140. }
  141. }
  142. catch (JsonException ex)
  143. {
  144. logger.LogDebug(ex, "WebSocket 메시지 파싱 실패: {Json}", json[..Math.Min(json.Length, 200)]);
  145. }
  146. }
  147. // DB에서 활성화된 코인 심볼 목록을 가져와서 "KRW-심볼" 형식의 마켓 코드 리스트로 반환
  148. private async Task<List<string>> GetMarketCodesAsync(CancellationToken ct)
  149. {
  150. using var scope = scopeFactory.CreateScope();
  151. var db = scope.ServiceProvider.GetRequiredService<IAppDbContext>();
  152. var symbols = await db.Coin.AsNoTracking().Where(c => c.IsActive && !c.IsDelisted).Select(c => c.Symbol).ToListAsync(ct);
  153. return [..symbols.Select(s => $"KRW-{s.ToUpper()}")];
  154. }
  155. // 구독 메시지 생성 (Ticker + Trade + Orderbook + Candle.1m)
  156. private static string BuildSubscribeMessage(List<string> marketCodes)
  157. {
  158. var codes = string.Join("\",\"", marketCodes);
  159. var codesJson = $"[\"{codes}\"]";
  160. return $"[{{\"ticket\":\"bitforum\"}},"
  161. + $"{{\"type\":\"ticker\",\"codes\":{codesJson},\"isOnlyRealtime\":true}},"
  162. + $"{{\"type\":\"trade\",\"codes\":{codesJson},\"isOnlyRealtime\":true}},"
  163. + $"{{\"type\":\"orderbook\",\"codes\":{codesJson},\"isOnlyRealtime\":true}},"
  164. + $"{{\"type\":\"candle.1m\",\"codes\":{codesJson},\"isOnlyRealtime\":true}},"
  165. + $"{{\"format\":\"SIMPLE\"}}]";
  166. }
  167. // ─── Ticker ─────────────────────────────────────────────────────
  168. private void ProcessTickerMessage(JsonElement root, Dictionary<string, UpbitTicker> tickers)
  169. {
  170. var market = root.TryGetProperty("cd", out var cd) ? cd.GetString() ?? "" : "";
  171. if (string.IsNullOrEmpty(market))
  172. {
  173. return;
  174. }
  175. var tradePrice = root.TryGetProperty("tp", out var tp) ? tp.GetDecimal() : 0m;
  176. var change = root.TryGetProperty("c", out var c) ? c.GetString() ?? "" : "";
  177. var signedChangePrice = root.TryGetProperty("scp", out var scp) ? scp.GetDecimal() : 0m;
  178. var signedChangeRate = root.TryGetProperty("scr", out var scr) ? scr.GetDecimal() : 0m;
  179. var accTradePrice24h = root.TryGetProperty("atp24h", out var atp) ? atp.GetDecimal() : 0m;
  180. var ticker = new UpbitTicker(market, tradePrice, change, signedChangePrice, signedChangeRate, accTradePrice24h);
  181. tickers[market] = ticker;
  182. // 개별 Ticker도 Redis에 저장
  183. var symbol = market.Replace("KRW-", "");
  184. _ = cache.SetAsync(CacheKeys.CryptoTicker(symbol), ticker);
  185. }
  186. // ─── Trade ──────────────────────────────────────────────────────
  187. // SIMPLE 필드: cd, tp, tv, ab, pcp, td, ttm, ttms, sid, tms, st
  188. private async Task ProcessTradeMessageAsync(JsonElement root, CancellationToken ct)
  189. {
  190. var market = root.TryGetProperty("cd", out var cd) ? cd.GetString() ?? "" : "";
  191. if (string.IsNullOrEmpty(market))
  192. {
  193. return;
  194. }
  195. var symbol = market.Replace("KRW-", "");
  196. var tradePrice = root.TryGetProperty("tp", out var tp) ? tp.GetDecimal() : 0m;
  197. var tradeVolume = root.TryGetProperty("tv", out var tv) ? tv.GetDecimal() : 0m;
  198. var askBid = root.TryGetProperty("ab", out var ab) ? ab.GetString() ?? "" : "";
  199. var timestamp = root.TryGetProperty("ttms", out var ttms) ? ttms.GetInt64() : 0L;
  200. var trade = new LiveTrade(timestamp, tradePrice, tradeVolume, askBid);
  201. await cache.SetAsync(CacheKeys.CryptoTradeLive(symbol), trade, ct);
  202. }
  203. // ─── Orderbook ──────────────────────────────────────────────────
  204. // SIMPLE 필드: cd, tas, tbs, obu[{ap, bp, as, bs}], tms, st
  205. private async Task ProcessOrderbookMessageAsync(JsonElement root, CancellationToken ct)
  206. {
  207. var market = root.TryGetProperty("cd", out var cd) ? cd.GetString() ?? "" : "";
  208. if (string.IsNullOrEmpty(market))
  209. {
  210. return;
  211. }
  212. var symbol = market.Replace("KRW-", "");
  213. var totalAskSize = root.TryGetProperty("tas", out var tas) ? tas.GetDecimal() : 0m;
  214. var totalBidSize = root.TryGetProperty("tbs", out var tbs) ? tbs.GetDecimal() : 0m;
  215. var timestamp = root.TryGetProperty("tms", out var tms) ? tms.GetInt64() : 0L;
  216. var units = new List<LiveOrderbookUnit>();
  217. if (root.TryGetProperty("obu", out var obu) && obu.ValueKind == JsonValueKind.Array)
  218. {
  219. foreach (var u in obu.EnumerateArray())
  220. {
  221. var askPrice = u.TryGetProperty("ap", out var ap) ? ap.GetDecimal() : 0m;
  222. var bidPrice = u.TryGetProperty("bp", out var bp) ? bp.GetDecimal() : 0m;
  223. var askSize = u.TryGetProperty("as", out var asVal) ? asVal.GetDecimal() : 0m;
  224. var bidSize = u.TryGetProperty("bs", out var bs) ? bs.GetDecimal() : 0m;
  225. units.Add(new LiveOrderbookUnit(askPrice, bidPrice, askSize, bidSize));
  226. }
  227. }
  228. var orderbook = new LiveOrderbook(totalAskSize, totalBidSize, units, timestamp);
  229. await cache.SetAsync(CacheKeys.CryptoOrderbookLive(symbol), orderbook, ct);
  230. }
  231. // ─── Candle ─────────────────────────────────────────────────────
  232. // SIMPLE 필드: cd, op, hp, lp, tp, catv, catp, tms, st
  233. private async Task ProcessCandleMessageAsync(JsonElement root, CancellationToken ct)
  234. {
  235. var market = root.TryGetProperty("cd", out var cd) ? cd.GetString() ?? "" : "";
  236. if (string.IsNullOrEmpty(market))
  237. {
  238. return;
  239. }
  240. var symbol = market.Replace("KRW-", "");
  241. var open = root.TryGetProperty("op", out var op) ? op.GetDecimal() : 0m;
  242. var high = root.TryGetProperty("hp", out var hp) ? hp.GetDecimal() : 0m;
  243. var low = root.TryGetProperty("lp", out var lp) ? lp.GetDecimal() : 0m;
  244. var close = root.TryGetProperty("tp", out var tp) ? tp.GetDecimal() : 0m;
  245. var volume = root.TryGetProperty("catv", out var catv) ? catv.GetDecimal() : 0m;
  246. var tradePrice = root.TryGetProperty("catp", out var catp) ? catp.GetDecimal() : 0m;
  247. var timestamp = root.TryGetProperty("tms", out var tms) ? tms.GetInt64() : 0L;
  248. // tms는 ms 단위 → 초 단위로 변환 (캔들 Time 필드와 일관성)
  249. var timeSec = timestamp / 1000;
  250. var candle = new LiveCandle(timeSec, open, high, low, close, volume, tradePrice);
  251. await cache.SetAsync(CacheKeys.CryptoCandleLive(symbol, "m1"), candle, ct);
  252. }
  253. // ─── PING ───────────────────────────────────────────────────────
  254. private static async Task PingLoopAsync(ClientWebSocket ws, CancellationToken ct)
  255. {
  256. try
  257. {
  258. while (!ct.IsCancellationRequested && ws.State == WebSocketState.Open)
  259. {
  260. await Task.Delay(_pingInterval, ct);
  261. if (ws.State == WebSocketState.Open)
  262. {
  263. var pingBytes = "PING"u8.ToArray();
  264. await ws.SendAsync(new ArraySegment<byte>(pingBytes), WebSocketMessageType.Text, true, ct);
  265. }
  266. }
  267. }
  268. catch (OperationCanceledException)
  269. {
  270. // 정상 종료
  271. }
  272. }
  273. // ─── Live 데이터 Redis 저장용 Records ──────────────────────────
  274. internal sealed record LiveTrade(
  275. long Timestamp,
  276. decimal TradePrice,
  277. decimal TradeVolume,
  278. string AskBid
  279. );
  280. internal sealed record LiveOrderbook(
  281. decimal TotalAskSize,
  282. decimal TotalBidSize,
  283. List<LiveOrderbookUnit> Units,
  284. long Timestamp
  285. );
  286. internal sealed record LiveOrderbookUnit(
  287. decimal AskPrice,
  288. decimal BidPrice,
  289. decimal AskSize,
  290. decimal BidSize
  291. );
  292. internal sealed record LiveCandle(
  293. long Time,
  294. decimal Open,
  295. decimal High,
  296. decimal Low,
  297. decimal Close,
  298. decimal Volume,
  299. decimal TradePrice
  300. );
  301. }