using Serilog;
using System.Threading.Channels;
using TriliumMind.Models;
using TriliumMind.Services;
using TriliumMind.Services.Mappers;

namespace TriliumMind.Workers;

// NOTE(review): several generic type arguments look like they were lost from this file
// (Channel, List, Log.ForContext, GetRequiredService). They are reproduced exactly as
// found; restore the intended type arguments before compiling.

/// <summary>
/// Background worker that reads issues from a channel, collects them into batches and
/// hands each batch to <c>UpsertJiraIssuesBatchAsync</c> on a service resolved from a
/// fresh DI scope.
/// </summary>
/// <remarks>
/// A batch is flushed when it reaches the batch size, when the batch timeout elapses,
/// when the channel is completed, or when the host requests shutdown.
/// </remarks>
public class AppDbWorker : BackgroundService
{
    private readonly Serilog.ILogger _log;
    private readonly AppConfigs _configs;
    private readonly IServiceScopeFactory _scopeFactory; // Singleton services cannot directly inject Scoped services.
    private readonly Channel _issueChannel;
    private readonly int _batchSize;
    private readonly TimeSpan _batchTimeout;

    /// <summary>
    /// Creates the worker.
    /// </summary>
    /// <param name="configs">Application configuration (stored, not read by this class).</param>
    /// <param name="scopeFactory">Factory used to create a DI scope per flushed batch.</param>
    /// <param name="issueChannel">Channel the issues are read from.</param>
    public AppDbWorker(AppConfigs configs, IServiceScopeFactory scopeFactory, Channel issueChannel)
    {
        _log = Log.ForContext();
        _configs = configs;
        _scopeFactory = scopeFactory;
        _issueChannel = issueChannel;
        _batchSize = 50;
        _batchTimeout = TimeSpan.FromSeconds(5);
    }

    /// <summary>
    /// Runs the batching loop until the host requests shutdown or the channel is completed.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the worker should stop.</param>
    /// <returns>A task that completes when the worker has stopped.</returns>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _log.Information("AppDbWorker started");

        var batch = new List();

        // Shared by the regular and the shutdown flush. Implemented as a local function so
        // it can capture `batch` directly.
        async Task FlushAsync(bool isFinal, CancellationToken token)
        {
            if (batch.Count == 0)
            {
                return;
            }

            using var scope = _scopeFactory.CreateScope();
            var db = scope.ServiceProvider.GetRequiredService();
            var jiraIssues = batch.Select(i => i.ToEntity()).ToList();

            await db.UpsertJiraIssuesBatchAsync(jiraIssues, token);

            if (isFinal)
            {
                _log.Information("Processed final batch of {count} Jira issues", batch.Count);
            }
            else
            {
                _log.Information("Processed batch of {count} Jira issues", batch.Count);
            }

            batch.Clear();
        }

        try
        {
            // Set once WaitToReadAsync reports that no more items will ever arrive.
            // Without this the outer loop would spin at full speed on a completed channel.
            var channelCompleted = false;

            while (!channelCompleted && !stoppingToken.IsCancellationRequested)
            {
                try
                {
                    // One timeout window per batch; it is also cancelled by host shutdown.
                    using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
                    timeoutCts.CancelAfter(_batchTimeout);

                    while (batch.Count < _batchSize && !stoppingToken.IsCancellationRequested)
                    {
                        try
                        {
                            if (_issueChannel.Reader.TryRead(out var issue))
                            {
                                batch.Add(issue);
                                continue;
                            }

                            // Channel is empty: wait for data, the batch timeout, or shutdown.
                            if (!await _issueChannel.Reader.WaitToReadAsync(timeoutCts.Token).ConfigureAwait(false))
                            {
                                channelCompleted = true; // Channel is closed.
                                break;
                            }
                        }
                        catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested && !stoppingToken.IsCancellationRequested)
                        {
                            break; // Timeout
                        }
                    }

                    await FlushAsync(isFinal: false, token: stoppingToken);
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    break;
                }
                catch (Exception ex)
                {
                    _log.Error(ex, "Error processing batch");

                    // The failed batch is discarded here; its issues are not retried by this worker.
                    batch.Clear();

                    try
                    {
                        await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
                    }
                    catch (OperationCanceledException)
                    {
                        // Shutdown requested during the back-off: leave the loop normally
                        // instead of letting the cancellation surface as a fatal error.
                        break;
                    }
                }
            }

            // Process any remaining issues in the batch. CancellationToken.None is used
            // because stoppingToken may already be cancelled at this point.
            await FlushAsync(isFinal: true, token: CancellationToken.None);
        }
        catch (Exception ex)
        {
            _log.Error(ex, "Fatal error in AppDbWorker");
        }

        _log.Information("AppDbWorker stopped");
    }
}