1.并行集合 1.并行集合 a. 在C#中, 并行集合( Concurrent Collections) 是. NET 为多线程/ 并行编程设计的线程安全集合 b. 位于System. Collections. Concurrent命名空间, 专门解决普通集合( 如List< T> 、Dictionary< TKey, TValue> ) 在多线程 读写时的线程安全问题( 普通集合非线程安全,多线程操作会导致数据错乱) 1 ) . ConcurrentDictionary < TKey, TValue> ( 最常用) a. 线程安全的字典, 是多线程场景下Dictionary的直接替代, 核心优势是原子操作方法 b. 避免手动加锁实现"检查 - 添加" , "更新" 等复合操作)using System ; using System. Collections. Concurrent ; using System. Threading. Tasks ; class ConcurrentDictDemo { static void Main ( ) { // 初始化并发字典 var concurrentDict= new ConcurrentDictionary< int , string > ( ) ; // 并行循环(多线程)添加元素 Parallel. For ( 0 , 100 , i=> { // 原子操作:不存在则添加,存在则返回已有值 concurrentDict. GetOrAdd ( i, $"Value_ { i } " ) ; // 原子操作:尝试更新(只有当前值匹配时才更新) concurrentDict. TryUpdate ( i, $"Updated_ { i } " , $"Value_ { i } " ) ; } ) ; // 多线程查询 Parallel. For ( 0 , 100 , i=> { if ( concurrentDict. TryGetValue ( i, out var value ) ) { Console. WriteLine ( $"Key: { i } , Value: { value } " ) ; } } ) ; } } 2 ) . ConcurrentQueue< T> 线程安全的FIFO队列, 适合"生产者线程添加任务,消费者线程处理任务" 的场景( 如后台任务池) using System ; using System. Collections. Concurrent ; using System. Threading ; using System. Threading. Tasks ; class ConcurrentQueueDemo { static void Main ( ) { var queue= new ConcurrentQueue< int > ( ) ; var cts= new CancellationTokenSource ( ) ; // 生产者线程:持续添加数据 Task producer= Task. Run ( ( ) => { int i= 0 ; while ( ! cts. Token. IsCancellationRequested) { queue. Enqueue ( i++ ) ; Console. WriteLine ( $"生产: { i- 1 } " ) ; Thread. Sleep ( 100 ) ; } } ) ; // 消费者线程:持续消费数据 Task consumer= Task. Run ( ( ) => { while ( ! cts. Token. IsCancellationRequested) { if ( queue. TryDequeue ( out int value ) ) { Console. WriteLine ( $"消费: { value } " ) ; } Thread. Sleep ( 150 ) ; } } ) ; // 运行5秒后停止 Thread. Sleep ( 5000 ) ; cts. Cancel ( ) ; Task. WaitAll ( producer, consumer) ; } } 3 ) . BlockingCollection < T> ( 增强版生产者- 消费者) 封装了ConcurrentQueue/ ConcurrentBag等底层集合, 提供阻塞操作( "无数据时消费者阻塞, 队列满时生产者阻塞" ) 和边界 限制( 限制集合最大容量) , 是生产者- 消费者场景的一站式解决方案using System ; using System. Collections. Concurrent ; using System. Threading ; using System. Threading. Tasks ; class BlockingCollectionDemo { static void Main ( ) { // 初始化:底层用ConcurrentQueue,最大容量10 var blockingCollection= new BlockingCollection< int > ( new ConcurrentQueue< int > ( ) , 10 ) ; var cts= new CancellationTokenSource ( ) ; // 生产者(2个线程) Parallel. For ( 0 , 2 , producerId=> { int i= 0 ; while ( ! cts. Token. IsCancellationRequested) { int value = producerId* 1000 + i++ ; // 队列满时会阻塞,直到有空间 blockingCollection. Add ( value , cts. Token) ; Console. WriteLine ( $"生产者 { producerId } 添加: { value } " ) ; Thread. Sleep ( 200 ) ; } } ) ; // 消费者(3个线程) Parallel. For ( 0 , 3 , consumerId=> { try { // 无数据时阻塞,直到有数据或完成添加 foreach ( var value in blockingCollection. GetConsumingEnumerable ( cts. Token) ) { Console. WriteLine ( $"消费者 { consumerId } 处理: { value } " ) ; Thread. Sleep ( 300 ) ; } } catch ( OperationCanceledException ) { Console. WriteLine ( $"消费者 { consumerId } 停止" ) ; } } ) ; // 运行10秒后停止 Thread. Sleep ( 10000 ) ; cts. Cancel ( ) ; blockingCollection. CompleteAdding ( ) ; // 标记“添加完成”,消费者遍历结束 } }