在上篇文章中介绍了如何Break、Stop循环,以及如何定义线程局部变量。在本文中介绍如何在外部去取消循环、以及异常的处理。
Cancel
在并行的循环中支持通过传递ParallelOptions参数中的CancellationToken进行取消循环的控制,我们可以CancellationTokenSource实例化之后传递给ParallelOptions对象Cancellation值。下面来看个示例:
[TestMethod]
public void CancelLoop()
{
var sourceNums = Enumerable.Range(0, 1000000000);
var cts = new CancellationTokenSource();
var po = new ParallelOptions();
var stack = new ConcurrentStack<int>();
po.CancellationToken = cts.Token;
po.MaxDegreeOfParallelism = System.Environment.ProcessorCount;
Task.Factory.StartNew(() =>
{
foreach (var num in sourceNums)
{
if (num == 1000000)
cts.Cancel();
}
});
try
{
Parallel.ForEach(sourceNums,po, num =>
{
stack.Push(num);
po.CancellationToken.ThrowIfCancellationRequested();
});
}
catch (OperationCanceledException e)
{
Console.WriteLine(e.Message);
}
Console.WriteLine(stack.Count);
}
我们来看下运行的结果:
解释下上面的方法,并行循环的意图是将sourceNums里面的元素推到Stack中,然后另外开启了一个线程来控制了什么时候进行cancel操作。也许会有个疑问,为什么不是1000000呢,原因很简单就是上面的控制的线程不可能跟下面的同时开始的,而其每次迭代运行所需要的时间也是不同的。
上面的示例中我们看的是如何终止Parallel的ForEach循环,终止For循环是一样的,For方法中也提供了ParallelOptions参数。
Handel Exceptions
在处理并行循环的异常的与顺序循环异常的处理是有所不同的,并行循环里面可能会一个异常在多个循环中出现,或则一个线程上的异常导致另外一个线程上也出现异常。比较好的处理方式就是,首先获取所有的异常最后通过AggregateException来包装所有的循环的异常,循环结束后进行throw。看一段示例代码:
private void HandleNumbers(int[] numbers)
{
var exceptions = new ConcurrentQueue<Exception>();
Parallel.For(0, numbers.Length, i =>
{
try
{
if (numbers[i] > 10 && numbers[i] < 20)
{
throw new Exception(String.Format("numbers[{0}] betwewn 10 to 20",i));
}
}
catch (Exception e)
{
exceptions.Enqueue(e);
}
});
if (exceptions.Count > 0)
throw new AggregateException(exceptions);
}
测试方法:
[TestMethod()]
public void HandleExceptions()
{
var numbers = Enumerable.Range(0, 10000).ToArray();
try
{
this.HandleNumbers(numbers);
}
catch(AggregateException exceptions)
{
foreach (var ex in exceptions.InnerExceptions)
{
Console.WriteLine(ex.Message);
}
}
}
测试结果:
对上面的方法说明下,在HandleNumbers方法中,就是一个小的demo如果元素的值出现在10-20之间就抛出异常。在上面我们的处理方法就是:在循环时通过队列将所有的异常都集中起来,循环结束后来抛出一个AggregateException。
总结
在本文中主要说明了如何处理异常以及如何在外部取消一个并行循环。到此Task Parallel Library中的数据并行部分已经结束。下面的会就学习下,任务并行部分。