using System.Collections.Concurrent; using System.Text.Json; using Application.Abstractions.YouTube; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; namespace Infrastructure.YouTube; /// /// YouTube 라이브 채팅 수집 서비스 /// /// 동작 흐름: /// 1. StartAsync(videoId) 호출 → videos.list로 liveChatId 획득 (1 unit) /// 2. liveChatMessages.list 폴링 시작 (pollingIntervalMillis 준수) /// 3. 새 메시지 → OnMessageReceived 이벤트 발행 → ChatHub에서 브로드캐스트 /// 4. 방송 종료 감지 → 자동 정리 /// /// 향후 gRPC streamList로 전환 시 이 클래스 내부만 교체하면 됨 /// (인터페이스 IYouTubeLiveChatService는 동일하게 유지) /// internal sealed class YouTubeLiveChatService( IYouTubeApiService youTubeApiService, IHttpClientFactory httpClientFactory, ILogger logger ) : BackgroundService, IYouTubeLiveChatService { private const string BaseUrl = "https://www.googleapis.com/youtube/v3"; private const int DefaultPollingMs = 10_000; // 10초 (API 응답 없으면 기본값) private const int MaxPollingMs = 30_000; // 최대 30초 // liveChatId → CancellationTokenSource private readonly ConcurrentDictionary _monitors = new(); public event Func? OnMessageReceived; // ── IYouTubeLiveChatService ────────────────────────────────────── public async Task StartAsync(string videoId, CancellationToken ct) { // videos.list로 liveChatId 획득 (~1 unit) var liveInfo = await youTubeApiService.GetLiveStreamInfoAsync(videoId, ct); if (liveInfo is null) { logger.LogWarning("[LiveChat] 영상 정보를 가져올 수 없음: videoId={VideoId}", videoId); return false; } if (!liveInfo.IsLive || string.IsNullOrEmpty(liveInfo.ActiveLiveChatId)) { logger.LogWarning("[LiveChat] 라이브 방송이 아니거나 채팅이 비활성: videoId={VideoId}, status={Status}", videoId, liveInfo.LiveBroadcastContent); return false; } var liveChatId = liveInfo.ActiveLiveChatId; if (_monitors.ContainsKey(liveChatId)) { logger.LogInformation("[LiveChat] 이미 모니터링 중: liveChatId={LiveChatId}", liveChatId); return true; } var cts = new CancellationTokenSource(); var context = new ChatMonitorContext(videoId, liveChatId, liveInfo.ChannelId, cts); if (_monitors.TryAdd(liveChatId, context)) { // 백그라운드에서 폴링 시작 _ = Task.Run(() => PollChatMessagesAsync(context), CancellationToken.None); logger.LogInformation("[LiveChat] 모니터링 시작: videoId={VideoId}, liveChatId={LiveChatId}", videoId, liveChatId); return true; } return false; } public Task StopAsync(string liveChatId) { if (_monitors.TryRemove(liveChatId, out var context)) { context.Cts.Cancel(); context.Cts.Dispose(); logger.LogInformation("[LiveChat] 모니터링 중지: liveChatId={LiveChatId}", liveChatId); } return Task.CompletedTask; } public bool IsMonitoring(string liveChatId) => _monitors.ContainsKey(liveChatId); public IReadOnlyList GetActiveChatIds() => _monitors.Keys.ToList().AsReadOnly(); // ── BackgroundService ──────────────────────────────────────────── protected override Task ExecuteAsync(CancellationToken stoppingToken) { // 서비스 시작만 해둠 — 실제 폴링은 StartAsync 호출 시 개별 Task로 실행 logger.LogInformation("[LiveChat] YouTubeLiveChatService 준비 완료"); return Task.CompletedTask; } public override async Task StopAsync(CancellationToken cancellationToken) { logger.LogInformation("[LiveChat] 서비스 종료 — 모든 모니터 중지"); foreach (var (liveChatId, context) in _monitors) { context.Cts.Cancel(); context.Cts.Dispose(); } _monitors.Clear(); await base.StopAsync(cancellationToken); } // ── 폴링 루프 ─────────────────────────────────────────────────── private async Task PollChatMessagesAsync(ChatMonitorContext context) { var ct = context.Cts.Token; string? pageToken = null; var seenMessageIds = new HashSet(); try { while (!ct.IsCancellationRequested) { var (messages, nextPageToken, pollingIntervalMs) = await FetchMessagesAsync(context.LiveChatId, pageToken, ct); if (messages is not null) { foreach (var msg in messages) { // 중복 제거 if (!seenMessageIds.Add(msg.MessageId)) { continue; } // 메모리 절약: 최근 2000개만 유지 if (seenMessageIds.Count > 2000) { seenMessageIds.Clear(); seenMessageIds.Add(msg.MessageId); } try { if (OnMessageReceived is not null) { await OnMessageReceived.Invoke(msg); } } catch (Exception ex) { logger.LogError(ex, "[LiveChat] OnMessageReceived handler error"); } } } pageToken = nextPageToken; // pollingIntervalMillis 준수 (할당량 절약 핵심) var delay = Math.Clamp(pollingIntervalMs, 1000, MaxPollingMs); await Task.Delay(delay, ct); } } catch (OperationCanceledException) { // 정상 종료 } catch (Exception ex) { logger.LogError(ex, "[LiveChat] 폴링 루프 오류: liveChatId={LiveChatId}", context.LiveChatId); } finally { _monitors.TryRemove(context.LiveChatId, out _); logger.LogInformation("[LiveChat] 폴링 종료: liveChatId={LiveChatId}", context.LiveChatId); } } private async Task<(List? Messages, string? NextPageToken, int PollingIntervalMs)> FetchMessagesAsync(string liveChatId, string? pageToken, CancellationToken ct) { try { var client = httpClientFactory.CreateClient("YouTubeApi"); var url = $"{BaseUrl}/liveChat/messages?liveChatId={Uri.EscapeDataString(liveChatId)}&part=snippet,authorDetails&maxResults=500"; if (!string.IsNullOrEmpty(pageToken)) { url += $"&pageToken={Uri.EscapeDataString(pageToken)}"; } var response = await client.GetAsync(url, ct); if (!response.IsSuccessStatusCode) { var statusCode = (int)response.StatusCode; // 403: 채팅 종료 또는 권한 없음 if (statusCode is 403 or 404) { logger.LogInformation("[LiveChat] 채팅 종료 또는 접근 불가 ({StatusCode}): liveChatId={LiveChatId}", statusCode, liveChatId); // 모니터링 자동 중지 if (_monitors.TryRemove(liveChatId, out var ctx)) { ctx.Cts.Cancel(); } return (null, null, MaxPollingMs); } logger.LogWarning("[LiveChat] API 오류: {StatusCode}", statusCode); return (null, pageToken, DefaultPollingMs); } var json = await response.Content.ReadAsStringAsync(ct); var doc = JsonSerializer.Deserialize(json); var nextPageToken = doc.TryGetProperty("nextPageToken", out var nptProp) ? nptProp.GetString() : null; var pollingIntervalMs = doc.TryGetProperty("pollingIntervalMillis", out var piProp) ? piProp.GetInt32() : DefaultPollingMs; // offlineAt → 방송 종료 if (doc.TryGetProperty("offlineAt", out _)) { logger.LogInformation("[LiveChat] 방송 종료 감지: liveChatId={LiveChatId}", liveChatId); if (_monitors.TryRemove(liveChatId, out var ctx)) { ctx.Cts.Cancel(); } return (null, null, MaxPollingMs); } var items = doc.GetProperty("items"); var messages = new List(); foreach (var item in items.EnumerateArray()) { var msg = ParseChatMessage(item, liveChatId); if (msg is not null) { messages.Add(msg); } } return (messages, nextPageToken, pollingIntervalMs); } catch (OperationCanceledException) { throw; } catch (Exception ex) { logger.LogError(ex, "[LiveChat] FetchMessages 오류: liveChatId={LiveChatId}", liveChatId); return (null, pageToken, DefaultPollingMs); } } private static YouTubeChatMessage? ParseChatMessage(JsonElement item, string liveChatId) { try { var id = item.GetProperty("id").GetString()!; var snippet = item.GetProperty("snippet"); var author = item.GetProperty("authorDetails"); var typeStr = snippet.GetProperty("type").GetString() ?? "textMessageEvent"; var messageType = typeStr switch { "textMessageEvent" => YouTubeChatMessageType.TextMessage, "superChatEvent" => YouTubeChatMessageType.SuperChat, "superStickerEvent" => YouTubeChatMessageType.SuperSticker, "newSponsorEvent" => YouTubeChatMessageType.NewSponsor, "memberMilestoneChatEvent" => YouTubeChatMessageType.MemberMilestone, "giftMembershipEvent" => YouTubeChatMessageType.GiftMembership, _ => YouTubeChatMessageType.TextMessage }; var displayMessage = snippet.TryGetProperty("displayMessage", out var dmProp) ? dmProp.GetString() ?? "" : ""; // textMessageEvent의 경우 textMessageDetails에서 메시지 추출 if (messageType == YouTubeChatMessageType.TextMessage && snippet.TryGetProperty("textMessageDetails", out var tmd) && tmd.TryGetProperty("messageText", out var mtProp)) { displayMessage = mtProp.GetString() ?? displayMessage; } var publishedAt = snippet.TryGetProperty("publishedAt", out var paProp) && DateTime.TryParse(paProp.GetString(), out var pa) ? pa : DateTime.UtcNow; // SuperChat 금액 decimal? superChatAmount = null; string? superChatCurrency = null; if (messageType == YouTubeChatMessageType.SuperChat && snippet.TryGetProperty("superChatDetails", out var scd)) { superChatAmount = scd.TryGetProperty("amountMicros", out var amProp) && long.TryParse(amProp.GetString(), out var micros) ? micros / 1_000_000m : null; superChatCurrency = scd.TryGetProperty("currency", out var curProp) ? curProp.GetString() : null; } return new YouTubeChatMessage( id, liveChatId, author.GetProperty("channelId").GetString() ?? "", author.GetProperty("displayName").GetString() ?? "", author.TryGetProperty("profileImageUrl", out var imgProp) ? imgProp.GetString() ?? "" : "", displayMessage, publishedAt, messageType, superChatAmount, superChatCurrency ); } catch { return null; } } // ── 내부 모니터 컨텍스트 ───────────────────────────────────────── private sealed record ChatMonitorContext( string VideoId, string LiveChatId, string ChannelId, CancellationTokenSource Cts ); }