outbox checkpoint
This commit is contained in:
@@ -0,0 +1,78 @@
|
||||
using System.Reflection;
|
||||
using System.Text.Json;
|
||||
using Mccn.Common.Application.EventBus;
|
||||
using Mccn.Common.Domain.Abstractions;
|
||||
using Mccn.Common.Infrastructure.Outbox;
|
||||
using Mccn.Modules.Users.Domain.Users;
|
||||
using Mccn.Modules.Users.Infrastructure.Database;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Quartz;
|
||||
|
||||
namespace Mccn.Modules.Users.Infrastructure.Outbox;
|
||||
|
||||
[DisallowConcurrentExecution]
|
||||
internal sealed class ProcessOutboxJob(
|
||||
UsersDbContext dbContext,
|
||||
IServiceScopeFactory serviceScopeFactory,
|
||||
ILogger<ProcessOutboxJob> logger) : IJob
|
||||
{
|
||||
private static readonly Assembly DomainAssembly = typeof(User).Assembly;
|
||||
|
||||
public async Task Execute(IJobExecutionContext context)
|
||||
{
|
||||
List<OutboxMessage> messages = await dbContext.OutboxMessages
|
||||
.Where(m => m.ProcessedOnUtc == null)
|
||||
.OrderBy(m => m.OccurredOnUtc)
|
||||
.Take(20)
|
||||
.ToListAsync(context.CancellationToken);
|
||||
|
||||
foreach (OutboxMessage message in messages)
|
||||
{
|
||||
try
|
||||
{
|
||||
Type? eventType = DomainAssembly.GetTypes()
|
||||
.FirstOrDefault(t => t.Name == message.Type);
|
||||
|
||||
if (eventType is null)
|
||||
{
|
||||
logger.LogWarning("Could not find domain event type {Type}", message.Type);
|
||||
message.ProcessedOnUtc = DateTime.UtcNow;
|
||||
message.Error = $"Unknown domain event type: {message.Type}";
|
||||
continue;
|
||||
}
|
||||
|
||||
IDomainEvent? domainEvent = (IDomainEvent?)JsonSerializer.Deserialize(message.Content, eventType);
|
||||
|
||||
if (domainEvent is null)
|
||||
{
|
||||
message.ProcessedOnUtc = DateTime.UtcNow;
|
||||
message.Error = "Failed to deserialize domain event.";
|
||||
continue;
|
||||
}
|
||||
|
||||
using IServiceScope scope = serviceScopeFactory.CreateScope();
|
||||
|
||||
Type handlerInterfaceType = typeof(IDomainEventHandler<>).MakeGenericType(eventType);
|
||||
|
||||
IEnumerable<IDomainEventHandler> handlers = scope.ServiceProvider
|
||||
.GetServices(handlerInterfaceType)
|
||||
.Cast<IDomainEventHandler>();
|
||||
|
||||
foreach (IDomainEventHandler handler in handlers)
|
||||
await handler.Handle(domainEvent, context.CancellationToken);
|
||||
|
||||
message.ProcessedOnUtc = DateTime.UtcNow;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogError(ex, "Error processing outbox message {MessageId}", message.Id);
|
||||
message.Error = ex.Message;
|
||||
message.ProcessedOnUtc = DateTime.UtcNow;
|
||||
}
|
||||
}
|
||||
|
||||
await dbContext.SaveChangesAsync(context.CancellationToken);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user