Skip to content

Commit

Permalink
Added support for graceful shutdown in struct based actors
Browse files Browse the repository at this point in the history
  • Loading branch information
andresgutierrez committed Jul 26, 2024
1 parent 972c0a0 commit c2c0fb4
Show file tree
Hide file tree
Showing 9 changed files with 310 additions and 61 deletions.
25 changes: 24 additions & 1 deletion Nixie.Tests/TestShutdown.cs
Original file line number Diff line number Diff line change
Expand Up @@ -369,9 +369,32 @@ public async Task TestSpawnFireAndForgetSlowActorAndGracefulShutdownShortDelayBy

await asx.Wait();

Assert.Equal(2, ((ShutdownSlowActor)actor.Runner.Actor!).GetMessages());
Assert.Equal(1, ((ShutdownSlowActor)actor.Runner.Actor!).GetMessages());

IActorRef<ShutdownSlowActor, string>? actor2 = asx.Get<ShutdownSlowActor, string>("my-actor");
Assert.Null(actor2);
}

[Fact]
public async Task TestSpawnFireAndForgetActorStructAndGracefulShutdownByName()
{
using ActorSystem asx = new();

IActorRefStruct<ShutdownActorStruct, int> actor = asx.SpawnStruct<ShutdownActorStruct, int>("my-actor");

Assert.IsAssignableFrom<ShutdownActorStruct>(actor.Runner.Actor);

actor.Send(100);

await asx.Wait();

Assert.True(await asx.GracefulShutdownStruct<ShutdownActorStruct, int>("my-actor", TimeSpan.FromMinutes(1)));

await asx.Wait();

Assert.Equal(1, ((ShutdownActorStruct)actor.Runner.Actor!).GetMessages());

IActorRefStruct<ShutdownActorStruct, int>? actor2 = asx.GetStruct<ShutdownActorStruct, int>("my-actor");
Assert.Null(actor2);
}
}
21 changes: 21 additions & 0 deletions Nixie/ActorRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -238,4 +238,25 @@ public async Task<bool> GracefulShutdown(string name, TimeSpan maxWait)

return true;
}

/// <summary>
/// Tries to shutdown an actor by its name and returns a task whose result confirms shutdown within the specified timespan
/// </summary>
/// <param name="actorRef"></param>
/// <param name="maxWait"></param>
/// <returns></returns>
public async Task<bool> GracefulShutdown(IActorRef<TActor, TRequest> actorRef, TimeSpan maxWait)
{
string name = actorRef.Runner.Name;

if (actors.TryGetValue(name, out Lazy<(ActorRunner<TActor, TRequest> runner, ActorRef<TActor, TRequest> actorRef)>? actor))
{
bool success = await actor.Value.runner.GracefulShutdown(maxWait);
actors.TryRemove(name, out _);
actorSystem.StopAllTimers(actor.Value.actorRef);
return success;
}

return true;
}
}
48 changes: 25 additions & 23 deletions Nixie/ActorRepositoryReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,49 +189,52 @@ public bool Shutdown(string name)
if (actor.Value.runner.Shutdown())
{
actors.TryRemove(name, out _);
actorSystem.StopAllTimers(actor.Value.actorRef);
return true;
}
}

return true;
}
}

/// <summary>
/// Tries to shutdown an actor by its name and returns a task whose result confirms shutdown within the specified timespan
/// Shutdowns an actor by its reference
/// </summary>
/// <param name="name"></param>
/// <param name="maxWait"></param>
/// <param name="actorRef"></param>
/// <returns></returns>
public async Task<bool> GracefulShutdown(string name, TimeSpan maxWait)
public bool Shutdown(IActorRef<TActor, TRequest, TResponse> actorRef)
{
name = name.ToLowerInvariant();
string name = actorRef.Runner.Name;

if (actors.TryGetValue(name, out Lazy<(ActorRunner<TActor, TRequest, TResponse> runner, ActorRef<TActor, TRequest, TResponse> actorRef)>? actor))
{
bool success = await actor.Value.runner.GracefulShutdown(maxWait);
actors.TryRemove(name, out _);
return success;
if (actor.Value.runner.Shutdown())
{
actors.TryRemove(name, out _);
actorSystem.StopAllTimers(actor.Value.actorRef);
return true;
}
}

return true;
}

/// <summary>
/// Shutdowns an actor by its reference
/// Tries to shutdown an actor by its name and returns a task whose result confirms shutdown within the specified timespan
/// </summary>
/// <param name="actorRef"></param>
/// <param name="name"></param>
/// <param name="maxWait"></param>
/// <returns></returns>
public bool Shutdown(IActorRef<TActor, TRequest, TResponse> actorRef)
public async Task<bool> GracefulShutdown(string name, TimeSpan maxWait)
{
string name = actorRef.Runner.Name;
name = name.ToLowerInvariant();

if (actors.TryGetValue(name, out Lazy<(ActorRunner<TActor, TRequest, TResponse> runner, ActorRef<TActor, TRequest, TResponse> actorRef)>? actor))
{
if (actor.Value.runner.Shutdown())
{
actors.TryRemove(name, out _);
return true;
}
bool success = await actor.Value.runner.GracefulShutdown(maxWait);
actors.TryRemove(name, out _);
actorSystem.StopAllTimers(actor.Value.actorRef);
return success;
}

return true;
Expand All @@ -249,11 +252,10 @@ public async Task<bool> GracefulShutdown(IActorRef<TActor, TRequest, TResponse>

if (actors.TryGetValue(name, out Lazy<(ActorRunner<TActor, TRequest, TResponse> runner, ActorRef<TActor, TRequest, TResponse> actorRef)>? actor))
{
if (await actor.Value.runner.GracefulShutdown(maxWait))
{
actors.TryRemove(name, out _);
return true;
}
bool success = await actor.Value.runner.GracefulShutdown(maxWait);
actors.TryRemove(name, out _);
actorSystem.StopAllTimers(actor.Value.actorRef);
return success;
}

return true;
Expand Down
21 changes: 21 additions & 0 deletions Nixie/ActorRepositoryStruct.cs
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,25 @@ public bool Shutdown(IActorRefStruct<TActor, TRequest> actorRef)

return true;
}

/// <summary>
/// Tries to shutdown an actor by its name and returns a task whose result confirms shutdown within the specified timespan
/// </summary>
/// <param name="name"></param>
/// <param name="maxWait"></param>
/// <returns></returns>
public async Task<bool> GracefulShutdown(string name, TimeSpan maxWait)
{
name = name.ToLowerInvariant();

if (actors.TryGetValue(name, out Lazy<(ActorRunnerStruct<TActor, TRequest> runner, ActorRefStruct<TActor, TRequest> actorRef)>? actor))
{
bool success = await actor.Value.runner.GracefulShutdown(maxWait);
actorSystem.StopAllTimers(actor.Value.actorRef);
actors.TryRemove(name, out _);
return success;
}

return true;
}
}
21 changes: 21 additions & 0 deletions Nixie/ActorRepositoryStructReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,4 +216,25 @@ public bool Shutdown(IActorRefStruct<TActor, TRequest, TResponse> actorRef)

return true;
}

/// <summary>
/// Tries to shutdown an actor by its name and returns a task whose result confirms shutdown within the specified timespan
/// </summary>
/// <param name="actorRef"></param>
/// <param name="maxWait"></param>
/// <returns></returns>
public async Task<bool> GracefulShutdown(IActorRefStruct<TActor, TRequest, TResponse> actorRef, TimeSpan maxWait)
{
string name = actorRef.Runner.Name;

if (actors.TryGetValue(name, out Lazy<(ActorRunnerStruct<TActor, TRequest, TResponse> runner, ActorRefStruct<TActor, TRequest, TResponse> actorRef)>? actor))
{
bool success = await actor.Value.runner.GracefulShutdown(maxWait);
actors.TryRemove(name, out _);
actorSystem.StopAllTimers(actor.Value.actorRef);
return success;
}

return true;
}
}
35 changes: 35 additions & 0 deletions Nixie/ActorRunnerStruct.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public sealed class ActorRunnerStruct<TActor, TRequest> where TActor : IActorStr

private readonly ConcurrentQueue<ActorMessage<TRequest>> inbox = new();

private TaskCompletionSource? gracefulShutdown;

private int processing = 1;

private int shutdown = 1;
Expand Down Expand Up @@ -95,6 +97,34 @@ public bool Shutdown()
return 1 == Interlocked.Exchange(ref shutdown, 0);
}

/// <summary>
/// Tries to shutdown the actor returns a task whose result confirms shutdown within the specified timespan
/// </summary>
/// <param name="maxWait"></param>
/// <returns></returns>
public async ValueTask<bool> GracefulShutdown(TimeSpan maxWait)
{
if (inbox.IsEmpty)
return Shutdown();

if (gracefulShutdown is not null)
return false;

gracefulShutdown = new(TaskCreationOptions.RunContinuationsAsynchronously);

Task timeout = Task.Delay(maxWait);

Task completed = await Task.WhenAny(
timeout,
gracefulShutdown.Task
);

if (completed == timeout)
Shutdown();

return completed != timeout;
}

/// <summary>
/// Enqueues a message to the actor and tries to deliver it.
/// The request/response type actors use an object to assign the response once completed.
Expand All @@ -105,7 +135,10 @@ private async Task DeliverMessages()
try
{
if (Actor is null || ActorContext is null || shutdown == 0)
{
gracefulShutdown?.SetResult();
return;
}

ActorContext.Runner = this;

Expand Down Expand Up @@ -134,6 +167,8 @@ private async Task DeliverMessages()
}
}
} while (shutdown == 1 && Interlocked.CompareExchange(ref processing, 1, 0) != 0);

gracefulShutdown?.SetResult();
}
catch (Exception ex)
{
Expand Down
35 changes: 35 additions & 0 deletions Nixie/ActorRunnerStructReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public sealed class ActorRunnerStruct<TActor, TRequest, TResponse> where TActor

private readonly ConcurrentQueue<ActorMessageReply<TRequest, TResponse>> inbox = new();

private TaskCompletionSource? gracefulShutdown;

private int processing = 1;

private int shutdown = 1;
Expand Down Expand Up @@ -113,6 +115,34 @@ public bool Shutdown()
return 1 == Interlocked.Exchange(ref shutdown, 0);
}

/// <summary>
/// Tries to shutdown the actor returns a task whose result confirms shutdown within the specified timespan
/// </summary>
/// <param name="maxWait"></param>
/// <returns></returns>
public async ValueTask<bool> GracefulShutdown(TimeSpan maxWait)
{
if (inbox.IsEmpty)
return Shutdown();

if (gracefulShutdown is not null)
return false;

gracefulShutdown = new(TaskCreationOptions.RunContinuationsAsynchronously);

Task timeout = Task.Delay(maxWait);

Task completed = await Task.WhenAny(
timeout,
gracefulShutdown.Task
);

if (completed == timeout)
Shutdown();

return completed != timeout;
}

/// <summary>
/// It retrieves a message from the inbox and invokes the actor by passing one message
/// at a time until the pending message list is cleared.
Expand All @@ -123,7 +153,10 @@ private async Task DeliverMessages()
try
{
if (Actor is null || ActorContext is null || shutdown == 0)
{
gracefulShutdown?.SetResult();
return;
}

ActorContext.Runner = this;

Expand Down Expand Up @@ -157,6 +190,8 @@ private async Task DeliverMessages()
}
}
} while (shutdown == 1 && (Interlocked.CompareExchange(ref processing, 1, 0) != 0));

gracefulShutdown?.SetResult();
}
catch (Exception ex)
{
Expand Down
13 changes: 13 additions & 0 deletions Nixie/ActorScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,19 @@ public void StopAllTimers<TActor, TRequest>(IActorRefStruct<TActor, TRequest> ac
StopAllTimersInternal(actorRef);
}

/// <summary>
/// Stops all timers running in an actor
/// </summary>
/// <typeparam name="TActor"></typeparam>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
/// <param name="actorRef"></param>
public void StopAllTimers<TActor, TRequest, TResponse>(IActorRefStruct<TActor, TRequest, TResponse> actorRef)
where TActor : IActorStruct<TRequest, TResponse> where TRequest : struct where TResponse : struct
{
StopAllTimersInternal(actorRef);
}

private Timer AddPeriodicTimerInternal<TActor, TRequest>(IActorRef<TActor, TRequest> actorRef, TRequest request, TimeSpan initialDelay, TimeSpan interval)
where TActor : IActor<TRequest> where TRequest : class
{
Expand Down
Loading

0 comments on commit c2c0fb4

Please sign in to comment.