添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
generic <typename T>
public ref class BlockingCollection : IDisposable, System::Collections::Generic::IEnumerable<T>, System::Collections::Generic::IReadOnlyCollection<T>, System::Collections::ICollection
generic <typename T>
public ref class BlockingCollection : IDisposable, System::Collections::Generic::IEnumerable<T>, System::Collections::ICollection
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.Generic.IReadOnlyCollection<T>, System.Collections.ICollection
[System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.Generic.IReadOnlyCollection<T>, System.Collections.ICollection
[System.Runtime.InteropServices.ComVisible(false)]
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.ICollection
[System.Runtime.InteropServices.ComVisible(false)]
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.Generic.IReadOnlyCollection<T>, System.Collections.ICollection
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.ICollection
type BlockingCollection<'T> = class
    interface seq<'T>
    interface IEnumerable
    interface IReadOnlyCollection<'T>
    interface ICollection
    interface IDisposable
[<System.Runtime.Versioning.UnsupportedOSPlatform("browser")>]
type BlockingCollection<'T> = class
    interface seq<'T>
    interface IEnumerable
    interface IReadOnlyCollection<'T>
    interface ICollection
    interface IDisposable
[<System.Runtime.InteropServices.ComVisible(false)>]
type BlockingCollection<'T> = class
    interface seq<'T>
    interface ICollection
    interface IEnumerable
    interface IDisposable
[<System.Runtime.InteropServices.ComVisible(false)>]
type BlockingCollection<'T> = class
    interface seq<'T>
    interface IEnumerable
    interface ICollection
    interface IDisposable
    interface IReadOnlyCollection<'T>
type BlockingCollection<'T> = class
    interface seq<'T>
    interface ICollection
    interface IEnumerable
    interface IDisposable
Public Class BlockingCollection(Of T)
Implements ICollection, IDisposable, IEnumerable(Of T), IReadOnlyCollection(Of T)
Public Class BlockingCollection(Of T)
Implements ICollection, IDisposable, IEnumerable(Of T)
await AddTakeDemo.BC_AddTakeCompleteAdding(); TryTakeDemo.BC_TryTake(); FromToAnyDemo.BC_FromToAny(); await ConsumingEnumerableDemo.BC_GetConsumingEnumerable(); Console.WriteLine("Press any key to exit."); Console.ReadKey(); class AddTakeDemo // Demonstrates: // BlockingCollection<T>.Add() // BlockingCollection<T>.Take() // BlockingCollection<T>.CompleteAdding() public static async Task BC_AddTakeCompleteAdding() using (BlockingCollection<int> bc = new BlockingCollection<int>()) // Spin up a Task to populate the BlockingCollection Task t1 = Task.Run(() => bc.Add(1); bc.Add(2); bc.Add(3); bc.CompleteAdding(); // Spin up a Task to consume the BlockingCollection Task t2 = Task.Run(() => // Consume the BlockingCollection while (true) Console.WriteLine(bc.Take()); catch (InvalidOperationException) // An InvalidOperationException means that Take() was called on a completed collection Console.WriteLine("That's All!"); await Task.WhenAll(t1, t2); class TryTakeDemo // Demonstrates: // BlockingCollection<T>.Add() // BlockingCollection<T>.CompleteAdding() // BlockingCollection<T>.TryTake() // BlockingCollection<T>.IsCompleted public static void BC_TryTake() // Construct and fill our BlockingCollection using (BlockingCollection<int> bc = new BlockingCollection<int>()) int NUMITEMS = 10000; for (int i = 0; i < NUMITEMS; i++) bc.Add(i); bc.CompleteAdding(); int outerSum = 0; // Delegate for consuming the BlockingCollection and adding up all items Action action = () => int localItem; int localSum = 0; while (bc.TryTake(out localItem)) localSum += localItem; Interlocked.Add(ref outerSum, localSum); // Launch three parallel actions to consume the BlockingCollection Parallel.Invoke(action, action, action); Console.WriteLine("Sum[0..{0}) = {1}, should be {2}", NUMITEMS, outerSum, ((NUMITEMS * (NUMITEMS - 1)) / 2)); Console.WriteLine("bc.IsCompleted = {0} (should be true)", bc.IsCompleted); class FromToAnyDemo // Demonstrates: // Bounded BlockingCollection<T> // 
BlockingCollection<T>.TryAddToAny() // BlockingCollection<T>.TryTakeFromAny() public static void BC_FromToAny() BlockingCollection<int>[] bcs = new BlockingCollection<int>[2]; bcs[0] = new BlockingCollection<int>(5); // collection bounded to 5 items bcs[1] = new BlockingCollection<int>(5); // collection bounded to 5 items // Should be able to add 10 items w/o blocking int numFailures = 0; for (int i = 0; i < 10; i++) if (BlockingCollection<int>.TryAddToAny(bcs, i) == -1) numFailures++; Console.WriteLine("TryAddToAny: {0} failures (should be 0)", numFailures); // Should be able to retrieve 10 items int numItems = 0; int item; while (BlockingCollection<int>.TryTakeFromAny(bcs, out item) != -1) numItems++; Console.WriteLine("TryTakeFromAny: retrieved {0} items (should be 10)", numItems); class ConsumingEnumerableDemo // Demonstrates: // BlockingCollection<T>.Add() // BlockingCollection<T>.CompleteAdding() // BlockingCollection<T>.GetConsumingEnumerable() public static async Task BC_GetConsumingEnumerable() using (BlockingCollection<int> bc = new BlockingCollection<int>()) // Kick off a producer task var producerTask = Task.Run(async () => for (int i = 0; i < 10; i++) bc.Add(i); Console.WriteLine($"Producing: {i}"); await Task.Delay(100); // sleep 100 ms between adds // Need to do this to keep foreach below from hanging bc.CompleteAdding(); // Now consume the blocking collection with foreach. // Use bc.GetConsumingEnumerable() instead of just bc because the // former will block waiting for completion and the latter will // simply take a snapshot of the current state of the underlying collection. 
foreach (var item in bc.GetConsumingEnumerable()) Console.WriteLine($"Consuming: {item}"); await producerTask; // Allow task to complete cleanup open System open System.Collections.Concurrent open System.Threading open System.Threading.Tasks module AddTakeDemo = // Demonstrates: // BlockingCollection<T>.Add() // BlockingCollection<T>.Take() // BlockingCollection<T>.CompleteAdding() let blockingCollectionAddTakeCompleteAdding () = task { use bc = new BlockingCollection<int>() // Spin up a Task to populate the BlockingCollection let t1 = task { bc.Add 1 bc.Add 2 bc.Add 3 bc.CompleteAdding() // Spin up a Task to consume the BlockingCollection let t2 = task { // Consume consume the BlockingCollection while true do printfn $"{bc.Take()}" with :? InvalidOperationException -> // An InvalidOperationException means that Take() was called on a completed collection printfn "That's All!" let! _ = Task.WhenAll(t1, t2) module TryTakeDemo = // Demonstrates: // BlockingCollection<T>.Add() // BlockingCollection<T>.CompleteAdding() // BlockingCollection<T>.TryTake() // BlockingCollection<T>.IsCompleted let blockingCollectionTryTake () = // Construct and fill our BlockingCollection use bc = new BlockingCollection<int>() let NUMITEMS = 10000; for i = 0 to NUMITEMS - 1 do bc.Add i bc.CompleteAdding() let mutable outerSum = 0 // Delegate for consuming the BlockingCollection and adding up all items let action = Action(fun () -> let mutable localItem = 0 let mutable localSum = 0 while bc.TryTake &localItem do localSum <- localSum + localItem Interlocked.Add(&outerSum, localSum) |> ignore) // Launch three parallel actions to consume the BlockingCollection Parallel.Invoke(action, action, action) printfn $"Sum[0..{NUMITEMS}) = {outerSum}, should be {((NUMITEMS * (NUMITEMS - 1)) / 2)}" printfn $"bc.IsCompleted = {bc.IsCompleted} (should be true)" module FromToAnyDemo = // Demonstrates: // Bounded BlockingCollection<T> // BlockingCollection<T>.TryAddToAny() // 
BlockingCollection<T>.TryTakeFromAny() let blockingCollectionFromToAny () = let bcs = new BlockingCollection<int>(5) // collection bounded to 5 items new BlockingCollection<int>(5) // collection bounded to 5 items // Should be able to add 10 items w/o blocking let mutable numFailures = 0; for i = 0 to 9 do if BlockingCollection<int>.TryAddToAny(bcs, i) = -1 then numFailures <- numFailures + 1 printfn $"TryAddToAny: {numFailures} failures (should be 0)" // Should be able to retrieve 10 items let mutable numItems = 0 let mutable item = 0 while BlockingCollection<int>.TryTakeFromAny(bcs, &item) <> -1 do numItems <- numItems + 1 printfn $"TryTakeFromAny: retrieved {numItems} items (should be 10)" module ConsumingEnumerableDemo = // Demonstrates: // BlockingCollection<T>.Add() // BlockingCollection<T>.CompleteAdding() // BlockingCollection<T>.GetConsumingEnumerable() let blockingCollectionGetConsumingEnumerable () = task { use bc = new BlockingCollection<int>() // Kick off a producer task let producerTask = task { for i = 0 to 9 do bc.Add i printfn $"Producing: {i}" do! Task.Delay 100 // sleep 100 ms between adds // Need to do this to keep foreach below from hanging bc.CompleteAdding() // Now consume the blocking collection with foreach. // Use bc.GetConsumingEnumerable() instead of just bc because the // former will block waiting for completion and the latter will // simply take a snapshot of the current state of the underlying collection. for item in bc.GetConsumingEnumerable() do printfn $"Consuming: {item}" do! producerTask // Allow task to complete cleanup let main = task { do! AddTakeDemo.blockingCollectionAddTakeCompleteAdding () TryTakeDemo.blockingCollectionTryTake () FromToAnyDemo.blockingCollectionFromToAny () do! ConsumingEnumerableDemo.blockingCollectionGetConsumingEnumerable () printfn "Press any key to exit." 
Console.ReadKey(true) |> ignore main.Wait() Imports System.Threading.Tasks Imports System.Collections.Concurrent Imports System.Threading Class BlockingCollectionDemo Shared Sub Main() AddTakeDemo.BC_AddTakeCompleteAdding() TryTakeDemo.BC_TryTake() ToAnyDemo.BC_ToAny() ConsumingEnumerableDemo.BC_GetConsumingEnumerable() ' Keep the console window open in debug mode Console.WriteLine("Press any key to exit.") Console.ReadKey() End Sub End Class Class AddTakeDemo ' Demonstrates: ' BlockingCollection<T>.Add() ' BlockingCollection<T>.Take() ' BlockingCollection<T>.CompleteAdding() Shared Sub BC_AddTakeCompleteAdding() Using bc As New BlockingCollection(Of Integer)() ' Spin up a Task to populate the BlockingCollection Using t1 As Task = Task.Factory.StartNew( Sub() bc.Add(1) bc.Add(2) bc.Add(3) bc.CompleteAdding() End Sub) ' Spin up a Task to consume the BlockingCollection Using t2 As Task = Task.Factory.StartNew( Sub() ' Consume the BlockingCollection While True Console.WriteLine(bc.Take()) End While Catch generatedExceptionName As InvalidOperationException ' An InvalidOperationException means that Take() was called on a completed collection Console.WriteLine("That's All!") End Try End Sub) Task.WaitAll(t1, t2) End Using End Using End Using End Sub End Class 'Imports System.Collections.Concurrent 'Imports System.Threading 'Imports System.Threading.Tasks Class TryTakeDemo ' Demonstrates: ' BlockingCollection<T>.Add() ' BlockingCollection<T>.CompleteAdding() ' BlockingCollection<T>.TryTake() ' BlockingCollection<T>.IsCompleted Shared Sub BC_TryTake() ' Construct and fill our BlockingCollection Using bc As New BlockingCollection(Of Integer)() Dim NUMITEMS As Integer = 10000 For i As Integer = 0 To NUMITEMS - 1 bc.Add(i) bc.CompleteAdding() Dim outerSum As Integer = 0 ' Delegate for consuming the BlockingCollection and adding up all items Dim action As Action = Sub() Dim localItem As Integer Dim localSum As Integer = 0 While bc.TryTake(localItem) localSum += localItem End 
While Interlocked.Add(outerSum, localSum) End Sub ' Launch three parallel actions to consume the BlockingCollection Parallel.Invoke(action, action, action) Console.WriteLine("Sum[0..{0}) = {1}, should be {2}", NUMITEMS, outerSum, ((NUMITEMS * (NUMITEMS - 1)) / 2)) Console.WriteLine("bc.IsCompleted = {0} (should be true)", bc.IsCompleted) End Using End Sub End Class 'Imports System.Threading.Tasks 'Imports System.Collections.Concurrent ' Demonstrates: ' Bounded BlockingCollection<T> ' BlockingCollection<T>.TryAddToAny() ' BlockingCollection<T>.TryTakeFromAny() Class ToAnyDemo Shared Sub BC_ToAny() Dim bcs As BlockingCollection(Of Integer)() = New BlockingCollection(Of Integer)(1) {} bcs(0) = New BlockingCollection(Of Integer)(5) ' collection bounded to 5 items bcs(1) = New BlockingCollection(Of Integer)(5) ' collection bounded to 5 items ' Should be able to add 10 items w/o blocking Dim numFailures As Integer = 0 For i As Integer = 0 To 9 If BlockingCollection(Of Integer).TryAddToAny(bcs, i) = -1 Then numFailures += 1 End If Console.WriteLine("TryAddToAny: {0} failures (should be 0)", numFailures) ' Should be able to retrieve 10 items Dim numItems As Integer = 0 Dim item As Integer While BlockingCollection(Of Integer).TryTakeFromAny(bcs, item) <> -1 numItems += 1 End While Console.WriteLine("TryTakeFromAny: retrieved {0} items (should be 10)", numItems) End Sub End Class 'Imports System.Threading.Tasks 'Imports System.Collections.Concurrent ' Demonstrates: ' BlockingCollection<T>.Add() ' BlockingCollection<T>.CompleteAdding() ' BlockingCollection<T>.GetConsumingEnumerable() Class ConsumingEnumerableDemo Shared Sub BC_GetConsumingEnumerable() Using bc As New BlockingCollection(Of Integer)() ' Kick off a producer task Task.Factory.StartNew( Sub() For i As Integer = 0 To 9 bc.Add(i) ' sleep 100 ms between adds Thread.Sleep(100) ' Need to do this to keep foreach below from not responding. bc.CompleteAdding() End Sub) ' Now consume the blocking collection with foreach. 
' Use bc.GetConsumingEnumerable() instead of just bc because the ' former will block waiting for completion and the latter will ' simply take a snapshot of the current state of the underlying collection. For Each item In bc.GetConsumingEnumerable() Console.WriteLine(item) End Using End Sub End Class

BlockingCollection<T> 是提供以下内容的线程安全集合类:

  • 生成者/使用者模式的实现; BlockingCollection<T> 是 IProducerConsumerCollection<T> 接口的包装器。

  • 使用 Add 和 Take 方法从多个线程并发添加和删除项。

  • 一个有界集合,当集合已满或为空时阻塞 Add 和 Take 操作。

  • 通过在 TryAdd 或 TryTake 方法中使用 CancellationToken 对象来取消 Add 或 Take 操作。

    此类型实现 IDisposable 接口。 在使用完类型后,您应直接或间接释放类型。 若要直接释放类型,请在 try / catch 块中调用其 Dispose 方法。 若要间接释放类型,请使用 using (在 C# 中)或 Using (在 Visual Basic 中)等语言构造。 有关详细信息,请参阅 IDisposable 接口主题中的“使用实现 IDisposable 的对象”一节。 另请注意, Dispose() 方法不是线程安全的。 BlockingCollection<T> 的所有其他公共成员和受保护成员都是线程安全的,并且可以从多个线程并发使用。

    IProducerConsumerCollection<T> 表示允许线程安全添加和删除数据的集合。 BlockingCollection<T> 用作 IProducerConsumerCollection<T> 实例的包装器,并允许从集合中删除数据的尝试发生阻塞,直到有数据可供删除为止。 同样,可以创建 BlockingCollection<T> ,对 IProducerConsumerCollection<T> 中允许的数据元素数强制实施上限;然后,对集合的添加尝试可能会阻塞,直到有空间可用于存储添加的项。 以这种方式, BlockingCollection<T> 类似于传统的阻塞队列数据结构,只不过基础数据存储机制被抽象化为 IProducerConsumerCollection<T> 。

    BlockingCollection<T> 支持边界和阻塞。 边界意味着可以设置集合的最大容量。 在某些情况下,边界非常重要,因为它使你能够控制内存中集合的最大大小,并防止生成线程领先于使用线程太多。 多个线程或任务可以同时向集合添加项,如果集合达到其指定的最大容量,则生成线程将阻塞,直到有项被删除。 多个使用者可以同时移除项,如果集合变空,则使用线程将发生阻塞,直到生成者添加某个项。 生成线程可以调用 CompleteAdding 方法以指示不再添加任何项。 使用者将监视 IsCompleted 属性以了解集合何时为空且不再添加项。

    Add 和 Take 操作通常在循环中执行。 可以通过将 CancellationToken 对象传递给 TryAdd 或 TryTake 方法,然后在每次迭代中检查令牌的 IsCancellationRequested 属性的值来取消循环。 如果值为 true ,则由你通过清理任何资源并退出循环来响应取消请求。

    创建 BlockingCollection<T> 对象时,不仅可以指定边界容量,还可以指定要使用的集合类型。 例如,可以为先入先出 (FIFO) 行为指定 ConcurrentQueue<T> 对象,或者为后进先出 (LIFO) 行为指定 ConcurrentStack<T> 对象。 可使用实现 IProducerConsumerCollection<T> 接口的任何集合类。 BlockingCollection<T> 的默认集合类型为 ConcurrentQueue<T> 。

    不要直接修改基础集合。 请使用 BlockingCollection<T> 的方法添加或删除元素。 如果直接更改基础集合, BlockingCollection<T> 对象可能会损坏。

    BlockingCollection<T> 在设计时未考虑到异步访问。 如果应用程序需要异步生成者/使用者方案,请考虑改用 Channel<T>

    ToImmutableDictionary<TSource,TKey,TValue>(IEnumerable<TSource>, Func<TSource,TKey>, Func<TSource,TValue>, IEqualityComparer<TKey>, IEqualityComparer<TValue>)

    枚举并转换序列,然后使用指定的键和值比较器生成其内容的不可变字典。

    AggregateBy<TSource,TKey,TAccumulate>(IEnumerable<TSource>, Func<TSource, TKey>, Func<TKey,TAccumulate>, Func<TAccumulate,TSource,TAccumulate>, IEqualityComparer<TKey>)

    为实现 IProducerConsumerCollection<T> 的线程安全集合提供阻塞和限制功能。

    GroupBy<TSource,TKey,TElement,TResult>(IEnumerable<TSource>, Func<TSource, TKey>, Func<TSource,TElement>, Func<TKey,IEnumerable<TElement>, TResult>, IEqualityComparer<TKey>)

    根据指定的键选择器函数对序列中的元素进行分组,并且从每个组及其键中创建结果值。 通过使用指定的比较器对键值进行比较,并且通过使用指定的函数对每个组的元素进行投影。

    GroupJoin<TOuter,TInner,TKey,TResult>(IEnumerable<TOuter>, IEnumerable<TInner>, Func<TOuter,TKey>, Func<TInner,TKey>, Func<TOuter,IEnumerable<TInner>, TResult>)

    基于键值等同性对两个序列的元素进行关联,并对结果进行分组。 使用默认的相等比较器对键进行比较。

    GroupJoin<TOuter,TInner,TKey,TResult>(IEnumerable<TOuter>, IEnumerable<TInner>, Func<TOuter,TKey>, Func<TInner,TKey>, Func<TOuter,IEnumerable<TInner>, TResult>, IEqualityComparer<TKey>)

    基于键值等同性对两个序列的元素进行关联,并对结果进行分组。 使用指定的 IEqualityComparer<T> 对键进行比较。

    Join<TOuter,TInner,TKey,TResult>(IEnumerable<TOuter>, IEnumerable<TInner>, Func<TOuter,TKey>, Func<TInner,TKey>, Func<TOuter,TInner,TResult>, IEqualityComparer<TKey>)

    基于匹配键对两个序列的元素进行关联。 使用指定的 IEqualityComparer<T> 对键进行比较。

    即将发布:在整个 2024 年,我们将逐步淘汰作为内容反馈机制的“GitHub 问题”,并将其取代为新的反馈系统。 有关详细信息,请参阅: https://aka.ms/ContentUserFeedback

    提交和查看相关反馈

  •