diff --git a/src/Asv.IO/Protocol/Connection/IProtocolConnection.cs b/src/Asv.IO/Protocol/Connection/IProtocolConnection.cs index d12934c..5a015eb 100644 --- a/src/Asv.IO/Protocol/Connection/IProtocolConnection.cs +++ b/src/Asv.IO/Protocol/Connection/IProtocolConnection.cs @@ -20,122 +20,7 @@ public interface IProtocolConnection:ISupportTag,ISupportStatistic, IDisposable, public static class ProtocolConnectionHelper { - public static Observable RxFilter(this IProtocolConnection connection) - where TMessage : IProtocolMessage, new() - { - var messageId = new TMessage().Id; - return connection.OnRxMessage.Where(messageId, (raw, id) => - { - if (raw is TMessage message) - { - return message.Id != null && message.Id.Equals(id); - } - - return false; - - }).Cast(); - } - - public static Observable RxFilter(this IProtocolConnection connection, - Func filter) - where TMessage : IProtocolMessage, new() - { - var messageId = new TMessage().Id; - return connection.OnRxMessage.Where(messageId, (raw, id) => - { - if (raw is TMessage message) - { - return message.Id != null && message.Id.Equals(id) && filter(message); - } - - return false; - - }).Cast(); - } - - public static async Task SendAndWaitAnswer( - this IProtocolConnection connection, - IProtocolMessage request, - FilterDelegate filterAndGetResult, - CancellationToken cancel = default) - where TMessage : IProtocolMessage, new() - { - cancel.ThrowIfCancellationRequested(); - var tcs = new TaskCompletionSource(); - await using var c1 = cancel.Register(() => tcs.TrySetCanceled()); - using var c2 = connection.RxFilter().Subscribe(filterAndGetResult, (res, f) => - { - if (filterAndGetResult(res, out var result)) - { - tcs.TrySetResult(result); - } - }); - await connection.Send(request, cancel).ConfigureAwait(false); - return await tcs.Task.ConfigureAwait(false); - } - - public static async Task SendAndWaitAnswer( - this IProtocolConnection connection, - IProtocolMessage request, - FilterDelegate filterAndGetResult, - TimeSpan timeout, - CancellationToken cancel = default, - TimeProvider? timeProvider = null) - where TMessage : IProtocolMessage, new() - { - timeProvider ??= TimeProvider.System; - using var linkedCancel = CancellationTokenSource.CreateLinkedTokenSource(cancel); - linkedCancel.CancelAfter(timeout, timeProvider); - return await connection.SendAndWaitAnswer(request, filterAndGetResult, cancel); - } - - public static async Task SendAndWaitAnswer( - this IProtocolConnection connection, - TRequestMessage request, - FilterDelegate filterAndGetResult, - TimeSpan timeout, - int maxAttemptCount, - ResendMessageModifyDelegate? modifyRequestOnResend = null, - CancellationToken cancel = default, - TimeProvider? timeProvider = null, - IProgress? progress = null) - where TResultMessage : IProtocolMessage, new() - where TRequestMessage : IProtocolMessage - { - cancel.ThrowIfCancellationRequested(); - TResult? result = default; - byte currentAttempt = 0; - progress ??= new Progress(); - while (IsRetryCondition()) - { - progress.Report(currentAttempt); - if (currentAttempt != 0) - { - modifyRequestOnResend?.Invoke(request, currentAttempt); - } - - ++currentAttempt; - try - { - result = await connection.SendAndWaitAnswer(request, filterAndGetResult, timeout, cancel, timeProvider) - .ConfigureAwait(false); - break; - } - catch (OperationCanceledException) - { - if (IsRetryCondition()) - { - continue; - } - - cancel.ThrowIfCancellationRequested(); - } - } - if (result != null) return result; - throw new TimeoutException($"Timeout to execute '{request}' with {maxAttemptCount} x {timeout}'"); - - bool IsRetryCondition() => currentAttempt < maxAttemptCount; - } + } public delegate bool FilterDelegate(TMessage input, out TResult result) diff --git a/src/Asv.IO/Protocol/IProtocol.cs b/src/Asv.IO/Protocol/IProtocol.cs index d6bf3b3..5b70ae3 100644 --- a/src/Asv.IO/Protocol/IProtocol.cs +++ b/src/Asv.IO/Protocol/IProtocol.cs @@ -26,4 +26,5 @@ public interface IProtocolFactory IProtocolRouter CreateRouter(string id); IVirtualConnection CreateVirtualConnection(Func? clientToServerFilter = null, Func? serverToClientFilter = null); + } \ No newline at end of file diff --git a/src/Asv.IO/Protocol/ProtocolHelper.cs b/src/Asv.IO/Protocol/ProtocolHelper.cs index ca16c1a..a21a67b 100644 --- a/src/Asv.IO/Protocol/ProtocolHelper.cs +++ b/src/Asv.IO/Protocol/ProtocolHelper.cs @@ -1,4 +1,9 @@ +using System; using System.Text.RegularExpressions; +using System.Threading; +using System.Threading.Tasks; +using Asv.Common; +using R3; namespace Asv.IO; @@ -12,6 +17,139 @@ public static partial class ProtocolHelper private static readonly Regex IdNormailizeRegex = MyRegex(); + public static Observable RxFilter(this IProtocolConnection connection) + where TMessage : IProtocolMessage => connection.OnRxMessage.RxFilter(); + public static Observable RxFilter(this IProtocolConnection connection, Func filter) + where TMessage : IProtocolMessage => connection.OnRxMessage.RxFilter(filter); + + public static Observable RxFilter(this Observable src) + where TMessage : IProtocolMessage + { + return src.Where(raw => raw is TMessage).Cast(); + } + + public static Observable RxFilter(this Observable src, Func filter) + where TMessage : IProtocolMessage + { + return src.Where(raw => raw is TMessage) + .Cast() + .Where(filter); + } + + public static Observable RxFilterById(this Observable src) + where TMessage : IProtocolMessage, new() + { + var messageId = new TMessage().Id; + return src.Where(messageId, (raw, id) => + { + if (raw is TMessage message) + { + return message.Id != null && message.Id.Equals(id); + } + return false; + + }).Cast(); + } + public static Observable RxFilterById(this Observable src, + Func filter) + where TMessage : IProtocolMessage, new() + { + return src.RxFilterById().Where(filter); + } + + public static Observable RxFilterById(this IProtocolConnection connection) + where TMessage : IProtocolMessage, new() + => connection.OnRxMessage.RxFilterById(); + + public static Observable RxFilterById(this IProtocolConnection connection, + Func filter) + where TMessage : IProtocolMessage, new() + => connection.OnRxMessage.RxFilterById(filter); + + public static async Task SendAndWaitAnswer( + this IProtocolConnection connection, + IProtocolMessage request, + FilterDelegate filterAndGetResult, + CancellationToken cancel = default) + where TMessage : IProtocolMessage, new() + { + cancel.ThrowIfCancellationRequested(); + var tcs = new TaskCompletionSource(); + await using var c1 = cancel.Register(() => tcs.TrySetCanceled()); + using var c2 = connection.RxFilterById() + .Subscribe(filterAndGetResult, (res, f) => + { + if (filterAndGetResult(res, out var result)) + { + tcs.TrySetResult(result); + } + }); + await connection.Send(request, cancel).ConfigureAwait(false); + return await tcs.Task.ConfigureAwait(false); + } + + public static async Task SendAndWaitAnswer( + this IProtocolConnection connection, + IProtocolMessage request, + FilterDelegate filterAndGetResult, + TimeSpan timeout, + CancellationToken cancel = default, + TimeProvider? timeProvider = null) + where TMessage : IProtocolMessage, new() + { + timeProvider ??= TimeProvider.System; + using var linkedCancel = CancellationTokenSource.CreateLinkedTokenSource(cancel); + linkedCancel.CancelAfter(timeout, timeProvider); + return await connection.SendAndWaitAnswer(request, filterAndGetResult, cancel); + } + + public static async Task SendAndWaitAnswer( + this IProtocolConnection connection, + TRequestMessage request, + FilterDelegate filterAndGetResult, + TimeSpan timeout, + int maxAttemptCount, + ResendMessageModifyDelegate? modifyRequestOnResend = null, + CancellationToken cancel = default, + TimeProvider? timeProvider = null, + IProgress? progress = null) + where TResultMessage : IProtocolMessage, new() + where TRequestMessage : IProtocolMessage + { + cancel.ThrowIfCancellationRequested(); + TResult? result = default; + byte currentAttempt = 0; + progress ??= new Progress(); + while (IsRetryCondition()) + { + progress.Report(currentAttempt); + if (currentAttempt != 0) + { + modifyRequestOnResend?.Invoke(request, currentAttempt); + } + + ++currentAttempt; + try + { + result = await connection.SendAndWaitAnswer(request, filterAndGetResult, timeout, cancel, timeProvider) + .ConfigureAwait(false); + break; + } + catch (OperationCanceledException) + { + if (IsRetryCondition()) + { + continue; + } + + cancel.ThrowIfCancellationRequested(); + } + } + if (result != null) return result; + throw new TimeoutException($"Timeout to execute '{request}' with {maxAttemptCount} x {timeout}'"); + + bool IsRetryCondition() => currentAttempt < maxAttemptCount; + } } \ No newline at end of file