using System.Collections.Concurrent;
using System.Text.Json;
using Application.Abstractions.YouTube;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Infrastructure.YouTube;
///
/// YouTube 라이브 채팅 수집 서비스
///
/// 동작 흐름:
/// 1. StartAsync(videoId) 호출 → videos.list로 liveChatId 획득 (1 unit)
/// 2. liveChatMessages.list 폴링 시작 (pollingIntervalMillis 준수)
/// 3. 새 메시지 → OnMessageReceived 이벤트 발행 → ChatHub에서 브로드캐스트
/// 4. 방송 종료 감지 → 자동 정리
///
/// 향후 gRPC streamList로 전환 시 이 클래스 내부만 교체하면 됨
/// (인터페이스 IYouTubeLiveChatService는 동일하게 유지)
///
internal sealed class YouTubeLiveChatService(
IYouTubeApiService youTubeApiService,
IHttpClientFactory httpClientFactory,
ILogger logger
) : BackgroundService, IYouTubeLiveChatService
{
private const string BaseUrl = "https://www.googleapis.com/youtube/v3";
private const int DefaultPollingMs = 10_000; // 10초 (API 응답 없으면 기본값)
private const int MaxPollingMs = 30_000; // 최대 30초
// liveChatId → CancellationTokenSource
private readonly ConcurrentDictionary _monitors = new();
public event Func? OnMessageReceived;
// ── IYouTubeLiveChatService ──────────────────────────────────────
public async Task StartAsync(string videoId, CancellationToken ct)
{
// videos.list로 liveChatId 획득 (~1 unit)
var liveInfo = await youTubeApiService.GetLiveStreamInfoAsync(videoId, ct);
if (liveInfo is null)
{
logger.LogWarning("[LiveChat] 영상 정보를 가져올 수 없음: videoId={VideoId}", videoId);
return false;
}
if (!liveInfo.IsLive || string.IsNullOrEmpty(liveInfo.ActiveLiveChatId))
{
logger.LogWarning("[LiveChat] 라이브 방송이 아니거나 채팅이 비활성: videoId={VideoId}, status={Status}",
videoId, liveInfo.LiveBroadcastContent);
return false;
}
var liveChatId = liveInfo.ActiveLiveChatId;
if (_monitors.ContainsKey(liveChatId))
{
logger.LogInformation("[LiveChat] 이미 모니터링 중: liveChatId={LiveChatId}", liveChatId);
return true;
}
var cts = new CancellationTokenSource();
var context = new ChatMonitorContext(videoId, liveChatId, liveInfo.ChannelId, cts);
if (_monitors.TryAdd(liveChatId, context))
{
// 백그라운드에서 폴링 시작
_ = Task.Run(() => PollChatMessagesAsync(context), CancellationToken.None);
logger.LogInformation("[LiveChat] 모니터링 시작: videoId={VideoId}, liveChatId={LiveChatId}", videoId, liveChatId);
return true;
}
return false;
}
public Task StopAsync(string liveChatId)
{
if (_monitors.TryRemove(liveChatId, out var context))
{
context.Cts.Cancel();
context.Cts.Dispose();
logger.LogInformation("[LiveChat] 모니터링 중지: liveChatId={LiveChatId}", liveChatId);
}
return Task.CompletedTask;
}
public bool IsMonitoring(string liveChatId) => _monitors.ContainsKey(liveChatId);
public IReadOnlyList GetActiveChatIds() => _monitors.Keys.ToList().AsReadOnly();
// ── BackgroundService ────────────────────────────────────────────
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
// 서비스 시작만 해둠 — 실제 폴링은 StartAsync 호출 시 개별 Task로 실행
logger.LogInformation("[LiveChat] YouTubeLiveChatService 준비 완료");
return Task.CompletedTask;
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
logger.LogInformation("[LiveChat] 서비스 종료 — 모든 모니터 중지");
foreach (var (liveChatId, context) in _monitors)
{
context.Cts.Cancel();
context.Cts.Dispose();
}
_monitors.Clear();
await base.StopAsync(cancellationToken);
}
// ── 폴링 루프 ───────────────────────────────────────────────────
private async Task PollChatMessagesAsync(ChatMonitorContext context)
{
var ct = context.Cts.Token;
string? pageToken = null;
var seenMessageIds = new HashSet();
try
{
while (!ct.IsCancellationRequested)
{
var (messages, nextPageToken, pollingIntervalMs) = await FetchMessagesAsync(context.LiveChatId, pageToken, ct);
if (messages is not null)
{
foreach (var msg in messages)
{
// 중복 제거
if (!seenMessageIds.Add(msg.MessageId))
{
continue;
}
// 메모리 절약: 최근 2000개만 유지
if (seenMessageIds.Count > 2000)
{
seenMessageIds.Clear();
seenMessageIds.Add(msg.MessageId);
}
try
{
if (OnMessageReceived is not null)
{
await OnMessageReceived.Invoke(msg);
}
}
catch (Exception ex)
{
logger.LogError(ex, "[LiveChat] OnMessageReceived handler error");
}
}
}
pageToken = nextPageToken;
// pollingIntervalMillis 준수 (할당량 절약 핵심)
var delay = Math.Clamp(pollingIntervalMs, 1000, MaxPollingMs);
await Task.Delay(delay, ct);
}
}
catch (OperationCanceledException)
{
// 정상 종료
}
catch (Exception ex)
{
logger.LogError(ex, "[LiveChat] 폴링 루프 오류: liveChatId={LiveChatId}", context.LiveChatId);
}
finally
{
_monitors.TryRemove(context.LiveChatId, out _);
logger.LogInformation("[LiveChat] 폴링 종료: liveChatId={LiveChatId}", context.LiveChatId);
}
}
private async Task<(List? Messages, string? NextPageToken, int PollingIntervalMs)>
FetchMessagesAsync(string liveChatId, string? pageToken, CancellationToken ct)
{
try
{
var client = httpClientFactory.CreateClient("YouTubeApi");
var url = $"{BaseUrl}/liveChat/messages?liveChatId={Uri.EscapeDataString(liveChatId)}&part=snippet,authorDetails&maxResults=500";
if (!string.IsNullOrEmpty(pageToken))
{
url += $"&pageToken={Uri.EscapeDataString(pageToken)}";
}
var response = await client.GetAsync(url, ct);
if (!response.IsSuccessStatusCode)
{
var statusCode = (int)response.StatusCode;
// 403: 채팅 종료 또는 권한 없음
if (statusCode is 403 or 404)
{
logger.LogInformation("[LiveChat] 채팅 종료 또는 접근 불가 ({StatusCode}): liveChatId={LiveChatId}",
statusCode, liveChatId);
// 모니터링 자동 중지
if (_monitors.TryRemove(liveChatId, out var ctx))
{
ctx.Cts.Cancel();
}
return (null, null, MaxPollingMs);
}
logger.LogWarning("[LiveChat] API 오류: {StatusCode}", statusCode);
return (null, pageToken, DefaultPollingMs);
}
var json = await response.Content.ReadAsStringAsync(ct);
var doc = JsonSerializer.Deserialize(json);
var nextPageToken = doc.TryGetProperty("nextPageToken", out var nptProp) ? nptProp.GetString() : null;
var pollingIntervalMs = doc.TryGetProperty("pollingIntervalMillis", out var piProp) ? piProp.GetInt32() : DefaultPollingMs;
// offlineAt → 방송 종료
if (doc.TryGetProperty("offlineAt", out _))
{
logger.LogInformation("[LiveChat] 방송 종료 감지: liveChatId={LiveChatId}", liveChatId);
if (_monitors.TryRemove(liveChatId, out var ctx))
{
ctx.Cts.Cancel();
}
return (null, null, MaxPollingMs);
}
var items = doc.GetProperty("items");
var messages = new List();
foreach (var item in items.EnumerateArray())
{
var msg = ParseChatMessage(item, liveChatId);
if (msg is not null)
{
messages.Add(msg);
}
}
return (messages, nextPageToken, pollingIntervalMs);
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
logger.LogError(ex, "[LiveChat] FetchMessages 오류: liveChatId={LiveChatId}", liveChatId);
return (null, pageToken, DefaultPollingMs);
}
}
private static YouTubeChatMessage? ParseChatMessage(JsonElement item, string liveChatId)
{
try
{
var id = item.GetProperty("id").GetString()!;
var snippet = item.GetProperty("snippet");
var author = item.GetProperty("authorDetails");
var typeStr = snippet.GetProperty("type").GetString() ?? "textMessageEvent";
var messageType = typeStr switch
{
"textMessageEvent" => YouTubeChatMessageType.TextMessage,
"superChatEvent" => YouTubeChatMessageType.SuperChat,
"superStickerEvent" => YouTubeChatMessageType.SuperSticker,
"newSponsorEvent" => YouTubeChatMessageType.NewSponsor,
"memberMilestoneChatEvent" => YouTubeChatMessageType.MemberMilestone,
"giftMembershipEvent" => YouTubeChatMessageType.GiftMembership,
_ => YouTubeChatMessageType.TextMessage
};
var displayMessage = snippet.TryGetProperty("displayMessage", out var dmProp)
? dmProp.GetString() ?? ""
: "";
// textMessageEvent의 경우 textMessageDetails에서 메시지 추출
if (messageType == YouTubeChatMessageType.TextMessage
&& snippet.TryGetProperty("textMessageDetails", out var tmd)
&& tmd.TryGetProperty("messageText", out var mtProp))
{
displayMessage = mtProp.GetString() ?? displayMessage;
}
var publishedAt = snippet.TryGetProperty("publishedAt", out var paProp)
&& DateTime.TryParse(paProp.GetString(), out var pa)
? pa
: DateTime.UtcNow;
// SuperChat 금액
decimal? superChatAmount = null;
string? superChatCurrency = null;
if (messageType == YouTubeChatMessageType.SuperChat
&& snippet.TryGetProperty("superChatDetails", out var scd))
{
superChatAmount = scd.TryGetProperty("amountMicros", out var amProp)
&& long.TryParse(amProp.GetString(), out var micros)
? micros / 1_000_000m
: null;
superChatCurrency = scd.TryGetProperty("currency", out var curProp) ? curProp.GetString() : null;
}
return new YouTubeChatMessage(
id,
liveChatId,
author.GetProperty("channelId").GetString() ?? "",
author.GetProperty("displayName").GetString() ?? "",
author.TryGetProperty("profileImageUrl", out var imgProp) ? imgProp.GetString() ?? "" : "",
displayMessage,
publishedAt,
messageType,
superChatAmount,
superChatCurrency
);
}
catch
{
return null;
}
}
// ── 내부 모니터 컨텍스트 ─────────────────────────────────────────
private sealed record ChatMonitorContext(
string VideoId,
string LiveChatId,
string ChannelId,
CancellationTokenSource Cts
);
}