c#里有好几个「多线程」实现,到底用哪个更合理,带着这个问题,一起来学习一下多线程吧。 首先是Joseph Albahari的Threading in C# 的翻译。
概念
c# 通过多线程来实现代码并行运行,一个线程相对独立,多个线程可以同时一起运行。
一个c#程序自动运行在一个主线程(main)里,可以通过增加线程实现多线线程。例如:
class ThreadTest
{
static void Main()
{
Thread t = new Thread (WriteY); //新建一个线程,指定线程运行WriteY函数
t.Start(); // 线程执行,WriteY开始执行
for (int i = 0; i < 1000; i++)
Console.Write ("x"); // 同时,主线程也可以做一些其他事情
}
static void WriteY()
{
for (int i = 0; i < 1000; i++) Console.Write ("y");
}
}
主线程里新建了线程t,在线程t里运行函数WriteY打印字母 y 。同时主线程打印字母 x :
xxxxxxxxxxyyyyyyyyxxxyxyxyx
xyyyyyyyyxxxyxyxyxxyyyyyyyy
当线程启动后,其IsAlive属性为true,直至线程结束。
当线程构造函数中传入的delegate执行完成,线程就结束了,并且不会被重启。
下面的例子里,一个有局部变量的方法在主线程和子线程中同时调用:
tatic void Main()
{
new Thread (Go).Start(); // Call Go() on a new thread
Go(); // Call Go() on the main thread
}
static void Go()
{
// Declare and use a local variable - 'cycles'
for (int cycles = 0; cycles < 5; cycles++) Console.Write ('?');
}
每个线程都独立的内存栈,所以结果可以看到是10个 ? 。
??????????
多个线程也可以同时引用一个对象,如:
class ThreadTest
{
bool done;
static void Main()
{
ThreadTest tt = new ThreadTest(); // Create a common instance
new Thread (tt.Go).Start();
tt.Go();
}
// Note that Go is now an instance method
void Go()
{
if (!done) { done = true; Console.WriteLine ("Done"); }
}
}
两个线程都调用了同一个对象的 Go(), 所以字段done只有一个,结果 Done 只打印了一次。
Done
这两个例子也说明一个关键概念:线程安全。程序的输出是不确定的,有可能输出两个 Done。 如果改一下Go方法的顺序,Done 就会被又打印出来一次。
static void Go()
{
if (!done) { Console.WriteLine ("Done"); done = true; }
}
问题是当判断if条件时,另个正在执行WriteLine,还没有来得及把 done 设置为 true。
补救的方法是使用排它锁 exclusive lock。
lass ThreadSafe
{
static bool done;
static readonly object locker = new object();
static void Main()
{
new Thread (Go).Start();
Go();
}
static void Go()
{
lock (locker)
{
if (!done) { Console.WriteLine ("Done"); done = true; }
}
}
}
当两个线程同时竞争一个锁,那一个线程需要阻塞等待锁可用。这保证了只有一个线程执行在执行这段代码。 这种多线程环境下的避免不确定性的方法,就是线程安全的。
当线程被阻塞时,不会占用CPU资源。
Join和Sleep
如果想等一个线程结束,可以调用这个线程的t.Join()方法。
static void Main()
{
Thread t = new Thread (Go);
t.Start();
t.Join();
Console.WriteLine ("Thread t has ended!");
}
static void Go()
{
for (int i = 0; i < 1000; i++) Console.Write ("y");
}
结果会打印1000次 y, 然后打印 Thread t has ended! . t.Join() 还可以加上参数,表示在线程执行一段时间后,继续执行后续代码。
Thread.Sleep是暂停当前线程一段时间。
在等待 Sleep 或 Join 时,线程被阻塞,所以不会耗用CPU的资源。
Thread.Sleep(0) 和 Thread.Yield() 会将当前线程交出CPU,让CPU处理其他线程。 可以用来更加高深的性能调优,同时也可以测试是否有线程安全的bug。
线程的工作原理
多线程是由线程调度器进行管理。CLR的调度器是操作系统代理函数。 线程调度器保证所有活动的线程分配合适的执行时间,并且线程等待和阻塞不会占用CPU。 在多处理器的计算机上,不同的线程同时在不同CPU上运行,多线程就是混合时间切片,并做到可靠同步。
线程和进程
不多说了,计算机上运行多个进程,一个进程并行多个线程。进程是完全独立的,线程相对独立,但是可以共享内存。
线程的正确使用和错误使用
多进程有很多应用,经常使用在下面的场合:
- 保持一个可响应的用户界面
- 让遭遇各种阻塞的CPU能高效利用
- 并行执行
- 特殊的执行方式,提前加载、预加载
- 让多个请求同时执行
多线程增加了工作的复杂性,线程交互导致开发周期长和各种bug。 因此要注意做到尽量少的交互,并使用已知可用的模式。
多线程在调度和切换时使用了额外的资源。多线程也不总是加快执行命令。
创建并启动线程
前面已经提到,线程使类Thread类构造,传入参数 ThreadStart 定义了线程开始位置。
public delegate void ThreadStart();
调用线程的Start方法使线程开始执行,直到方法返回后,线程结束。
class ThreadTest
{
static void Main()
{
Thread t = new Thread (new ThreadStart (Go));
t.Start(); // Run Go() on the new thread.
Go(); // Simultaneously run Go() in the main thread.
}
static void Go()
{
Console.WriteLine ("hello!");
}
}
在上面例子中,线程 t 执行 Go( ) 的同时,主线程也调用了 Go( )。几乎同时打印出两个hello
hello!
hello!
还可以用更简单的写法,不用定义delegate,直接出入一个方法,让C#自己推断出ThreadStart代理。
Thread t = new Thread (Go); // No need to explicitly use ThreadStart
还有更更简单的,直接使用lambda表达式或匿名方法:
static void Main()
{
Thread t = new Thread ( () => Console.WriteLine ("Hello!") );
t.Start();
}
向线程中传入数据
最简单的方法是使用lambda表达式,在lambda里调用的方法并传入合适的参数:
static void Main()
{
Thread t = new Thread ( () => Print ("Hello from t!") );
t.Start();
}
static void Print (string message)
{
Console.WriteLine (message);
}
也可以在Start的时候传入参数:
static void Main()
{
Thread t = new Thread (Print);
t.Start ("Hello from t!");
}
static void Print (object messageObj)
{
string message = (string) messageObj; // We need to cast here
Console.WriteLine (message);
}
因为线程Thread的构造函数可以接收下面的任一个代理
public delegate void ThreadStart();
public delegate void ParameterizedThreadStart (object obj);
这里有一个限制,就是ParameterizedThreadStart只能传入一个参数,并且类型是object,需要自己做转换。
Lambda表达式和捕获变量
使用强大的lambda表达式可以给线程出入参数。然而必须小心,在线程开始前修改了捕获变量的值,那结果就麻烦了。
for (int i = 0; i < 10; i++)
new Thread (() => Console.Write (i)).Start();
这段程序输出是不确定的,如:
0223557799
问题出在变量 i 在循环中指向同一处内存。因此每个线程打印出来的值是变化的。 解决方法是使用一个临时变量
for (int i = 0; i < 10; i++)
{
int temp = i;
new Thread (() => Console.Write (temp)).Start();
}
变量temp是每个循环内的局部变量,每个线程捕获到的是不同内存的变量。 用更浅显的例子说明一下:
string text = "t1";
Thread t1 = new Thread ( () => Console.WriteLine (text) );
text = "t2";
Thread t2 = new Thread ( () => Console.WriteLine (text) );
t1.Start();
t2.Start();
由于lambda表达式捕获的是同样的text变量,所以”t2” 打印了两次
t2
t2
线程命名
每个线程有个 Name 属性,可以用来调试。在Visual Studio中调试时可以显示出来线程名字。 Name属性只能设置一次,如果想再次修改会抛出异常。
静态的 Thread.CurrentThread 属性可以获得当前执行的线程:
class ThreadNaming
{
static void Main()
{
Thread.CurrentThread.Name = "main";
Thread worker = new Thread (Go);
worker.Name = "worker";
worker.Start();
Go();
}
static void Go()
{
Console.WriteLine ("Hello from " + Thread.CurrentThread.Name);
}
}
前台线程和后台线程
默认创建的线程是前台线程。只要有正在运行的前台线程,程序就一直活着。后台线程则不然。 当所有的前台线程执行完,程序就会结束,所有运行中的后台进程都会意外中止。
可以通过 IsBackground 属性来设置线程 是否是后台进程。
class PriorityTest
{
static void Main (string[] args)
{
Thread worker = new Thread ( () => Console.ReadLine() );
if (args.Length > 0) worker.IsBackground = true;
worker.Start();
}
}
在上面的例子中,如果程序运行时,如果没有输入参数,那worker进程是前台进行,就会停在ReadLine等待用户按下回车键。此时主线程执行完了退出,但是程序依然在运行。
而如果Main输入一个参数,worker线程就变成后台线程,主线程执行完成后,程序就退出了,worker的ReadLine被中止。
当一个线程被这样中止,在后台线程中的finally代码会被绕过。这样就有个问题,如果在finally(或者using)里写了释放资源或删除文件等操作,那就不会被执行了。 为了避免这个问题,需要保证等待这些线程执行完了后在退出程序,由两个方法:
- 如果是自己创建的线程,可以使用线程的Join方法
- 如果是线程池的线程,使用event wait handle
不管用那种方法,最好设置上超时时间,抛弃那些无法完成的线程,这样可以保证最终程序可以被关闭。 前台线程不需要这样的处理,但是最好针对无法结束的情况做处理。有时候程序不能退出的原因就是前台线程一直在运行没有结束。
线程优先级
线程的优先级决定了操作系统分配给他的执行时间。 优先级的枚举:
enum ThreadPriority { Lowest, BelowNormal, Normal, AboveNormal, Highest }
优先级只有在多线程并行的情况下有意义。 提高线程的优先级,并不能让线程一直工作,因为他还受到进程优先级的约束。略去一千个字。。。
处理异常
创建线程时使用 try/catch/finally ,不会捕捉到线程执行中的异常。
public static void Main()
{
try
{
new Thread (Go).Start();
}
catch (Exception ex)
{
// We'll never get here!
Console.WriteLine ("Exception!");
}
}
static void Go() { throw null; } // Throws a NullReferenceException
try/catch 部分不起作用,新创建的线程被 NullReferenceException 连累。 正确的做法是将错误处理移到Go方法里
public static void Main()
{
new Thread (Go).Start();
}
static void Go()
{
try
{
throw null; // The NullReferenceException will get caught below
}
catch (Exception ex)
{
// Typically log the exception, and/or signal another thread
// that we've come unstuck
}
}
和你在main线程最高层上进行trycatch一样,所有的线程都要有异常处理。未处理的异常会让整个程序垮掉。 有些情况你不用处理异常,.NET Framework已经帮忙处理好了:
- 异步代理
- BackgroundWorker
- TPL Task Parallel Library
线程池
当开始一个新的线程,几千微秒被用来组织新的私有变量栈。每个线程大概需要1M内存。 线程池 Thread Pool 通过共享和循环利用线程来降低开销,这使得多线程高可靠并且没有性能损失。 这在多处理器的电脑上使用分而治之的方法做并行运算时,非常有效果。
线程池限定了可同时运行的线程数量。太多活动的线程会加重线程管理的负担,降低CPU缓存的效率,从而阻塞操作系统。在线程池中,当达到一定数量后,任务就要等待,在别的任务完成后才能启动。
有几个方法来使用进程池:
- 使用TPL
- 调用 ThreadPool.QueueUserWorkItem
- 使用异步代理
- 使用BackgroundWorker
TPL和PLINQ是相当强大和高级的,即使没有引入进程池,也可以协助你完成多线程,稍后会讨论到。 现在要看简单看一下如何使用 Task 类,实现在池化线程上执行代理。
可以通过 Thread.CurrentThread.IsThreadPoolThread 属性来检查当前线程是否是池化线程。
通过TPL来实现线程池
使用Task类可以快速进入线程池。Task类是 Framework 4.0 加入的。可以这么看,Task用来替代ThreadPool.QueueUserWorkItem,而泛型 Task<TResult> 替代 asynchronous delegates。
使用非泛型的Task,直接调用Task.Factory.StartNew,传入代理方法:
static void Main()
{
Task.Factory.StartNew (Go);
}
Task.Factory.StartNew 返回一个Task对象,使用这个对象来监视任务,如调用Wait来等待任务完成。
注意:如果调用了Wait方法,未处理的异常会被抛到宿主(调用Wait的)线程。如果不是调用Wait,而是任由其自动执行,那么未处理的异常就会是整个程序停止。
泛型Task<TResult> 是Task的子类,使用它可以在task运行完成后得到一个返回值。下面的例子中,使用了Task<TResult> 来下载一个网页:
static void Main()
{
// Start the task executing:
Task<string> task = Task.Factory.StartNew<string>
( () => DownloadString ("http://www.linqpad.net") );
// We can do other work here and it will execute in parallel:
RunSomeOtherMethod();
// When we need the task's return value, we query its Result property:
// If it's still executing, the current thread will now block (wait)
// until the task finishes:
string result = task.Result;
}
static string DownloadString (string uri)
{
using (var wc = new System.Net.WebClient())
return wc.DownloadString (uri);
}
未处理的异常在查询返回值Result属性时,被包装到AggregateException里并转发。然而如果没有到检查结果,那么未处理的异常会让进程down掉。
TPL是个好东西,特别适合多核心处理器,后续还有介绍。
不通过TPL来实现线程池
如果使用的是低版本的.NET Framework,就没法用TPL啦。那就要用两个老东西来实现进程池:ThreadPool.QueueUserWorkItem 和 asynchronous delegates。二者的区别就是后者会有返回值,并且未处理的异常会同步返回给调用者。
QueueUserWorkItem
使用 QueueUserWorkItem,简单的通过传入一个代理就可以:
static void Main()
{
ThreadPool.QueueUserWorkItem (Go);
ThreadPool.QueueUserWorkItem (Go, 123);
Console.ReadLine();
}
static void Go (object data) // data will be null with the first call.
{
Console.WriteLine ("Hello from the thread pool! " + data);
}
输出结果:
Hello from the thread pool!
Hello from the thread pool! 123
目标函数Go必须有个object参数。和ParameterizedThreadStart那样,可以传参数进去。但是使用QueueUserWorkItem不会有返回对象来帮助查看执行状态,而且未处理的异常也会导致程序退出。
Asynchronous delegates
QueueUserWorkItem没有提供一个获取线程返回值的机制,异步代理可以双向传入传出多个参数,并且未处理异常也不会返回给原线程(更准确的说,叫EndInvoke线程)。
使用的方法如下:
- 实例化一个希望并行处理的目标函数,通常是 Func 代理
- 调用代理的 BeginInvoke,并保存 IAsyncResult 返回值 BeginInvoke会立即返回给调用方,调用方可以在线程池运行的同时,再继续执行其他任务。
- 当需要结果时,调用代理的EndInvoke,并传入上一步得到的 IAsyncResult 对象
下面的例子会得到传入字符串的长度:
static void Main()
{
Func<string, int> method = Work;
IAsyncResult cookie = method.BeginInvoke ("test", null, null);
// ... here's where we can do other work in parallel...
int result = method.EndInvoke (cookie);
Console.WriteLine ("String length is: " + result);
}
static int Work (string s) { return s.Length; }
EndInvoke做了三件事情:
- 首先要等待代理执行完成(加入当时还没有完成的话)
- 接收到返回值
- 抛出任何未处理的异常
注意:如果没有调用EndInvoke,编译器不会出错,但是运行时有可能出问题。
可以定义个回调函数来实现EndInvoke,回调函数会在线程完成后自动执行。当然有一些额外的工作:
static void Main()
{
Func<string, int> method = Work;
method.BeginInvoke ("test", Done, method);
}
static int Work (string s) { return s.Length; }
static void Done (IAsyncResult cookie)
{
var target = (Func<string, int>) cookie.AsyncState;
int result = target.EndInvoke (cookie);
Console.WriteLine ("String length is: " + result);
}
BeginInvoke 最后一个参数是用户定义值,可以通过IAsyncResult.AsyncState 属性获取到。这里传入的就是代理方法本身,所以可以再调用EndInvoke。
线程池优化
线程池开始时就有一个线程。当有任务分配进来时,线程池管理器就插入新的线程,直到最大限制值。在一段非活动期后,线程管理器会销毁一部分线程,来达到更高的效率。
可以用ThreadPool.SetMaxThreads属性,设置线程池的最大线程数,默认值如下:
- 1023 in Framework 4.0 in a 32-bit environment
- 32768 in Framework 4.0 in a 64-bit environment
- 250 per core in Framework 3.5
- 25 per core in Framework 2.0
当然这个值要根据具体情况不同。为什么数值会这么大,是因为总是会有一些线程被阻塞。
当然也可以通过ThreadPool.SetMinThreads来设置一个下限。提高这个最小值,可以在很多线程blocked时,仍能并行同步工作。
最小值一般是一个处理器一个线程。在服务器环境下,最小值会设置为50多
最小线程是怎么工作的呢?
设置了线程池的最小值x后,并不是马上就创建x个线程,线程池管理器会按需逐渐创建线程。不立马创建,是为了防止对应用造成冲击。假设有40个任务,每个任务执行10ms,在一台四核计算机上,需要执行100ms。理论上需要40个线程来执行。少了不能充分利用CPU,多了也没有必要。 如果假设线程不是执行10ms,而是请求网页,需要半秒钟等待回应,这时CPU时空闲的。线程池的管理策略就不行了,需要更多的线程,使得请求同时进行。 幸好,池管理器有个备份计划,如果他的队列半秒不动,就会在创建新的新城,每半秒一个,直到最大值。 半秒钟的延迟是个双刃剑,一方面保证不会马上消耗40M内存,另一方面造成不必要的等待。因此可以告诉池管理器,不要拖延,直接分配前x个线程,ThreadPool.SetMinThreads (50, 50);