Problem description
- Error when reading files inside a Parallel.For loop in C#
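1. Read all the txt files in the directory "d:/users/v-lingao/from_lei/wordsegmentation/testdata" and use Parallel.For to process their contents in parallel. Each line is read into string line with line = sr.ReadLine() (sr is a StreamReader), and every processed line is written to the corresponding newly created txt file in the directory "d:/users/v-lingao/from_lei/wordsegmentation/testdata1". The method ComputeIDF() implements this step.
2. Read the txt files created in "d:/users/v-lingao/from_lei/wordsegmentation/testdata1" and use Parallel.For to process the contents of each file in parallel, in the same way as ComputeIDF(), again with line = sr.ReadLine(). The method ComputingTfIdf() implements this step. The error occurs in this method, and the error message changes depending on the encoding used when the files were written.
Part of the code is shown below: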
public static Dictionary<string, float> ComputeIDF(List<string> stopWordsList)
{
    DirectoryInfo di = new DirectoryInfo(@"d:/users/v-lingao/from_lei/wordsegmentation/testdata");
    FileInfo[] ff = di.GetFiles("*.txt");
    Dictionary<string, int> featureDoc = new Dictionary<string, int>();

    Parallel.For(0, ff.Length, (part) =>
    {
        FileInfo file = ff[part];
        Dictionary<string, int> featureFile = new Dictionary<string, int>();
        string name = file.Name.Substring(file.Name.LastIndexOf("\\") + 1);
        string path = Path.Combine(@"d:/users/v-lingao/from_lei/wordsegmentation/testdata1", name);
        FileStream aFile = new FileStream(path, FileMode.Create);
        StreamWriter sw = new StreamWriter(aFile, Encoding.UTF8);
        int lineCount = 0;
        char[] charArray = new char[] { ' ' };
        StreamReader sr = new StreamReader(file.OpenRead(), Encoding.UTF8);
        string line = sr.ReadLine();
        while (line != null)
        {
            // part of the code omitted
            lineCount++;
            sw.Write(lineCount);
            foreach (KeyValuePair<string, int> keyvalue in featureLine)
            {
                sw.Write(' ' + keyvalue.Key + ':' + (0.5 + 0.5 * ((float)keyvalue.Value / maxCount)));
            }
            sw.WriteLine();
            line = sr.ReadLine();
        }
        // combine the featureFiles into featureDoc without repeating
        featureDoc.Add(featurename, featureFile[featurename]);
        sr.Close();
        sw.Close();
    });

    Dictionary<string, float> idf = new Dictionary<string, float>();
    foreach (KeyValuePair<string, int> keyvalue in featureDoc)
    {
        idf.Add(keyvalue.Key, (float)Math.Log10((float)sumLine / (float)keyvalue.Value));
    }
    return idf;
}
This method works without problems. Next comes ComputingTfIdf(idf), which is where the problem occurs.
public static void ComputingTfIdf(Dictionary<string, float> idf)
{
    DirectoryInfo dir = new DirectoryInfo(@"d:/users/v-lingao/from_lei/wordsegmentation/testdata1");
    FileInfo[] ff = dir.GetFiles("*.txt");
    StreamReader sr;

    Parallel.For(0, ff.Length, (part) =>
    {
        FileInfo file = ff[part];
        List<string> idfList = new List<string>();
        idfList.AddRange(idf.Keys);
        int linenum = 0;
        sr = new StreamReader(file.OpenRead(), Encoding.UTF8);
        char[] charArray = new char[] { ' ' };
        char[] charArray1 = new char[] { ':' };
        string name = file.Name.Substring(file.Name.LastIndexOf("\\") + 1);
        string path = Path.Combine(@"d:/users/v-lingao/from_lei/wordsegmentation/idfdata", name);
        FileStream aFile = new FileStream(path, FileMode.Create);
        StreamWriter sw = new StreamWriter(aFile, Encoding.UTF8);
        string line = sr.ReadLine(); // this line sometimes throws as well
        while (line != null)
        {
            linenum++;
            string[] words = line.Split(charArray);
            int i = 1;
            foreach (string word in words)
            {
                if (i == 1)
                {
                    sw.Write(word + ' ');
                    i++;
                }
                else
                {
                    string[] wds = word.Split(charArray1);
                    if (wds.Length == 2)
                    {
                        string key = wds[0];
                        if (idf.Keys.Contains(key))
                        {
                            double tfidf = (double)idf[key] * (Convert.ToDouble(wds[1]));
                            // string literals, so + concatenates instead of adding char codes to the index
                            sw.Write(idfList.IndexOf(key) + ":" + tfidf + " ");
                        }
                    }
                }
            }
            sw.WriteLine();
            line = sr.ReadLine(); // the exception usually occurs on this line
        }
        sw.Close();
    });
}
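The error message changes depending on the encoding used when the files are written. When the files are read and written as UTF8 or Unicode, both the written and the read text are garbled, and line = sr.ReadLine() throws with the message:
The output char buffer is too small to contain the decoded characters, encoding 'Unicode (UTF-8)' fallback 'System.Text.DecoderReplacementFallback'
I am baffled: the code does the same job, so why does line = sr.ReadLine() not fail in ComputeIDF()? When I switch the encoding to Encoding.GetEncoding("GBK"), the files are read and written without garbling, but line = sr.ReadLine() still throws.
Also, when I replace Parallel.For with an ordinary for loop, no error occurs.
Any help would be greatly appreciated!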
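Solution
The problem has been solved with the help of a friend. Many thanks to my friend!
I am sharing the result here in the hope that it helps others who run into a similar problem.
When using Parallel.For, pay special attention to where local variables are declared. In my code, sr was declared outside the Parallel.For construct, so during execution several threads shared a single sr, which eventually caused the exception.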
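To make the fix concrete, here is a minimal sketch (not the original program) of the pattern with the reader declared inside the lambda, so that each iteration owns its own StreamReader. The class name PerIterationReaderSketch, the method CountLinesPerFile and its inputDir parameter are hypothetical and used only for illustration. The per-line work is reduced to counting lines. This matches what ComputeIDF() above already does: it declares sr inside the lambda, which is consistent with it not failing. Instance members of StreamReader are not guaranteed to be thread safe, so sharing one reader across iterations is best avoided.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class PerIterationReaderSketch
{
    // Hypothetical helper: counts the lines of every *.txt file in inputDir in parallel.
    public static int[] CountLinesPerFile(string inputDir)
    {
        FileInfo[] files = new DirectoryInfo(inputDir).GetFiles("*.txt");
        int[] lineCounts = new int[files.Length];

        Parallel.For(0, files.Length, part =>
        {
            // Declared inside the lambda: every iteration gets its own reader,
            // so no two threads ever call ReadLine() on the same instance.
            using (StreamReader sr = new StreamReader(files[part].OpenRead(), Encoding.UTF8))
            {
                int count = 0;
                string line = sr.ReadLine();
                while (line != null)
                {
                    count++; // the real per-line processing would go here
                    line = sr.ReadLine();
                }
                // Each iteration writes only to its own array slot.
                lineCounts[part] = count;
            }
        });

        return lineCounts;
    }
}
```

The using block also closes the reader when an iteration ends, even if an exception is thrown inside the loop body. The same idea applies to any other per-file state, such as the StreamWriter: anything one iteration mutates should be declared inside the lambda or be explicitly synchronized.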