diff --git a/Program.cs b/Program.cs index db3929b..7424870 100644 --- a/Program.cs +++ b/Program.cs @@ -58,6 +58,7 @@ namespace TriliumMind // Register Workers builder.Services.AddHostedService(); builder.Services.AddHostedService(); + builder.Services.AddHostedService(); // Register Channels builder.Services.AddSingleton(Channel.CreateUnbounded()); @@ -69,12 +70,19 @@ namespace TriliumMind Log.Information("Starting database migration..."); using var scope = host.Services.CreateScope(); - - var db = scope.ServiceProvider.GetRequiredService(); - await db.InitializeDatabaseAsync(); - + var dbContext = scope.ServiceProvider.GetRequiredService(); + if(dbContext is DbContext context) + { + var created = await context.Database.EnsureCreatedAsync(); + if(created) + Log.Information("Database created successfully"); + else + Log.Information("Database already exists"); + } + + var dbService = scope.ServiceProvider.GetRequiredService(); var appCoinfigs = host.Services.GetRequiredService(); - appCoinfigs.RuntimeConfigs = await db.LoadConfigAsync(); + appCoinfigs.RuntimeConfigs = await dbService.LoadConfigAsync(); Log.Information("Database initialization completed successfully"); } diff --git a/Services/Mappers/JiraIssueMapper.cs b/Services/Mappers/JiraIssueMapper.cs new file mode 100644 index 0000000..5a4100a --- /dev/null +++ b/Services/Mappers/JiraIssueMapper.cs @@ -0,0 +1,26 @@ +using TriliumMind.Data.Entities; +using TriliumMind.Models; + +namespace TriliumMind.Services.Mappers; + +public static class JiraIssueMapper +{ + public static JiraIssue ToEntity(this Issue issue) + { + return new JiraIssue + { + Key = issue.key, + Summary = issue.fields.summary ?? string.Empty, + Parent = issue.fields.parent?.key ?? issue.fields.customfield_10808, + Type = issue.fields.issuetype?.name ?? string.Empty, + Status = issue.fields.status?.description ?? string.Empty, + Assignee = issue.fields.assignee?.displayName ?? string.Empty, + Manager = issue.fields.reporter?.displayName ?? string.Empty, + Due = issue.fields.duedate?.ToUniversalTime() ?? DateTimeOffset.MinValue, + Updated = issue.fields.UpdatedAt, + Published = DateTimeOffset.MinValue, + ObjectId = null, + NeedNotify = 0 + }; + } +} \ No newline at end of file diff --git a/Workers/AppDbWorker.cs b/Workers/AppDbWorker.cs new file mode 100644 index 0000000..04e7955 --- /dev/null +++ b/Workers/AppDbWorker.cs @@ -0,0 +1,128 @@ +using Serilog; +using System.Threading.Channels; +using TriliumMind.Models; +using TriliumMind.Services; +using TriliumMind.Services.Mappers; + +namespace TriliumMind.Workers; + +public class AppDbWorker : BackgroundService +{ + private readonly Serilog.ILogger _log; + private readonly AppConfigs _configs; + private readonly IServiceScopeFactory _scopeFactory; // Singleton services cannot directly inject Scoped services. + private readonly Channel _issueChannel; + + private readonly int _batchSize; + private readonly TimeSpan _batchTimeout; + + public AppDbWorker(AppConfigs configs, IServiceScopeFactory scopeFactory, Channel issueChannel) + { + _log = Log.ForContext(); + _configs = configs; + _scopeFactory = scopeFactory; + _issueChannel = issueChannel; + + _batchSize = 50; + _batchTimeout = TimeSpan.FromSeconds(5); + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _log.Information("AppDbWorker started"); + + var batch = new List(); + try + { + while (!stoppingToken.IsCancellationRequested) + { + try + { + using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken); + timeoutCts.CancelAfter(_batchTimeout); + + while (batch.Count < _batchSize && !stoppingToken.IsCancellationRequested) + { + try + { + if (_issueChannel.Reader.TryRead(out var issue)) + { + batch.Add(issue); + } + else + { + // Wait if channel is empty + var readTask = _issueChannel.Reader.WaitToReadAsync(timeoutCts.Token); + if (await readTask.ConfigureAwait(false)) + { + if (_issueChannel.Reader.TryRead(out issue)) + { + batch.Add(issue); + } + } + else + { + break; // Channel is closed. + } + } + } + catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested && !stoppingToken.IsCancellationRequested) + { + break; // Timeout + } + } + + if (batch.Count > 0) + { + using var scope = _scopeFactory.CreateScope(); + var db = scope.ServiceProvider.GetRequiredService(); + var jiraIssues = batch.Select(i => i.ToEntity()).ToList(); + await db.UpsertJiraIssuesBatchAsync(jiraIssues, stoppingToken); + + _log.Information("Processed batch of {count} Jira issues", batch.Count); + batch.Clear(); + } + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + break; + } + catch (Exception ex) + { + _log.Error(ex, "Error processing batch"); + batch.Clear(); + await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken); + } + } + + // Process any remaining issues in the batch + if (batch.Count > 0) + { + using var scope = _scopeFactory.CreateScope(); + var db = scope.ServiceProvider.GetRequiredService(); + var jiraIssues = batch.Select(i => i.ToEntity()).ToList(); + await db.UpsertJiraIssuesBatchAsync(jiraIssues, CancellationToken.None); + _log.Information("Processed final batch of {count} Jira issues", batch.Count); + } + } + catch (Exception ex) + { + _log.Error(ex, "Fatal error in JiraDbWorker"); + } + + //await foreach (var issue in _issueChannel.Reader.ReadAllAsync(stoppingToken)) + //{ + // try + // { + // var jiraIssue = issue.ToEntity(); + // await _db.UpsertJiraIssueAsync(jiraIssue, stoppingToken); + // _log.Debug("Processed Jira issue: {key}", issue.key); + // } + // catch (Exception ex) + // { + // _log.Error(ex, "Failed to process Jira issue: {key}", issue.key); + // } + //} + _log.Information("AppDbWorker stopped"); + } +}