generic <typename T>
public ref class BlockingCollection : IDisposable, System::Collections::Generic::IEnumerable<T>, System::Collections::Generic::IReadOnlyCollection<T>, System::Collections::ICollection
generic <typename T>
public ref class BlockingCollection : IDisposable, System::Collections::Generic::IEnumerable<T>, System::Collections::ICollection
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.Generic.IReadOnlyCollection<T>, System.Collections.ICollection
[System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.Generic.IReadOnlyCollection<T>, System.Collections.ICollection
[System.Runtime.InteropServices.ComVisible(false)]
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.ICollection
[System.Runtime.InteropServices.ComVisible(false)]
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.Generic.IReadOnlyCollection<T>, System.Collections.ICollection
public class BlockingCollection<T> : IDisposable, System.Collections.Generic.IEnumerable<T>, System.Collections.ICollection
type BlockingCollection<'T> = class
interface seq<'T>
interface IEnumerable
interface IReadOnlyCollection<'T>
interface ICollection
interface IDisposable
[<System.Runtime.Versioning.UnsupportedOSPlatform("browser")>]
type BlockingCollection<'T> = class
interface seq<'T>
interface IEnumerable
interface IReadOnlyCollection<'T>
interface ICollection
interface IDisposable
[<System.Runtime.InteropServices.ComVisible(false)>]
type BlockingCollection<'T> = class
interface seq<'T>
interface ICollection
interface IEnumerable
interface IDisposable
[<System.Runtime.InteropServices.ComVisible(false)>]
type BlockingCollection<'T> = class
interface seq<'T>
interface IEnumerable
interface ICollection
interface IDisposable
interface IReadOnlyCollection<'T>
type BlockingCollection<'T> = class
interface seq<'T>
interface ICollection
interface IEnumerable
interface IDisposable
Public Class BlockingCollection(Of T)
Implements ICollection, IDisposable, IEnumerable(Of T), IReadOnlyCollection(Of T)
Public Class BlockingCollection(Of T)
Implements ICollection, IDisposable, IEnumerable(Of T)
await AddTakeDemo.BC_AddTakeCompleteAdding();
TryTakeDemo.BC_TryTake();
FromToAnyDemo.BC_FromToAny();
await ConsumingEnumerableDemo.BC_GetConsumingEnumerable();
Console.WriteLine("Press any key to exit.");
Console.ReadKey();
class AddTakeDemo
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.Take()
// BlockingCollection<T>.CompleteAdding()
public static async Task BC_AddTakeCompleteAdding()
using (BlockingCollection<int> bc = new BlockingCollection<int>())
// Spin up a Task to populate the BlockingCollection
Task t1 = Task.Run(() =>
bc.Add(1);
bc.Add(2);
bc.Add(3);
bc.CompleteAdding();
// Spin up a Task to consume the BlockingCollection
Task t2 = Task.Run(() =>
// Consume the BlockingCollection
while (true) Console.WriteLine(bc.Take());
catch (InvalidOperationException)
// An InvalidOperationException means that Take() was called on a completed collection
Console.WriteLine("That's All!");
await Task.WhenAll(t1, t2);
class TryTakeDemo
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.CompleteAdding()
// BlockingCollection<T>.TryTake()
// BlockingCollection<T>.IsCompleted
public static void BC_TryTake()
// Construct and fill our BlockingCollection
using (BlockingCollection<int> bc = new BlockingCollection<int>())
int NUMITEMS = 10000;
for (int i = 0; i < NUMITEMS; i++) bc.Add(i);
bc.CompleteAdding();
int outerSum = 0;
// Delegate for consuming the BlockingCollection and adding up all items
Action action = () =>
int localItem;
int localSum = 0;
while (bc.TryTake(out localItem)) localSum += localItem;
Interlocked.Add(ref outerSum, localSum);
// Launch three parallel actions to consume the BlockingCollection
Parallel.Invoke(action, action, action);
Console.WriteLine("Sum[0..{0}) = {1}, should be {2}", NUMITEMS, outerSum, ((NUMITEMS * (NUMITEMS - 1)) / 2));
Console.WriteLine("bc.IsCompleted = {0} (should be true)", bc.IsCompleted);
class FromToAnyDemo
// Demonstrates:
// Bounded BlockingCollection<T>
// BlockingCollection<T>.TryAddToAny()
// BlockingCollection<T>.TryTakeFromAny()
public static void BC_FromToAny()
BlockingCollection<int>[] bcs = new BlockingCollection<int>[2];
bcs[0] = new BlockingCollection<int>(5); // collection bounded to 5 items
bcs[1] = new BlockingCollection<int>(5); // collection bounded to 5 items
// Should be able to add 10 items w/o blocking
int numFailures = 0;
for (int i = 0; i < 10; i++)
if (BlockingCollection<int>.TryAddToAny(bcs, i) == -1) numFailures++;
Console.WriteLine("TryAddToAny: {0} failures (should be 0)", numFailures);
// Should be able to retrieve 10 items
int numItems = 0;
int item;
while (BlockingCollection<int>.TryTakeFromAny(bcs, out item) != -1) numItems++;
Console.WriteLine("TryTakeFromAny: retrieved {0} items (should be 10)", numItems);
class ConsumingEnumerableDemo
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.CompleteAdding()
// BlockingCollection<T>.GetConsumingEnumerable()
public static async Task BC_GetConsumingEnumerable()
using (BlockingCollection<int> bc = new BlockingCollection<int>())
// Kick off a producer task
var producerTask = Task.Run(async () =>
for (int i = 0; i < 10; i++)
bc.Add(i);
Console.WriteLine($"Producing: {i}");
await Task.Delay(100); // sleep 100 ms between adds
// Need to do this to keep foreach below from hanging
bc.CompleteAdding();
// Now consume the blocking collection with foreach.
// Use bc.GetConsumingEnumerable() instead of just bc because the
// former will block waiting for completion and the latter will
// simply take a snapshot of the current state of the underlying collection.
foreach (var item in bc.GetConsumingEnumerable())
Console.WriteLine($"Consuming: {item}");
await producerTask; // Allow task to complete cleanup
open System
open System.Collections.Concurrent
open System.Threading
open System.Threading.Tasks
module AddTakeDemo =
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.Take()
// BlockingCollection<T>.CompleteAdding()
let blockingCollectionAddTakeCompleteAdding () =
task {
use bc = new BlockingCollection<int>()
// Spin up a Task to populate the BlockingCollection
let t1 =
task {
bc.Add 1
bc.Add 2
bc.Add 3
bc.CompleteAdding()
// Spin up a Task to consume the BlockingCollection
let t2 =
task {
// Consume consume the BlockingCollection
while true do
printfn $"{bc.Take()}"
with :? InvalidOperationException ->
// An InvalidOperationException means that Take() was called on a completed collection
printfn "That's All!"
let! _ = Task.WhenAll(t1, t2)
module TryTakeDemo =
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.CompleteAdding()
// BlockingCollection<T>.TryTake()
// BlockingCollection<T>.IsCompleted
let blockingCollectionTryTake () =
// Construct and fill our BlockingCollection
use bc = new BlockingCollection<int>()
let NUMITEMS = 10000;
for i = 0 to NUMITEMS - 1 do
bc.Add i
bc.CompleteAdding()
let mutable outerSum = 0
// Delegate for consuming the BlockingCollection and adding up all items
let action =
Action(fun () ->
let mutable localItem = 0
let mutable localSum = 0
while bc.TryTake &localItem do
localSum <- localSum + localItem
Interlocked.Add(&outerSum, localSum)
|> ignore)
// Launch three parallel actions to consume the BlockingCollection
Parallel.Invoke(action, action, action)
printfn $"Sum[0..{NUMITEMS}) = {outerSum}, should be {((NUMITEMS * (NUMITEMS - 1)) / 2)}"
printfn $"bc.IsCompleted = {bc.IsCompleted} (should be true)"
module FromToAnyDemo =
// Demonstrates:
// Bounded BlockingCollection<T>
// BlockingCollection<T>.TryAddToAny()
// BlockingCollection<T>.TryTakeFromAny()
let blockingCollectionFromToAny () =
let bcs =
new BlockingCollection<int>(5) // collection bounded to 5 items
new BlockingCollection<int>(5) // collection bounded to 5 items
// Should be able to add 10 items w/o blocking
let mutable numFailures = 0;
for i = 0 to 9 do
if BlockingCollection<int>.TryAddToAny(bcs, i) = -1 then
numFailures <- numFailures + 1
printfn $"TryAddToAny: {numFailures} failures (should be 0)"
// Should be able to retrieve 10 items
let mutable numItems = 0
let mutable item = 0
while BlockingCollection<int>.TryTakeFromAny(bcs, &item) <> -1 do
numItems <- numItems + 1
printfn $"TryTakeFromAny: retrieved {numItems} items (should be 10)"
module ConsumingEnumerableDemo =
// Demonstrates:
// BlockingCollection<T>.Add()
// BlockingCollection<T>.CompleteAdding()
// BlockingCollection<T>.GetConsumingEnumerable()
let blockingCollectionGetConsumingEnumerable () =
task {
use bc = new BlockingCollection<int>()
// Kick off a producer task
let producerTask =
task {
for i = 0 to 9 do
bc.Add i
printfn $"Producing: {i}"
do! Task.Delay 100 // sleep 100 ms between adds
// Need to do this to keep foreach below from hanging
bc.CompleteAdding()
// Now consume the blocking collection with foreach.
// Use bc.GetConsumingEnumerable() instead of just bc because the
// former will block waiting for completion and the latter will
// simply take a snapshot of the current state of the underlying collection.
for item in bc.GetConsumingEnumerable() do
printfn $"Consuming: {item}"
do! producerTask // Allow task to complete cleanup
let main =
task {
do! AddTakeDemo.blockingCollectionAddTakeCompleteAdding ()
TryTakeDemo.blockingCollectionTryTake ()
FromToAnyDemo.blockingCollectionFromToAny ()
do! ConsumingEnumerableDemo.blockingCollectionGetConsumingEnumerable ()
printfn "Press any key to exit."
Console.ReadKey(true) |> ignore
main.Wait()
Imports System.Threading.Tasks
Imports System.Collections.Concurrent
Imports System.Threading
Class BlockingCollectionDemo
Shared Sub Main()
AddTakeDemo.BC_AddTakeCompleteAdding()
TryTakeDemo.BC_TryTake()
ToAnyDemo.BC_ToAny()
ConsumingEnumerableDemo.BC_GetConsumingEnumerable()
' Keep the console window open in debug mode
Console.WriteLine("Press any key to exit.")
Console.ReadKey()
End Sub
End Class
Class AddTakeDemo
' Demonstrates:
' BlockingCollection<T>.Add()
' BlockingCollection<T>.Take()
' BlockingCollection<T>.CompleteAdding()
Shared Sub BC_AddTakeCompleteAdding()
Using bc As New BlockingCollection(Of Integer)()
' Spin up a Task to populate the BlockingCollection
Using t1 As Task = Task.Factory.StartNew(
Sub()
bc.Add(1)
bc.Add(2)
bc.Add(3)
bc.CompleteAdding()
End Sub)
' Spin up a Task to consume the BlockingCollection
Using t2 As Task = Task.Factory.StartNew(
Sub()
' Consume the BlockingCollection
While True
Console.WriteLine(bc.Take())
End While
Catch generatedExceptionName As InvalidOperationException
' An InvalidOperationException means that Take() was called on a completed collection
Console.WriteLine("That's All!")
End Try
End Sub)
Task.WaitAll(t1, t2)
End Using
End Using
End Using
End Sub
End Class
'Imports System.Collections.Concurrent
'Imports System.Threading
'Imports System.Threading.Tasks
Class TryTakeDemo
' Demonstrates:
' BlockingCollection<T>.Add()
' BlockingCollection<T>.CompleteAdding()
' BlockingCollection<T>.TryTake()
' BlockingCollection<T>.IsCompleted
Shared Sub BC_TryTake()
' Construct and fill our BlockingCollection
Using bc As New BlockingCollection(Of Integer)()
Dim NUMITEMS As Integer = 10000
For i As Integer = 0 To NUMITEMS - 1
bc.Add(i)
bc.CompleteAdding()
Dim outerSum As Integer = 0
' Delegate for consuming the BlockingCollection and adding up all items
Dim action As Action =
Sub()
Dim localItem As Integer
Dim localSum As Integer = 0
While bc.TryTake(localItem)
localSum += localItem
End While
Interlocked.Add(outerSum, localSum)
End Sub
' Launch three parallel actions to consume the BlockingCollection
Parallel.Invoke(action, action, action)
Console.WriteLine("Sum[0..{0}) = {1}, should be {2}", NUMITEMS, outerSum, ((NUMITEMS * (NUMITEMS - 1)) / 2))
Console.WriteLine("bc.IsCompleted = {0} (should be true)", bc.IsCompleted)
End Using
End Sub
End Class
'Imports System.Threading.Tasks
'Imports System.Collections.Concurrent
' Demonstrates:
' Bounded BlockingCollection<T>
' BlockingCollection<T>.TryAddToAny()
' BlockingCollection<T>.TryTakeFromAny()
Class ToAnyDemo
Shared Sub BC_ToAny()
Dim bcs As BlockingCollection(Of Integer)() = New BlockingCollection(Of Integer)(1) {}
bcs(0) = New BlockingCollection(Of Integer)(5)
' collection bounded to 5 items
bcs(1) = New BlockingCollection(Of Integer)(5)
' collection bounded to 5 items
' Should be able to add 10 items w/o blocking
Dim numFailures As Integer = 0
For i As Integer = 0 To 9
If BlockingCollection(Of Integer).TryAddToAny(bcs, i) = -1 Then
numFailures += 1
End If
Console.WriteLine("TryAddToAny: {0} failures (should be 0)", numFailures)
' Should be able to retrieve 10 items
Dim numItems As Integer = 0
Dim item As Integer
While BlockingCollection(Of Integer).TryTakeFromAny(bcs, item) <> -1
numItems += 1
End While
Console.WriteLine("TryTakeFromAny: retrieved {0} items (should be 10)", numItems)
End Sub
End Class
'Imports System.Threading.Tasks
'Imports System.Collections.Concurrent
' Demonstrates:
' BlockingCollection<T>.Add()
' BlockingCollection<T>.CompleteAdding()
' BlockingCollection<T>.GetConsumingEnumerable()
Class ConsumingEnumerableDemo
Shared Sub BC_GetConsumingEnumerable()
Using bc As New BlockingCollection(Of Integer)()
' Kick off a producer task
Task.Factory.StartNew(
Sub()
For i As Integer = 0 To 9
bc.Add(i)
' sleep 100 ms between adds
Thread.Sleep(100)
' Need to do this to keep foreach below from not responding.
bc.CompleteAdding()
End Sub)
' Now consume the blocking collection with foreach.
' Use bc.GetConsumingEnumerable() instead of just bc because the
' former will block waiting for completion and the latter will
' simply take a snapshot of the current state of the underlying collection.
For Each item In bc.GetConsumingEnumerable()
Console.WriteLine(item)
End Using
End Sub
End Class
BlockingCollection<T>
是提供以下内容的线程安全集合类:
生成者/使用者模式的实现;
BlockingCollection<T>
是 接口的
IProducerConsumerCollection<T>
包装器。
使用
Add
和
Take
方法并发添加和删除多个线程中的项。
一个边界集合,当集合已满或为空时阻止
Add
和
Take
操作。
Add
通过使用
CancellationToken
或
Take
方法中的
TryAdd
对象取消 或
TryTake
操作。
此类型实现
IDisposable
接口。 在使用完类型后,您应直接或间接释放类型。 若要直接释放类型,请在
try
/
catch
块中调用其
Dispose
方法。 若要间接释放类型,请使用
using
(在 C# 中)或
Using
(在 Visual Basic 中)等语言构造。 有关详细信息,请参阅
IDisposable
接口主题中的“使用实现 IDisposable 的对象”一节。 另请注意,
Dispose()
方法不是线程安全的。 的所有其他公共成员和受保护成员
BlockingCollection<T>
都是线程安全的,并且可以从多个线程并发使用。
IProducerConsumerCollection<T>
表示允许线程安全添加和删除数据的集合。
BlockingCollection<T>
用作实例的
IProducerConsumerCollection<T>
包装器,并允许从集合中阻止删除尝试,直到数据可用为止。 同样,可以创建 ,
BlockingCollection<T>
对 中允许
IProducerConsumerCollection<T>
的数据元素数强制实施上限;然后,对集合的添加尝试可能会受阻,直到有空间可用于存储添加的项。 以这种方式,
BlockingCollection<T>
类似于传统的阻塞队列数据结构,只不过基础数据存储机制被抽象化为
IProducerConsumerCollection<T>
。
BlockingCollection<T>
支持限制和阻塞。 边界意味着可以设置集合的最大容量。 在某些情况下,边界非常重要,因为它使你能够控制内存中集合的最大大小,并防止生成线程在消耗线程之前移动得太远。多个线程或任务可以同时向集合添加项,如果集合达到其指定的最大容量,则生成线程将阻塞,直到删除项。 多个使用者可以同时移除项,如果集合变空,则使用线程将发生阻塞,直到制造者添加某个项。 生成线程可以调用
CompleteAdding
方法以指示不再添加任何项。 使用者将监视
IsCompleted
属性以了解集合何时为空且不再添加项。
Add
和
Take
操作通常在循环中执行。 可以通过将 对象传递给
CancellationToken
TryAdd
或
TryTake
方法,然后在每次迭代中检查令牌的
IsCancellationRequested
属性的值来取消循环。 如果值为
true
,则由你通过清理任何资源并退出循环来响应取消请求。
创建
BlockingCollection<T>
对象时,不仅可以指定边界容量,还可以指定要使用的集合类型。 例如,可以为先入先出 (FIFO) 行为指定对象,或者
ConcurrentStack<T>
为后进先出 (LIFO) 行为指定
ConcurrentQueue<T>
对象。 可使用实现
IProducerConsumerCollection<T>
接口的任何集合类。
BlockingCollection<T>
的默认集合类型为
ConcurrentQueue<T>
。
不要直接修改基础集合。 使用
BlockingCollection<T>
方法添加或删除元素。 如果直接更改基础集合,对象
BlockingCollection<T>
可能会损坏。
BlockingCollection<T>
在设计时未考虑到异步访问。 如果应用程序需要异步生成者/使用者方案,请考虑改用
Channel<T>
。
ToImmutableDictionary<TSource,TKey,TValue>(IEnumerable<TSource>,
Func<TSource,TKey>, Func<TSource,TValue>, IEqualityComparer<TKey>,
IEqualityComparer<TValue>)
枚举并转换序列,然后使用指定的键和值比较器生成其内容的不可变字典。
AggregateBy<TSource,TKey,TAccumulate>(IEnumerable<TSource>, Func<TSource,
TKey>, Func<TKey,TAccumulate>, Func<TAccumulate,TSource,TAccumulate>,
IEqualityComparer<TKey>)
为实现
IProducerConsumerCollection<T>
的线程安全集合提供阻塞和限制功能。
GroupBy<TSource,TKey,TElement,TResult>(IEnumerable<TSource>, Func<TSource,
TKey>, Func<TSource,TElement>, Func<TKey,IEnumerable<TElement>,
TResult>, IEqualityComparer<TKey>)
根据指定的键选择器函数对序列中的元素进行分组,并且从每个组及其键中创建结果值。 通过使用指定的比较器对键值进行比较,并且通过使用指定的函数对每个组的元素进行投影。
GroupJoin<TOuter,TInner,TKey,TResult>(IEnumerable<TOuter>, IEnumerable<TInner>,
Func<TOuter,TKey>, Func<TInner,TKey>, Func<TOuter,IEnumerable<TInner>,
TResult>)
基于键值等同性对两个序列的元素进行关联,并对结果进行分组。 使用默认的相等比较器对键进行比较。
GroupJoin<TOuter,TInner,TKey,TResult>(IEnumerable<TOuter>, IEnumerable<TInner>,
Func<TOuter,TKey>, Func<TInner,TKey>, Func<TOuter,IEnumerable<TInner>,
TResult>, IEqualityComparer<TKey>)
基于键值等同性对两个序列的元素进行关联,并对结果进行分组。 使用指定的
IEqualityComparer<T>
对键进行比较。
Join<TOuter,TInner,TKey,TResult>(IEnumerable<TOuter>, IEnumerable<TInner>,
Func<TOuter,TKey>, Func<TInner,TKey>, Func<TOuter,TInner,TResult>,
IEqualityComparer<TKey>)
基于匹配键对两个序列的元素进行关联。 使用指定的
IEqualityComparer<T>
对键进行比较。
即将发布:在整个 2024 年,我们将逐步淘汰作为内容反馈机制的“GitHub 问题”,并将其取代为新的反馈系统。 有关详细信息,请参阅:
https://aka.ms/ContentUserFeedback
。
提交和查看相关反馈