-
Notifications
You must be signed in to change notification settings - Fork 4.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[API Proposal]: Add an IAsyncEnumerable<T>.ToEnumerable extension method. #60106
Comments
Tagging subscribers to this area: @dotnet/area-system-collections Issue DetailsBackground and motivationAttempting to adapt an public static IEnumerable<T> ToEnumerable<T>(this IAsyncEnumerable<T> asyncEnumerable)
{
var list = new BlockingCollection<T>();
async Task AsyncIterate()
{
await foreach (var item in asyncEnumerable)
{
list.Add(item);
}
list.CompleteAdding();
}
_ = AsyncIterate();
return list.GetConsumingEnumerable();
} This works well but there's no way to propagate exceptions to the consuming enumerable that may happen when enumerating the public static IEnumerable<T> ToEnumerable<T>(this IAsyncEnumerable<T> asyncEnumerable)
{
var list = new BlockingCollection<T>();
async Task AsyncIterate()
{
try
{
await foreach (var item in asyncEnumerable)
{
list.Add(item);
}
list.CompleteAdding();
}
catch(Exception ex)
{
list.CompleteAdding(ex);
}
}
_ = AsyncIterate();
return list.GetConsumingEnumerable();
} CompleteAdding with an exception would propagate it to the caller and it would throw on synchronous enumeration. The only way to throw an exception today AFAIK is to use a cancellation token but that won't propagate the original exception. API Proposalnamespace System.Collections.Concurrent
{
public class BlockingCollection<T> : IEnumerable<T>
{
public void CompleteAdding(Exception error);
}
} API Usagepublic static IEnumerable<T> ToEnumerable<T>(this IAsyncEnumerable<T> asyncEnumerable)
{
var list = new BlockingCollection<T>();
async Task AsyncIterate()
{
try
{
await foreach (var item in asyncEnumerable)
{
list.Add(item);
}
list.CompleteAdding();
}
catch(Exception ex)
{
list.CompleteAdding(ex);
}
}
_ = AsyncIterate();
return list.GetConsumingEnumerable();
} RisksNone that I'm aware of.
|
If some loss of performance was acceptable here, then I think a simple modification of the code would make this work: public static IEnumerable<T> ToEnumerable<T>(this IAsyncEnumerable<T> asyncEnumerable)
{
var list = new BlockingCollection<T>();
async Task AsyncIterate()
{
try
{
await foreach (var item in asyncEnumerable)
{
list.Add(item);
}
}
finally
{
list.CompleteAdding();
}
}
var task = AsyncIterate();
foreach (var item in list.GetConsumingEnumerable())
{
yield return item;
}
task.GetAwaiter().GetResult();
} Does this make the proposal less useful? Also, I don't like the suggested naming: |
I believe @svick's alternative has the added advantage of delaying async enumeration until blocking enumeration has started. Instead of retrofitting namespace System.Collections.Generic
{
public static class CollectionExtensions
{
public static IEnumerable<T> ToBlockingEnumerable<T>(this IAsyncEnumerable<T> source, CancellationToken cancellationToken = default);
public static ValueTask<List<T>> ToListAsync<T>(this IAsyncEnumerable<T> source, CancellationToken cancellationToken = default);
}
} |
@eiriktsarpalis sure, I wasn't sure about our tolerance for adding extensions to IAsyncEnumerable since we didn't add any to date 😄 |
I'd be more inclined to add a ToEnumerable extension method than to add a CompleteAdding(Exception) to BlockingCollection. We've avoided adding a full set of LINQ methods for IAsyncEnumerable, instead deferring to /~https://github.com/dotnet/reactive for that, but for helpers for going between core types, e.g. IAsyncEnumerable to IEnumerable, I'm ok with it. (And, just to be pedantic, we actually have added IAsyncEnumerable extension methods 😄 : /~https://github.com/dotnet/runtime/blob/6195d98cad42bc7ab70729e6c970e2c420b29db1/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/TaskAsyncEnumerableExtensions.cs) I don't know, though, that BlockingCollection should be used in the implementation. If you really want these semantics, you can do what @svick has done, but this does effectively change the iteration semantics to be that once you start enumerating, background operations are filling up the collection concurrently with consumption of data, and that's generally not how either enumerables nor async enumerables work. On top of that, BlockingCollection, in large part due to its design of being able to wrap arbitrary collections and delegate to them for a variety of functionality, isn't terribly efficient, in particular from an allocation perspective when data isn't available. If we wanted to add a ToEnumerable method, I'd prefer to see us just doing something like: public static IEnumerable<T> ToEnumerable<T>(this IAsyncEnumerable<T> source, CancellationToken cancellationToken = default)
{
IAsyncEnumerator<T> e = source.GetAsyncEnumerator(cancellationToken);
try
{
while (true)
{
ValueTask<bool> moveNext = e.MoveNextAsync();
if (moveNext.IsCompletedSuccessfully ? moveNext.Result : moveNext.AsTask().GetAwaiter().GetResult())
{
yield return e.Current;
}
else break;
}
}
finally
{
e.DisposeAsync().AsTask().GetAwaiter().GetResult();
}
} That keeps everything in the world of "nothing is in flight while not interacting with the enumerator", avoids needing to expose some kind of bounded buffering policy on how many items a backing collection would enable, etc. We'd also want to consider whether we really want to encourage such blocking behaviors by exposing this as an extension method... might be worth forcing someone to call it via a static just to add a tiny bit more friction. Or something along those lines. Or maybe @eiriktsarpalis's naming suggestion would help. |
Somewhat, but I can think of scenarios that are the synchronous version of |
The most obvious implementation is usually the best one 😄. I don't think it avoids a need to add CompleteAdding with an exception, or some way to push the exception to the caller, unless we're saying that BlockingCollection isn't something we want to modify. If that's the case, what's the blocking version of a |
|
Hmmm, performance wise, is it the same? |
@davidfowl wouldn't the proposal have side effects on other members of the class? Like the |
Then Take would throw an exception? What happens when CompletedAdding is called from another thread while Take was blocked? |
... so attempting to have a different exception pop up would likely be considered a breaking change. |
For the record, that method already exists in dotnet/reactive. |
We'd need to measure. My gut is serialized throughout is slightly lower but allocation is much better. We also haven't invested in making blocking throughout with tasks super fast; if it's something we wanted to improve, I expect we could. I admit I'm surprised you care ;-) |
I'd support adding a |
Thought experiment 😄 |
namespace System.Threading.Tasks
{
public static class TaskAsyncEnumerableExtensions
{
public static IEnumerable<T> ToBlockingEnumerable<T>(this IAsyncEnumerable<T> source, CancellationToken cancellationToken = default);
}
} |
cc @ajcvickers about ToListAsynx |
Agree with @terrajobst. /cc @smitpatel @roji @AndriySvyryd for their thoughts. |
Agree with @terrajobst above. |
I also agree with @terrajobst. If every |
EDIT: See #60106 (comment) for an API proposal.
Background and motivation
Attempting to adapt an
IAsyncEnumerable<T>
to anIEnumerable<T>
, Blocking collection seems to be the ideal type. It can be used to generate an enumerable that waits until theIAsyncEnumerable<T>
produces items:This works well but there's no way to propagate exceptions to the consuming enumerable that may happen when enumerating the
IAsyncEnumerable<T>
. I'd like to write something like this:CompleteAdding with an exception would propagate it to the caller and it would throw on synchronous enumeration. The only way to throw an exception today AFAIK is to use a cancellation token but that won't propagate the original exception.
API Proposal
API Usage
Risks
None that I'm aware of.
The text was updated successfully, but these errors were encountered: