Compare commits
3 Commits
057481803f
...
632f834ac5
| Author | SHA1 | Date | |
|---|---|---|---|
| 632f834ac5 | |||
| 9aaa4670ae | |||
| e17889748a |
16
Program.cs
16
Program.cs
@@ -58,6 +58,7 @@ namespace TriliumMind
|
||||
// Register Workers
|
||||
builder.Services.AddHostedService<JiraWorker>();
|
||||
builder.Services.AddHostedService<TriliumWorker>();
|
||||
builder.Services.AddHostedService<AppDbWorker>();
|
||||
|
||||
// Register Channels
|
||||
builder.Services.AddSingleton(Channel.CreateUnbounded<Issue>());
|
||||
@@ -69,12 +70,19 @@ namespace TriliumMind
|
||||
Log.Information("Starting database migration...");
|
||||
|
||||
using var scope = host.Services.CreateScope();
|
||||
var dbContext = scope.ServiceProvider.GetRequiredService<IAppDbContext>();
|
||||
if(dbContext is DbContext context)
|
||||
{
|
||||
var created = await context.Database.EnsureCreatedAsync();
|
||||
if(created)
|
||||
Log.Information("Database created successfully");
|
||||
else
|
||||
Log.Information("Database already exists");
|
||||
}
|
||||
|
||||
var db = scope.ServiceProvider.GetRequiredService<AppDbService>();
|
||||
await db.InitializeDatabaseAsync();
|
||||
|
||||
var dbService = scope.ServiceProvider.GetRequiredService<AppDbService>();
|
||||
var appCoinfigs = host.Services.GetRequiredService<AppConfigs>();
|
||||
appCoinfigs.RuntimeConfigs = await db.LoadConfigAsync();
|
||||
appCoinfigs.RuntimeConfigs = await dbService.LoadConfigAsync();
|
||||
|
||||
Log.Information("Database initialization completed successfully");
|
||||
}
|
||||
|
||||
@@ -31,15 +31,6 @@ public class AppDbService
|
||||
{
|
||||
_log.Information("Database already exists");
|
||||
}
|
||||
|
||||
// Migration
|
||||
var pendingMigrations = await _db.Database.GetPendingMigrationsAsync();
|
||||
if (pendingMigrations.Any())
|
||||
{
|
||||
_log.Information("Applying pending migrations...");
|
||||
await _db.Database.MigrateAsync();
|
||||
_log.Information("Migrations applied successfully");
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@@ -48,6 +39,7 @@ public class AppDbService
|
||||
}
|
||||
}
|
||||
|
||||
#region Table: Configs
|
||||
public async Task<Dictionary<string, string>> LoadConfigAsync()
|
||||
{
|
||||
try
|
||||
@@ -100,6 +92,105 @@ public class AppDbService
|
||||
throw;
|
||||
}
|
||||
}
|
||||
#endregion Table: Configs
|
||||
|
||||
|
||||
public async Task UpsertJiraIssueAsync(JiraIssue issue, CancellationToken ct = default)
|
||||
{
|
||||
try
|
||||
{
|
||||
var existing = await _db.JiraIssues.FindAsync([issue.Key], ct);
|
||||
|
||||
if (existing == null)
|
||||
{
|
||||
_db.JiraIssues.Add(issue);
|
||||
_log.Debug("Inserting new Jira issue: {key}", issue.Key);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (issue.Updated > existing.Updated)
|
||||
{
|
||||
existing.Summary = issue.Summary;
|
||||
existing.Parent = issue.Parent;
|
||||
existing.Type = issue.Type;
|
||||
existing.Status = issue.Status;
|
||||
existing.Assignee = issue.Assignee;
|
||||
existing.Manager = issue.Manager;
|
||||
existing.Due = issue.Due;
|
||||
existing.Updated = issue.Updated;
|
||||
existing.ObjectId = issue.ObjectId;
|
||||
existing.NeedNotify = issue.NeedNotify;
|
||||
|
||||
_db.JiraIssues.Update(existing);
|
||||
_log.Debug("Updating existing Jira issue: {key}", issue.Key);
|
||||
}
|
||||
else
|
||||
{
|
||||
_log.Debug("Skipping Jira issue (not newer): {key}", issue.Key);
|
||||
}
|
||||
}
|
||||
|
||||
await _db.SaveChangesAsync(ct);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_log.Error(ex, "Failed to upsert Jira issue: {key}", issue.Key);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
public async Task UpsertJiraIssuesBatchAsync(IEnumerable<JiraIssue> issues, CancellationToken ct = default)
|
||||
{
|
||||
try
|
||||
{
|
||||
var issueList = issues.ToList();
|
||||
if (!issueList.Any())
|
||||
return;
|
||||
|
||||
var keys = issueList.Select(i => i.Key).ToList();
|
||||
var existingIssues = await _db.JiraIssues
|
||||
.Where(ji => keys.Contains(ji.Key))
|
||||
.ToDictionaryAsync(ji => ji.Key, ct);
|
||||
|
||||
int insertCount = 0, updateCount = 0, skipCount = 0;
|
||||
|
||||
foreach (var issue in issueList)
|
||||
{
|
||||
if (!existingIssues.TryGetValue(issue.Key, out var existing))
|
||||
{
|
||||
_db.JiraIssues.Add(issue);
|
||||
insertCount++;
|
||||
}
|
||||
else if (issue.Updated > existing.Updated)
|
||||
{
|
||||
existing.Summary = issue.Summary;
|
||||
existing.Parent = issue.Parent;
|
||||
existing.Type = issue.Type;
|
||||
existing.Status = issue.Status;
|
||||
existing.Assignee = issue.Assignee;
|
||||
existing.Manager = issue.Manager;
|
||||
existing.Due = issue.Due;
|
||||
existing.Updated = issue.Updated;
|
||||
existing.ObjectId = issue.ObjectId;
|
||||
existing.NeedNotify = issue.NeedNotify;
|
||||
|
||||
_db.JiraIssues.Update(existing);
|
||||
updateCount++;
|
||||
}
|
||||
else
|
||||
{
|
||||
skipCount++;
|
||||
}
|
||||
}
|
||||
|
||||
await _db.SaveChangesAsync(ct);
|
||||
_log.Information("Batch completed: {insert} inserted, {update} updated, {skip} skipped out of {total} issues",
|
||||
insertCount, updateCount, skipCount, issueList.Count);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_log.Error(ex, "Failed to batch upsert Jira issues");
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
26
Services/Mappers/JiraIssueMapper.cs
Normal file
26
Services/Mappers/JiraIssueMapper.cs
Normal file
@@ -0,0 +1,26 @@
|
||||
using TriliumMind.Data.Entities;
|
||||
using TriliumMind.Models;
|
||||
|
||||
namespace TriliumMind.Services.Mappers;
|
||||
|
||||
public static class JiraIssueMapper
|
||||
{
|
||||
public static JiraIssue ToEntity(this Issue issue)
|
||||
{
|
||||
return new JiraIssue
|
||||
{
|
||||
Key = issue.key,
|
||||
Summary = issue.fields.summary ?? string.Empty,
|
||||
Parent = issue.fields.parent?.key ?? issue.fields.customfield_10808,
|
||||
Type = issue.fields.issuetype?.name ?? string.Empty,
|
||||
Status = issue.fields.status?.description ?? string.Empty,
|
||||
Assignee = issue.fields.assignee?.displayName ?? string.Empty,
|
||||
Manager = issue.fields.reporter?.displayName ?? string.Empty,
|
||||
Due = issue.fields.duedate?.ToUniversalTime() ?? DateTimeOffset.MinValue,
|
||||
Updated = issue.fields.UpdatedAt,
|
||||
Published = DateTimeOffset.MinValue,
|
||||
ObjectId = null,
|
||||
NeedNotify = 0
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,13 @@
|
||||
<UserSecretsId>dotnet-TriliumMind-bf69239d-751f-426d-afd4-3f66bcf0dc42</UserSecretsId>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<Compile Remove="publish\**" />
|
||||
<Content Remove="publish\**" />
|
||||
<EmbeddedResource Remove="publish\**" />
|
||||
<None Remove="publish\**" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="10.0.0" />
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite" Version="10.0.0" />
|
||||
|
||||
128
Workers/AppDbWorker.cs
Normal file
128
Workers/AppDbWorker.cs
Normal file
@@ -0,0 +1,128 @@
|
||||
using Serilog;
|
||||
using System.Threading.Channels;
|
||||
using TriliumMind.Models;
|
||||
using TriliumMind.Services;
|
||||
using TriliumMind.Services.Mappers;
|
||||
|
||||
namespace TriliumMind.Workers;
|
||||
|
||||
public class AppDbWorker : BackgroundService
|
||||
{
|
||||
private readonly Serilog.ILogger _log;
|
||||
private readonly AppConfigs _configs;
|
||||
private readonly IServiceScopeFactory _scopeFactory; // Singleton services cannot directly inject Scoped services.
|
||||
private readonly Channel<Issue> _issueChannel;
|
||||
|
||||
private readonly int _batchSize;
|
||||
private readonly TimeSpan _batchTimeout;
|
||||
|
||||
public AppDbWorker(AppConfigs configs, IServiceScopeFactory scopeFactory, Channel<Issue> issueChannel)
|
||||
{
|
||||
_log = Log.ForContext<AppDbWorker>();
|
||||
_configs = configs;
|
||||
_scopeFactory = scopeFactory;
|
||||
_issueChannel = issueChannel;
|
||||
|
||||
_batchSize = 50;
|
||||
_batchTimeout = TimeSpan.FromSeconds(5);
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
_log.Information("AppDbWorker started");
|
||||
|
||||
var batch = new List<Issue>();
|
||||
try
|
||||
{
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
|
||||
timeoutCts.CancelAfter(_batchTimeout);
|
||||
|
||||
while (batch.Count < _batchSize && !stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (_issueChannel.Reader.TryRead(out var issue))
|
||||
{
|
||||
batch.Add(issue);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Wait if channel is empty
|
||||
var readTask = _issueChannel.Reader.WaitToReadAsync(timeoutCts.Token);
|
||||
if (await readTask.ConfigureAwait(false))
|
||||
{
|
||||
if (_issueChannel.Reader.TryRead(out issue))
|
||||
{
|
||||
batch.Add(issue);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
break; // Channel is closed.
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested && !stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
break; // Timeout
|
||||
}
|
||||
}
|
||||
|
||||
if (batch.Count > 0)
|
||||
{
|
||||
using var scope = _scopeFactory.CreateScope();
|
||||
var db = scope.ServiceProvider.GetRequiredService<AppDbService>();
|
||||
var jiraIssues = batch.Select(i => i.ToEntity()).ToList();
|
||||
await db.UpsertJiraIssuesBatchAsync(jiraIssues, stoppingToken);
|
||||
|
||||
_log.Information("Processed batch of {count} Jira issues", batch.Count);
|
||||
batch.Clear();
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_log.Error(ex, "Error processing batch");
|
||||
batch.Clear();
|
||||
await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
|
||||
}
|
||||
}
|
||||
|
||||
// Process any remaining issues in the batch
|
||||
if (batch.Count > 0)
|
||||
{
|
||||
using var scope = _scopeFactory.CreateScope();
|
||||
var db = scope.ServiceProvider.GetRequiredService<AppDbService>();
|
||||
var jiraIssues = batch.Select(i => i.ToEntity()).ToList();
|
||||
await db.UpsertJiraIssuesBatchAsync(jiraIssues, CancellationToken.None);
|
||||
_log.Information("Processed final batch of {count} Jira issues", batch.Count);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_log.Error(ex, "Fatal error in JiraDbWorker");
|
||||
}
|
||||
|
||||
//await foreach (var issue in _issueChannel.Reader.ReadAllAsync(stoppingToken))
|
||||
//{
|
||||
// try
|
||||
// {
|
||||
// var jiraIssue = issue.ToEntity();
|
||||
// await _db.UpsertJiraIssueAsync(jiraIssue, stoppingToken);
|
||||
// _log.Debug("Processed Jira issue: {key}", issue.key);
|
||||
// }
|
||||
// catch (Exception ex)
|
||||
// {
|
||||
// _log.Error(ex, "Failed to process Jira issue: {key}", issue.key);
|
||||
// }
|
||||
//}
|
||||
_log.Information("AppDbWorker stopped");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user