Base solution for your next web application
Open Closed

How to Avoid Event Handlers Blocking Function Completion #12290


User avatar
0
[email protected] created

Dear Team,

We are currently experiencing an issue where the time taken to complete a request is being impacted by the completion of event handlers. Specifically, this occurs in the context of our GetAllShipments() function in the Shipments module.

Environment Details:

Product Version: v10.5.0 · Product Type: MVC · Framework: .NET Core

Scenario:
  • We have an entity called Shipment.
  • Event handlers are triggered on the creation and update events of this entity.
  • When a call is made to the GetAllShipments() function, it initiates the creation and update of another entity through these event handlers.
  • This process is causing a delay in the completion of the GetAllShipments() function, as the function appears to be waiting for the event handler execution to finish.

We want the GetAllShipments() function to complete independently and asynchronously, without being blocked by the operations performed by the event handlers. To provide additional context, I have attached the relevant code snippets of our AppService and Event Handler implementations.

Questions:
  1. How can we configure our system to ensure the GetAllShipments() function executes independently of the event handler's processing?
  2. Are there best practices or architectural improvements we can apply to decouple the event handler execution from the main function's request lifecycle?
  3. Could implementing a queue-based system or utilizing Hangfire for background processing resolve this issue effectively?

Your guidance on resolving this matter would be greatly appreciated.

Best regards,

// Application service for shipments. Access is gated by AbpAuthorize with the two shipment permissions listed below.
[AbpAuthorize(AppPermissions.Pages_Shipments, AppPermissions.Pages_Shipment_Operations)]
    public class ShipmentsAppService : EnterpriseBaseAppServiceBase, IShipmentsAppService
    {
        // Repository for Shipment entities.
        private readonly IRepository<Shipment> _shipmentRepository;
        // Repository for Machine entities (int primary key).
        private readonly IRepository<Machine, int> _lookup_machineRepository;
        

        // Stores the injected repositories; no other work is done here.
        public ShipmentsAppService(IRepository<Shipment> shipmentRepository, IRepository<Machine, int> lookup_machineRepository)
        {
            _shipmentRepository = shipmentRepository;
            _lookup_machineRepository = lookup_machineRepository;
            
        }

        // Returns a paged list of shipments for the given input.
        // NOTE(review): the body is elided in this snippet (only the numbered steps are shown and nothing is returned),
        // so the method does not compile as posted.
        // NOTE(review): steps 3 and 4 create and update a Shipment; per the description above, that is presumably what
        // raises the entity created/updated events during this call - confirm against the real implementation.
        public async Task<PagedResultDto<GetShipmentForViewDto>> GetAllShipments(GetAllShipmentsInput input)
        {
            // When the current unit of work has no tenant id the MayHaveTenant filter is disabled, otherwise it is
            // (re-)enabled; the filter change is scoped to this using block.
            using ((CurrentUnitOfWork.GetTenantId() == null) ? CurrentUnitOfWork.DisableFilter(AbpDataFilters.MayHaveTenant) : CurrentUnitOfWork.EnableFilter(AbpDataFilters.MayHaveTenant))
            {
				
				// 1) fetch all shipments.
				
                // 2) check some business logic and decide to create a new record or not.
				
				// 3) create a new shipment record.
				
				// 4) update the newly created shipment record checking some business logic
				
				// 5) return the newly created shipment record.

                
            }
        }
   }
using Abp.Dependency;
using Abp.Domain.Repositories;
using Abp.Domain.Uow;
using Abp.Events.Bus.Entities;
using Abp.Events.Bus.Handlers;
using EnterpriseBase.Devices.Dtos;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace EnterpriseBase.Devices.ShipmentReportEvent
{

    /// <summary>
    /// Keeps <c>ShipmentReport</c> rows in sync with <c>Shipment</c> entities by handling the
    /// entity-created and entity-updated events for <c>Shipment</c>.
    /// </summary>
    /// <remarks>
    /// Both <c>HandleEvent</c> overloads do their work inline before returning. Failures inside the
    /// create/update methods are logged and swallowed, so a reporting problem never propagates to the
    /// code that published the event.
    /// </remarks>
    class ShipmentReportEventHandler : EnterpriseBaseServiceBase,
        IEventHandler<EntityCreatedEventData<Shipment>>,
        IEventHandler<EntityUpdatedEventData<Shipment>>,
        ITransientDependency

    {
        /// <summary>Separators used by the stored duration text ("D Days , HH : MM").</summary>
        private static readonly char[] DurationSeparators = { ' ', ',', ':' };

        private readonly IRepository<ShipmentReport> _shipmentReportRepository;
        private readonly IUnitOfWorkManager _unitOfWorkManager;
        private readonly IRepository<Machine, int> _lookup_machineRepository;
        private readonly IRepository<Shipment> _shipmentRepository;

        /// <summary>Stores the injected dependencies. The constructor signature is unchanged for DI.</summary>
        public ShipmentReportEventHandler(
          IRepository<ShipmentReport> shipmentReportRepository,
          IUnitOfWorkManager unitOfWorkManager,
          IRepository<Machine, int> lookup_machineRepository,
          IRepository<Shipment> shipmentRepository)
        {
            _shipmentReportRepository = shipmentReportRepository;
            _unitOfWorkManager = unitOfWorkManager;
            _lookup_machineRepository = lookup_machineRepository;
            _shipmentRepository = shipmentRepository;
        }

        /// <summary>Creates the report row for a newly created shipment.</summary>
        /// <param name="eventData">Event payload carrying the created shipment.</param>
        public void HandleEvent(EntityCreatedEventData<Shipment> eventData)
        {
            Logger.Info("HandleEvent ,EntityCreatedEventData()");
            CreateShipmentReportRecordFromShipment(eventData.Entity);
        }

        /// <summary>Updates (or, if missing, creates) the report row for an updated shipment.</summary>
        /// <param name="eventData">Event payload carrying the updated shipment.</param>
        public void HandleEvent(EntityUpdatedEventData<Shipment> eventData)
        {
            Logger.Info("HandleEvent ,EntityUpdatedEventData()");
            UpdateShipmentReportRecordFromShipment(eventData.Entity);
        }

        /// <summary>Returns the first report whose <c>ShipmentId</c> matches, or null when none exists.</summary>
        private ShipmentReport GetCurrentShipmentReport(int shipmentId)
        {
            return _shipmentReportRepository.GetAll()
                      .FirstOrDefault(e => e.ShipmentId == shipmentId);
        }

        /// <summary>Last modification time of the shipment, falling back to its creation time.</summary>
        private static DateTime EffectiveTimestamp(Shipment shipment)
        {
            return shipment.LastModificationTime == null
                ? (DateTime)shipment.CreationTime
                : (DateTime)shipment.LastModificationTime;
        }

        /// <summary>Last modification time of the report, falling back to its creation time.</summary>
        private static DateTime EffectiveTimestamp(ShipmentReport report)
        {
            return report.LastModificationTime == null
                ? (DateTime)report.CreationTime
                : (DateTime)report.LastModificationTime;
        }

        /// <summary>
        /// Logs <paramref name="prefix"/> followed by the JSON form of <paramref name="payload"/> at Info level.
        /// Serialization is skipped entirely when Info logging is disabled, so no time is spent on it.
        /// </summary>
        private void LogPayload(string prefix, object payload)
        {
            if (Logger.IsInfoEnabled)
            {
                Logger.Info(prefix + Newtonsoft.Json.JsonConvert.SerializeObject(payload));
            }
        }

        /// <summary>
        /// Adds the time elapsed between the report's and the shipment's effective timestamps to an
        /// already accumulated duration and returns it formatted as "D Days , HH : MM".
        /// </summary>
        /// <param name="prevShipment">Report holding the previous state.</param>
        /// <param name="currentShipment">Shipment holding the new state.</param>
        /// <param name="existingDuration">Previously stored duration text; empty or null counts as zero.</param>
        /// <returns>The formatted total, or an empty string when the calculation fails.</returns>
        private string GetDuration(ShipmentReport prevShipment, Shipment currentShipment, string existingDuration)
        {
            try
            {
                Logger.Info("GetDuration()" + "ShipmentReport Status=" + prevShipment.Status + ",currentShipment Status=" + currentShipment.Status + ", Exsisting Duration=" + existingDuration);
                int previousMinutes = ConvertDurationToMinutes(existingDuration);

                DateTime previousTime = EffectiveTimestamp(prevShipment);
                DateTime currentTime = EffectiveTimestamp(currentShipment);
                decimal elapsed = decimal.Round((decimal)(currentTime - previousTime).TotalMinutes, 2, MidpointRounding.AwayFromZero);

                // Whole minutes only; the fractional part is dropped exactly as before.
                int totalMinutes = (int)(elapsed + previousMinutes);
                string formatted = (totalMinutes / 1440).ToString() + " Days , " + (totalMinutes / 60 % 24).ToString("00") + " : " + (totalMinutes % 60).ToString("00");

                // Format once and reuse the value for both the log line and the result.
                Logger.Info("GetDuration() output=" + formatted);
                return formatted;
            }
            catch (Exception ex)
            {
                Logger.Error("Error GetDuration() " + ex.Message, ex);
                return "";
            }
        }

        /// <summary>
        /// Parses a duration in the "D Days , HH : MM" form back into total minutes.
        /// </summary>
        /// <param name="duration">Stored duration text.</param>
        /// <returns>Total minutes; 0 for empty input or text that does not match the expected form.</returns>
        private int ConvertDurationToMinutes(string duration)
        {
            Logger.Info("ConvertDurationToMinutes(duration) duration=" + duration);
            if (string.IsNullOrWhiteSpace(duration))
            {
                return 0;
            }

            // Expected tokens after splitting: [days, "Days", hours, minutes].
            string[] parts = duration.Split(DurationSeparators, StringSplitOptions.RemoveEmptyEntries);
            if (parts.Length >= 4
                && int.TryParse(parts[0], out var days)
                && int.TryParse(parts[2], out var hours)
                && int.TryParse(parts[3], out var minutes))
            {
                return days * 24 * 60 + hours * 60 + minutes;
            }

            Logger.Error("Error ConvertDurationToMinutes() Invalid duration format. duration=" + duration);
            return 0;
        }

        /// <summary>Builds a new, unsaved report from the current state of a shipment.</summary>
        private static ShipmentReport BuildReport(Shipment shipment)
        {
            return new ShipmentReport
            {
                ShipmentId = shipment.Id,
                TenantId = shipment.TenantId,
                AWB = shipment.AWB,
                Status = shipment.Status,
                LastStatusChange = shipment.CreationTime,
                MachineId = shipment.MachineId,
                MobileNo = shipment.MobileNo,
                DroppedTime = shipment.Status == ShipmentStatus.Dropped ? shipment.LastModificationTime : null,
                DroppedDuration = "",
                ExpiryTime = shipment.Status == ShipmentStatus.Expired ? shipment.LastModificationTime : null,
                ExpiryDuration = "",
                PickedUpTime = shipment.Status == ShipmentStatus.PickedUp ? shipment.LastModificationTime : null,
                PickedUpDuration = "",
                ForceExpiredTime = shipment.Status == ShipmentStatus.ForceExpired ? shipment.LastModificationTime : null,
                ForceExpiredDuration = ""
            };
        }

        /// <summary>
        /// Inserts a report for the shipment unless one already exists.
        /// Any exception is logged and swallowed.
        /// </summary>
        /// <param name="shipment">Shipment to mirror into the report table.</param>
        public void CreateShipmentReportRecordFromShipment(Shipment shipment)
        {
            try
            {
                // Payload logging sits inside the try so a serialization failure cannot escape the handler.
                LogPayload("CreateShipmentReportRecordFromShipment payload", shipment);
                using (var uow = _unitOfWorkManager.Begin())
                {
                    ShipmentReport existingReport = GetCurrentShipmentReport(shipment.Id);
                    LogPayload("CurrentShipmentReport = ", existingReport);
                    if (existingReport == null)
                    {
                        // Synchronous insert: this method is not async, so an un-awaited InsertAsync
                        // would leave a Task nobody observes.
                        _shipmentReportRepository.Insert(BuildReport(shipment));
                        _unitOfWorkManager.Current.SaveChanges();
                    }
                    uow.Complete();
                }
            }
            catch (Exception ex)
            {
                // Deliberately best-effort: reporting must not fail the operation that raised the event.
                Logger.Error("Error CreateShipmentReportRecordFromShipment() " + ex.Message, ex);
            }
        }

        /// <summary>Stamps the report's time column that corresponds to the shipment's new status.</summary>
        private static void StampStatusTime(ShipmentReport report, Shipment shipment)
        {
            switch (shipment.Status)
            {
                case ShipmentStatus.Dropped:
                    report.DroppedTime = shipment.LastModificationTime;
                    break;
                case ShipmentStatus.Expired:
                    report.ExpiryTime = shipment.LastModificationTime;
                    break;
                case ShipmentStatus.PickedUp:
                case ShipmentStatus.IntransitBack: // Treat IntransitBack like PickedUp
                    report.PickedUpTime = shipment.LastModificationTime;
                    break;
                case ShipmentStatus.ForceExpired:
                    report.ForceExpiredTime = shipment.LastModificationTime;
                    break;
            }
        }

        /// <summary>
        /// Extends the duration column of the status the report is currently in (i.e. the status
        /// before this update is applied). Must run before <c>report.Status</c> is overwritten.
        /// </summary>
        private void AccumulatePreviousStatusDuration(ShipmentReport report, Shipment shipment)
        {
            switch (report.Status)
            {
                case ShipmentStatus.Dropped:
                    report.DroppedDuration = GetDuration(report, shipment, report.DroppedDuration);
                    break;
                case ShipmentStatus.Expired:
                    report.ExpiryDuration = GetDuration(report, shipment, report.ExpiryDuration);
                    break;
                case ShipmentStatus.PickedUp:
                case ShipmentStatus.IntransitBack: // Treat IntransitBack like PickedUp
                    report.PickedUpDuration = GetDuration(report, shipment, report.PickedUpDuration);
                    break;
                case ShipmentStatus.ForceExpired:
                    report.ForceExpiredDuration = GetDuration(report, shipment, report.ForceExpiredDuration);
                    break;
            }
        }

        /// <summary>
        /// Applies the shipment's current state to its report, creating the report when it does not
        /// exist yet. Any exception is logged and swallowed.
        /// </summary>
        /// <param name="shipment">Shipment whose changes should be mirrored.</param>
        public void UpdateShipmentReportRecordFromShipment(Shipment shipment)
        {
            try
            {
                // Payload logging sits inside the try so a serialization failure cannot escape the handler.
                LogPayload("UpdateShipmentReportRecordFromShipment payload", shipment);
                using (var uow = _unitOfWorkManager.Begin())
                {
                    ShipmentReport report = GetCurrentShipmentReport(shipment.Id);
                    LogPayload("CurrentShipmentReport = ", report);
                    if (report != null)
                    {
                        report.AWB = shipment.AWB;
                        report.TenantId = shipment.TenantId;
                        report.MachineId = shipment.MachineId;
                        report.PickedUpUntil = shipment.PickupUntil;

                        StampStatusTime(report, shipment);
                        AccumulatePreviousStatusDuration(report, shipment);

                        if (report.Status != shipment.Status)
                        {
                            // Falls back to the creation time so a missing modification time cannot
                            // throw and discard the whole update.
                            report.LastStatusChange = EffectiveTimestamp(shipment);
                        }
                        report.Status = shipment.Status;
                    }
                    else
                    {
                        LogPayload("UpdateShipmentReportRecordFromShipment() Call CreateShipmentReportRecordFromShipment = ", shipment);
                        CreateShipmentReportRecordFromShipment(shipment);
                    }
                    _unitOfWorkManager.Current.SaveChanges();
                    uow.Complete();
                }
            }
            catch (Exception ex)
            {
                // Deliberately best-effort: reporting must not fail the operation that raised the event.
                Logger.Error("Error UpdateShipmentReportRecordFromShipment() " + ex.Message, ex);
            }
        }
    }


}
![Logs for eventhandler blocking the request.jpeg](/QA/files/cffb2f390b275dca6f463a16e1db6662.jpeg)

1 Answer(s)
  • User Avatar
    0
    oguzhanagir created
    Support Team

    Hi @mnihal

    • Use Background Jobs (Hangfire or ABP Background Jobs): The simplest and most effective solution is to offload the event handler logic to a background job, so that it executes asynchronously without blocking the request that raised the event.

    Related Document Related Document

    Example Code:

    using Abp.BackgroundJobs;
    using Abp.Dependency;
    using Abp.Events.Bus.Handlers;
    
    /// <summary>
    /// Event handler that does no report work itself: for each created or updated shipment it
    /// enqueues a background job carrying the shipment's id.
    /// </summary>
    public class ShipmentReportEventHandler : EnterpriseBaseServiceBase,
        IEventHandler<EntityCreatedEventData<Shipment>>,
        IEventHandler<EntityUpdatedEventData<Shipment>>,
        ITransientDependency
    {
        private readonly IBackgroundJobManager _jobs;

        public ShipmentReportEventHandler(IBackgroundJobManager backgroundJobManager)
        {
            _jobs = backgroundJobManager;
        }

        /// <summary>Queues <c>CreateShipmentReportJob</c> for the created shipment.</summary>
        public void HandleEvent(EntityCreatedEventData<Shipment> eventData)
        {
            var jobArgs = ArgsFor(eventData.Entity);
            _jobs.Enqueue<CreateShipmentReportJob, ShipmentReportJobArgs>(jobArgs);
        }

        /// <summary>Queues <c>UpdateShipmentReportJob</c> for the updated shipment.</summary>
        public void HandleEvent(EntityUpdatedEventData<Shipment> eventData)
        {
            var jobArgs = ArgsFor(eventData.Entity);
            _jobs.Enqueue<UpdateShipmentReportJob, ShipmentReportJobArgs>(jobArgs);
        }

        /// <summary>Builds the job arguments, which contain only the shipment id.</summary>
        private static ShipmentReportJobArgs ArgsFor(Shipment shipment)
        {
            var jobArgs = new ShipmentReportJobArgs();
            jobArgs.ShipmentId = shipment.Id;
            return jobArgs;
        }
    }
    
    using Abp.BackgroundJobs;
    using Abp.Dependency;
    
    /// <summary>
    /// Background job that creates the <c>ShipmentReport</c> row for a shipment.
    /// </summary>
    public class CreateShipmentReportJob : BackgroundJob<ShipmentReportJobArgs>, ITransientDependency
    {
        private readonly IRepository<ShipmentReport> _shipmentReportRepository;

        public CreateShipmentReportJob(IRepository<ShipmentReport> shipmentReportRepository)
        {
            _shipmentReportRepository = shipmentReportRepository;
        }

        /// <summary>
        /// Inserts a report for <c>args.ShipmentId</c> unless one already exists.
        /// </summary>
        /// <param name="args">Job arguments identifying the shipment.</param>
        public override void Execute(ShipmentReportJobArgs args)
        {
            // Keep the job idempotent: it may run more than once, and a report for this shipment
            // may already have been created by another path. Inserting blindly would duplicate it.
            var existingReport = _shipmentReportRepository.FirstOrDefault(r => r.ShipmentId == args.ShipmentId);
            if (existingReport != null)
            {
                return;
            }

            var shipmentReport = new ShipmentReport
            {
                ShipmentId = args.ShipmentId,
                // Map other properties as needed
            };
            _shipmentReportRepository.Insert(shipmentReport);
        }
    }
    
    /// <summary>
    /// Background job that updates the existing <c>ShipmentReport</c> row of a shipment.
    /// Does nothing when no report exists for the given shipment id.
    /// </summary>
    public class UpdateShipmentReportJob : BackgroundJob<ShipmentReportJobArgs>, ITransientDependency
    {
        private readonly IRepository<ShipmentReport> _shipmentReportRepository;

        public UpdateShipmentReportJob(IRepository<ShipmentReport> shipmentReportRepository)
        {
            _shipmentReportRepository = shipmentReportRepository;
        }

        /// <summary>Looks up the report by shipment id and saves it back through the repository.</summary>
        /// <param name="args">Job arguments identifying the shipment.</param>
        public override void Execute(ShipmentReportJobArgs args)
        {
            var existingReport = _shipmentReportRepository.FirstOrDefault(report => report.ShipmentId == args.ShipmentId);
            if (existingReport == null)
            {
                return;
            }

            // Update report logic here
            _shipmentReportRepository.Update(existingReport);
        }
    }
    
    // Arguments passed to CreateShipmentReportJob and UpdateShipmentReportJob.
    // Only the shipment id is carried; the jobs look up whatever else they need.
    public class ShipmentReportJobArgs
    {
        // Id of the shipment the job should process.
        public int ShipmentId { get; set; }
    }
    
    • You can configure your domain event handlers to execute asynchronously. Update your event handler methods to return Task and make them async:

    Example Code:

    // Async handler for the shipment-created event.
    // Runs the synchronous CreateShipmentReportRecordFromShipment on a thread-pool thread via Task.Run and
    // awaits it, so the returned Task completes only after the report work has finished.
    // NOTE(review): if the event publisher awaits this Task inside the request, the request still waits for
    // the handler; this changes which thread does the work, not when the caller is released - confirm
    // against the event bus behaviour before relying on it to unblock GetAllShipments().
    // NOTE(review): presumably the class must implement an async handler interface for this method to be
    // invoked - verify against the framework version in use.
    public async Task HandleEventAsync(EntityCreatedEventData<Shipment> eventData)
    {
        await Task.Run(() => CreateShipmentReportRecordFromShipment(eventData.Entity));
    }
    
    // Async handler for the shipment-updated event.
    // Runs the synchronous UpdateShipmentReportRecordFromShipment on a thread-pool thread via Task.Run and
    // awaits it, so the returned Task completes only after the report work has finished.
    // NOTE(review): if the event publisher awaits this Task inside the request, the request still waits for
    // the handler; this changes which thread does the work, not when the caller is released - confirm
    // against the event bus behaviour before relying on it to unblock GetAllShipments().
    // NOTE(review): presumably the class must implement an async handler interface for this method to be
    // invoked - verify against the framework version in use.
    public async Task HandleEventAsync(EntityUpdatedEventData<Shipment> eventData)
    {
        await Task.Run(() => UpdateShipmentReportRecordFromShipment(eventData.Entity));
    }
    
    • If you want more scalability and control over background processing, implement a queue-based system such as RabbitMQ, Azure Service Bus, or Kafka.