DB 작업용 신규 워커 추가.

This commit is contained in:
2025-12-11 14:35:27 +09:00
parent 9aaa4670ae
commit 632f834ac5
3 changed files with 167 additions and 5 deletions

View File

@@ -58,6 +58,7 @@ namespace TriliumMind
// Register Workers // Register Workers
builder.Services.AddHostedService<JiraWorker>(); builder.Services.AddHostedService<JiraWorker>();
builder.Services.AddHostedService<TriliumWorker>(); builder.Services.AddHostedService<TriliumWorker>();
builder.Services.AddHostedService<AppDbWorker>();
// Register Channels // Register Channels
builder.Services.AddSingleton(Channel.CreateUnbounded<Issue>()); builder.Services.AddSingleton(Channel.CreateUnbounded<Issue>());
@@ -69,12 +70,19 @@ namespace TriliumMind
Log.Information("Starting database migration..."); Log.Information("Starting database migration...");
using var scope = host.Services.CreateScope(); using var scope = host.Services.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService<IAppDbContext>();
if(dbContext is DbContext context)
{
var created = await context.Database.EnsureCreatedAsync();
if(created)
Log.Information("Database created successfully");
else
Log.Information("Database already exists");
}
var db = scope.ServiceProvider.GetRequiredService<AppDbService>(); var dbService = scope.ServiceProvider.GetRequiredService<AppDbService>();
await db.InitializeDatabaseAsync();
var appCoinfigs = host.Services.GetRequiredService<AppConfigs>(); var appCoinfigs = host.Services.GetRequiredService<AppConfigs>();
appCoinfigs.RuntimeConfigs = await db.LoadConfigAsync(); appCoinfigs.RuntimeConfigs = await dbService.LoadConfigAsync();
Log.Information("Database initialization completed successfully"); Log.Information("Database initialization completed successfully");
} }

View File

@@ -0,0 +1,26 @@
using TriliumMind.Data.Entities;
using TriliumMind.Models;
namespace TriliumMind.Services.Mappers;
public static class JiraIssueMapper
{
public static JiraIssue ToEntity(this Issue issue)
{
return new JiraIssue
{
Key = issue.key,
Summary = issue.fields.summary ?? string.Empty,
Parent = issue.fields.parent?.key ?? issue.fields.customfield_10808,
Type = issue.fields.issuetype?.name ?? string.Empty,
Status = issue.fields.status?.description ?? string.Empty,
Assignee = issue.fields.assignee?.displayName ?? string.Empty,
Manager = issue.fields.reporter?.displayName ?? string.Empty,
Due = issue.fields.duedate?.ToUniversalTime() ?? DateTimeOffset.MinValue,
Updated = issue.fields.UpdatedAt,
Published = DateTimeOffset.MinValue,
ObjectId = null,
NeedNotify = 0
};
}
}

128
Workers/AppDbWorker.cs Normal file
View File

@@ -0,0 +1,128 @@
using Serilog;
using System.Threading.Channels;
using TriliumMind.Models;
using TriliumMind.Services;
using TriliumMind.Services.Mappers;
namespace TriliumMind.Workers;
public class AppDbWorker : BackgroundService
{
private readonly Serilog.ILogger _log;
private readonly AppConfigs _configs;
private readonly IServiceScopeFactory _scopeFactory; // Singleton services cannot directly inject Scoped services.
private readonly Channel<Issue> _issueChannel;
private readonly int _batchSize;
private readonly TimeSpan _batchTimeout;
public AppDbWorker(AppConfigs configs, IServiceScopeFactory scopeFactory, Channel<Issue> issueChannel)
{
_log = Log.ForContext<AppDbWorker>();
_configs = configs;
_scopeFactory = scopeFactory;
_issueChannel = issueChannel;
_batchSize = 50;
_batchTimeout = TimeSpan.FromSeconds(5);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_log.Information("AppDbWorker started");
var batch = new List<Issue>();
try
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
timeoutCts.CancelAfter(_batchTimeout);
while (batch.Count < _batchSize && !stoppingToken.IsCancellationRequested)
{
try
{
if (_issueChannel.Reader.TryRead(out var issue))
{
batch.Add(issue);
}
else
{
// Wait if channel is empty
var readTask = _issueChannel.Reader.WaitToReadAsync(timeoutCts.Token);
if (await readTask.ConfigureAwait(false))
{
if (_issueChannel.Reader.TryRead(out issue))
{
batch.Add(issue);
}
}
else
{
break; // Channel is closed.
}
}
}
catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested && !stoppingToken.IsCancellationRequested)
{
break; // Timeout
}
}
if (batch.Count > 0)
{
using var scope = _scopeFactory.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<AppDbService>();
var jiraIssues = batch.Select(i => i.ToEntity()).ToList();
await db.UpsertJiraIssuesBatchAsync(jiraIssues, stoppingToken);
_log.Information("Processed batch of {count} Jira issues", batch.Count);
batch.Clear();
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
_log.Error(ex, "Error processing batch");
batch.Clear();
await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
}
}
// Process any remaining issues in the batch
if (batch.Count > 0)
{
using var scope = _scopeFactory.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<AppDbService>();
var jiraIssues = batch.Select(i => i.ToEntity()).ToList();
await db.UpsertJiraIssuesBatchAsync(jiraIssues, CancellationToken.None);
_log.Information("Processed final batch of {count} Jira issues", batch.Count);
}
}
catch (Exception ex)
{
_log.Error(ex, "Fatal error in JiraDbWorker");
}
//await foreach (var issue in _issueChannel.Reader.ReadAllAsync(stoppingToken))
//{
// try
// {
// var jiraIssue = issue.ToEntity();
// await _db.UpsertJiraIssueAsync(jiraIssue, stoppingToken);
// _log.Debug("Processed Jira issue: {key}", issue.key);
// }
// catch (Exception ex)
// {
// _log.Error(ex, "Failed to process Jira issue: {key}", issue.key);
// }
//}
_log.Information("AppDbWorker stopped");
}
}