C# 并行编程

本文最后更新于 2021年4月4日 晚上

摘要: 这篇主要是听课时的笔记整理出来的, 关于如何在 C# 中进行并行编程.

语法糖

在开始正题之前, 先来看看 C# 中的一些语法糖, 使用它们可以大大加快开发效率.

先来看看编程时经常会用到的两种语言成分. 它们是 属性(Property)域(Field), 其中属性用于公共访问. 域用于私有访问.

属性的实现方式主要有两种, 一种是使用域 + 手动实现的 getter/setter 来实现属性, 另外一种是利用 C# 中的自动属性机制:

1
2
// 属性的存储空间会自动被开辟出来.
public Int MyValue { set; get; }

编译器会自动生成内部的域以及 setter 和 getter.

另外在 C# 中有一种隐式类型 var, 即自动类型推导, 其是在编译器帮助下实现的.

匿名类型和对象: 类似 JSON 的语法, 只是冒号换成等号. 匿名对象是只读的, 且两个匿名对象中只要是所有的属性都相同, 则它们的类型相同. 本质上也是 C# 编译器帮助生成的.

对象的初始化, 语法可以使用 new Type { // 属性初始化 }, 对于集合也可以这样, 或者是内置的数组.

Lambda 表达式: 和委托密切相关, 因为 Lambda 表达式的类型就可以通过委托来定义(delegate).

老版本的 C# 2.0 中可以用匿名方法实现委托: delegate(参数表) { ... }

C# 3.0 中可以使用 Lambda 表达式, 只是语法上简单了很多, 实际上是编译器编译为匿名方法来实现的.

大大简化了委托的编写, 可以写出优美的代码.

可以使用系统提供的委托类型, 无需自定义:

  • Action<>

  • Func<>

上述两个泛型在 TPL 和服务端编程中都有广泛的应用.

TPL 任务并行库

TPL 是程序异步操作的基础.

先来看如何测量 C# 代码的执行时间.

加速系数: S(p) = 单核执行时间/p个核并行执行时间

在 C# 中可以使用 StopWatch 来测量并行操作的执行时间, 在 System.Diagnostics 名空间中:

1
2
3
4
5
Stopwatch sw = new Stopwatch();
sw.Start();
System.Console.WriteLine("hello");
sw.Stop();
System.Console.WriteLine(sw.Elapsed);

输出的话就是:

1
00:00:00.0069270

实际上可以发现, 这样的操作还是非常耗费时间的.

另外并行计算并非一定比串行快! 因为在线程切换或者创建的时候可能会造成非常大的开销.

需要对任务合理分解!

任务划分过细, 可能会出现反效果!

故总结如下:

  1. 并行编程时要选择一个合适的任务分解方案.
  2. 线程的同步, 通信, 缓冲等都需要考虑.
  3. 需要在具体的软硬件环境中去具体测试, 选择最佳方法, 指标可以使用加速系数来衡量对比.

TPL 中的两个类

实际的执行都是通过创建任务, 任务由调度器交给线程池中的线程去执行, 最后获取结果.

Parallel 类

TPL 提供的类1: Parallel.

有三个方法:

  • Parallel.Invoke():并行执行代码, 其中的所有 Action 执行完毕后再往下走.

  • Parallel.For():并行循环

  • Parallel.ForEach():并行迭代

TPL 的使用场景一般是数据的分区和并行处理.

用 StopWatch 来看, 实际上某些场景下多线程并没有单线程快. 故需要看当前场景针对性验证并选择.

Task 类

TPL 提供的类2: Task

Task 的创建和运行: Task 的实例就代表了一个可以并行执行的任务.

创建方式1: 直接 new, 然后 start.

1
2
3
4
5
6
7
8
9
private static void RunTaskWithNew()
{
// 创建 Task 对象并携带任务
var task = new Task(() => {
System.Console.WriteLine("开始执行");
});
// 开始执行
task.Start();
}

创建方式2: 直接创建并执行

1
2
3
4
5
6
7
8
9
10
11
private static void RunTaskWithFactory()
{
Task.Factory.StartNew(() => {
var result = 0;
for (int i = 0; i < 10000; ++i)
{
result += i;
}
System.Console.WriteLine(result);
});
}

创建方式3 Run: .NET 4.5 之后为了替代 Factory, 推荐采用

1
2
3
4
5
6
private static void RunTask()
{
Task.Run(() => {
System.Console.WriteLine("直接Run");
});
}

延迟任务的执行:

在 TPL 中提供有 Delay 方法:

1
2
3
4
5
6
7
8
9
10
11
12
private static void DelayTaskExecution()
{
// 仅演示, 实际开发中不会这样用.
Task.Delay(1000).Wait();
Task.Run(() => {
int[] arr = {1, 2, 3, 4, 5};
foreach (var i in arr)
{
System.Console.WriteLine(i);
}
});
}

实际上 Delay 方法返回的是一个任务对象, 它主要用在有 async/await 的方法中, 特点就是它也不会阻塞调用者线程, 同时可以让异步方法等待指定的时间.

实际开发时, 使用一般都是 await Task.Delay(5000) 这样的形式.

同时, async 方法转同步直接在其上调用 Wait() 即可. 因为本身要同步的话意味着阻塞当前线程, 要这样的话就只能等上述任务结束后再继续下面的工作.

取回 Task 的执行结果

主要有三种方式可以取回结果:

  1. 传统方式: 利用线程同步对象

  2. 利用 Task<T>.Result 以阻塞的方式取回结果

  3. 利用 Task.ContinueWith 以回调的方式取回结果

其中后面两种是当前常用的.

对于异步方法, 用得最多的还是 ContinueWith.

1
2
3
4
5
6
7
8
var completion = //...

var task1 = new Task<long>(() => {
//...
});

task1.ContinueWith(completion);
task1.Start();

这样的方式就实现了非阻塞的回调方式取回执行结果.

在 C#5.0 以及 .NET 4.5 之后, 引入了 async/await 实现的就是和这个相同的目的, 且编程更加简洁.

目前在 swift 中仍然沿用的是回调的方式, 看来可以学习一下更新的东西.

任务间协作

  1. 通过 ContinueWith 创建收尾相继的任务:

    1
    2
    3
    4
    5
    Task.Run(() => {
    System.Console.WriteLine("任务1 执行");
    }).ContinueWith((preTask) => {
    System.Console.WriteLine("任务2执行");
    });
  2. 可以使用 ContinueWith 中的参数来在指定条件下继续下一个任务, 方式类似如下:

    1
    2
    3
    4
    5
    Task.Run(() => {
    System.Console.WriteLine("任务1 执行");
    }).ContinueWith((task) => {
    System.Console.WriteLine("任务2执行");
    }, TaskContinuationOptions.OnlyOnRanToCompletion);
  3. 父子类型的任务关系:

    这里有一个特点, 比如父任务 A 内部启动了两个子任务 B 和 C, 则三个任务是并行执行的.

    如果想让父任务等待子任务完成, 就需要使用阻塞式的等待了.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    Task.Run(() => {
    System.Console.WriteLine("父任务执行");
    var task1 = Task.Run(() => {
    System.Console.WriteLine("子任务1执行");
    });
    var task2 = Task.Run(() => {
    System.Console.WriteLine("子任务2执行");
    });
    Task.WaitAll(task1, task2);
    System.Console.WriteLine("父任务末尾");
    });
  4. 任务的相互等待:

    • Wait 方法: 等待任务结束后再执行后续任务
    • WaitAll: 等待所有任务结束
    • WaitAny: 等待任意任务结束

任务异常处理

异常处理中牵扯到一个异常表示类, TPL 将当前所有任务中出现的异常放入 AggragateException 类中, 通过这个类就可以获取到所有未捕捉的异常并进行处理.

1
2
3
4
5
6
7
8
9
10
11
try
{
Task.Run(() => {
System.Console.WriteLine("抛出异常");
throw new Exception("随意");
}).Wait();
}
catch (AggregateException ex)
{
System.Console.WriteLine(ex.Flatten());
}

需要注意的是, 在 try - catch 中的任务需要 wait 才能够捕捉到异常, 否则 try-catch 已经跳过了.

线程的统一取消模型

这里的确是直接使用 Thread 类来作为取消的目标.

如果手动模拟的话, 可以使用一个标志位, 如果外界设置了标志位, 在任务的检测点上, 就可以读取标志位并 return, 从而达到取消执行的效果.

volatile 关键字: 给编译器说明某个变量不要去优化, 让其始终保持在内存中去给外界读写.

实际上线程执行的取消就是在任务的执行过程中设置若干的检测点, 当在检测点时遇到外界将标志设置为取消时, 就取消操作.

可以被取消的任务中包含了 CancellationToken, 而这个是通过 CancellationTokenSource 创建的, 传入 CancellationTokenSource 中的 CancellationToken, 检测点就是去检测这个 CancellationToken 对象的 IsCancellationRequested 属性. 外界调用 CancellationTokenSource 的 Cancel 方法, 就可以引起 CancellationToken 对象的 IsCancellationRequested 属性为 true, 从而在检测点就可以取消往下执行, 清理资源并返回.

Task(任务)的取消

并行任务中多个任务只要有一个完成就可以取消其他任务的运行.

有时用户也希望手动停止某个 Task 的执行.

任务的取消也是通过 “线程统一取消模型” 完成的.

在 Task 的构造方法中, 有一个参数接收 CancellationToken 类型的对象.

同样设置检测点即可.

如果想取消多个任务, 在多个任务中设置检测点, 且使用同一个 cancellationToken 对象就能达到目的.

如果需要取消 Task 后, Task 的状态是 Cancelled, 则需要在检测点中去调用 CancellationToken 的 IsCancellationRequested 方法.

取消请求时, 如果收到的令牌和传入的令牌不是同一个, 则可以不用关心, 或者是可以去对应处理.

任务并行库的原理

任务并行库的核心是 Task 类.

Parallel 类的实现实际也是依赖 Task, 编译器会将 Parallel 对应编译为对 Task 类对象的属性或方法存取的 IL 代码.

任务的执行依赖于线程池, 任务被任务并行库调度.

原理主要是:

  1. 每个 .net 进程都可以访问一个线程池, 线程池本身关联了一个任务队列(全局), 代码中创建的任务被提交到全局任务队列中等待调度.

  2. 调度器根据每个线程的任务情况, 均衡地将任务调度给每个线程执行.

  3. 由于任务又可能产生子任务, 这时子任务就不会再放入全局队列中了, 而是放到该线程对应的本地任务队列中执行, 即新的子任务填充到本地任务队列.

  4. 如果某个线程当前工作已经执行完毕, 且全局队列中没有等待的任务, 同时其他线程的本地任务队列中有等待的任务, 则 .NET 4.0 后是通过 “任务窃取”, 将其他线程本地任务队列中等待的任务拿给空闲的线程去执行. 这样就保证每个线程的工作负载都比较均衡.

异步编程模式 async/await

异步编程模式建立在 TPL 的基础上.

比如一个典型的异步方法定义:

1
2
3
4
5
6
private static async Task DoItsJob()
{
System.Console.WriteLine("准备调用");
await Task.Run(() => DoLongRunningJob());
System.Console.WriteLine("完成调用");
}

在 await 前面的部分会在调用者线程执行, 而 await 部分会跑到其他线程中执行, 并且不会阻塞调用者线程(相当于返回了), 等到 await 执行结束, 又会继续执行后续的代码, 类似于执行回调.

故可以将异步方法分为三个部分:

  • 同步部分(await 之前): 调用者线程执行

  • await 部分: 其他线程执行

  • 回调部分(await 之后): 其他线程执行, 相当于 ContinueWith

如果异步方法调用前面不加 await, 也是可以运行的…只是会有警告.

另外异步方法最终执行的还是 Task, 即由 Task 封装的任务.

下面是一个例子:

1
2
3
4
5
// 语句块1...
await xxx // await 1
// 语句块2...
await yyy // await 2
// 语句块3...

一个 await 和它下方的语句块是在同一个线程中执行的, 测试的时候是这样的.

要取消异步方法, 同样是使用线程统一取消模型的一套操作.

异常捕获时, 是在访问任务的 Result 的时候才会抛出异常.

总结下来就是: 异步方法的异常捕获, 只有在 await 和 读取 Task.Result 的时候才会被抛出.

这里有一些编程规范:

  • 如果是异步方法, 则需要在其名字末尾添加 Async, 且其声明有 async 关键字, 内部也有 await 关键字, 返回值是 TaskTask<Result>.

  • 如果内部不需要 await, 则直接返回 Task 即可.

即在最底层的任务创建就不需要 async 声明了. 直接返回 Task 相关返回值. 在这一层上的调用者需要执行 Task 时, 就需要使用 async 和 await, 且注意方法命名.

避免出现多个连续的 await.

线程创建开销

在线程创建和切换时有许多开销, 主要是时间开销和空间开销.

时间开销: 在 iOS 中创建约 90 微秒.

空间开销: 内核空间中大约要创建 1KB 空间, 而用户空间中是 4KB 的整数倍, 可能最小 1M(主线程), 512KB(后台线程).

而线程的上下文切换也需要很大开销(微观上).

上下文切换时, 操作系统会将 CPU 对应核心上的之前的上下文保存下来, 并装入当前要执行的线程的上下文.

而一般来说, 在开发框架中都提供有线程上下文对象, 这个对象具有 POST 方法, 这个 Post 方法实际上就是在 await 结束之后起作用的, 也解释了为什么 await 和之后的代码执行是在同一个线程:

在 await 结束后, 调用 post, 它之后的代码会被打包推送到之前捕获的上下文中去执行.

这里再次进行试验, 在只有单个 await 的情况下, 的确是await 之后的代码会在另外一个线程中去执行, 但并非是之前的线程.

但在 WPF 这些框架中, UI 线程的上下文在 await 时的确被入栈了, 等 await 结束后再出栈, 故 await 后的代码仍然是在 UI 线程中执行的.

而 ASP.NET Core 中的是在 await 前捕捉到的线程中去执行 await 之后的那些代码. 准备验证.

另外在 console 程序中出现的情况是 await 之后的代码有可能在 await 的执行线程中继续执行, 这样就避免了线程切换带来的开销. 估计这样的是一个优化吧. 但默认情况下是在不同的线程中去执行的.

故:

多用线程池(重用), 不要创建过多线程! 不要认为多线程一定比单线程执行快! 异步方法的 IL 指令复杂得多!!!

区分并行编程和异步编程说的目标: CPU 密集型, I/O 密集型

CPU 密集型可以直接使用 TPL 完成工作.

I/O 密集型可以使用 await/async 完成.


C# 并行编程
https://blog.rayy.top/2018/09/11/2019-ParallelInCSharp/
作者
貘鸣
发布于
2018年9月11日
许可协议