Skip to content

Commit

Permalink
chore(event): save unfinished work.
Browse files Browse the repository at this point in the history
  • Loading branch information
DingpingZhang committed Oct 4, 2022
1 parent 1f0081a commit ed82ac8
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 0 deletions.
27 changes: 27 additions & 0 deletions HandyIpc/Core/ConnectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -32,5 +33,31 @@ public static async Task<byte[]> InvokeAsync(this IConnection connection, byte[]
throw;
}
}

public static void Subscribe(this IConnection connection, Guid id, Func<byte[], byte[]> callback, CancellationToken token)
{
// TODO: Subscribe by id.
connection.Write(id.ToByteArray());
byte[] subscribeResult = connection.Read();
// Check result and retry.

Task.Run(() => connection.Loop(id, callback, token), token);
}

private static async Task Loop(this IConnection connection, Guid id, Func<byte[], byte[]> callback, CancellationToken token)
{
while (!token.IsCancellationRequested)
{
// Will blocked until accepted a notification.
byte[] input = await connection.ReadAsync(token);
byte[] output = callback(input);
await connection.WriteAsync(output, token);
}

// TODO: Unsubscribe by id.
await connection.WriteAsync(id.ToByteArray(), token);
byte[] unsubscribeResult = await connection.ReadAsync(token);
// Check result and retry.
}
}
}
23 changes: 23 additions & 0 deletions HandyIpc/Core/Subscription.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace HandyIpc.Core
{
public class Subscription
{
private const string AddHeader = "handyipc/cb/add";
private const string RemoveHeader = "handyipc/cb/remove";

public string Name { get; set; }

public string CallbackName { get; set; }

public string CallbackId { get; set; }

internal static bool TryParse(byte[] input, ISerializer serializer, out Subscription subscription)
{
throw new NotImplementedException();
}
}
}

0 comments on commit ed82ac8

Please sign in to comment.