天天看点

CLR线程池的I/O线程

I/O 线程是.NET专为访问外部资源所设置的一种线程,因为访问外部资源常常要受到外界因素的影响,为了防止让主线程受影响而长期处于阻塞状态,.NET为多个I/O操作都建立起了异步方法,例如:FileStream、TCP/IP、WebRequest、WebService等等,而且每个异步方法的使用方式都非常类似,都是以BeginXXX为开始,以EndXXX结束。

异步读写 FileStream

需要在 FileStream 异步调用 I/O线程,必须使用以下构造函数建立 FileStream 对象,并把useAsync设置为 true。

FileStream stream = new FileStream ( string path, FileMode mode, FileAccess access, FileShare share, int bufferSize,bool useAsync ) ;

 path ​是文件的相对路径或绝对路径; mode 确定如何打开或创建文件; access 确定​访问文件的方式; share 确定文件如何进程共享; bufferSize 是代表缓冲区大小,一般默认最小值为8,在启动异步读取或写入时,文件大小一般大于缓冲大小; userAsync代表是否启动异步I/O线程。

异步写入

FileStream中包含BeginWrite、EndWrite 方法可以启动I/O线程进行异步写入。

public override IAsyncResult BeginWrite ( byte[] array, int offset, int numBytes, AsyncCallback userCallback, Object stateObject )

public override void EndWrite (IAsyncResult asyncResult )

BeginWrite 返回值为IAsyncResult, 使用方式与委托的BeginInvoke方法相似,最好就是使用回调函数,避免线程阻塞。在最后两个参数中,参数AsyncCallback用于绑定回调函数; 参数Object用于传递外部数据。要注意一点:AsyncCallback所绑定的回调函数必须是带单个 IAsyncResult 参数的无返回值方法。

在例子中,把FileStream作为外部数据传递到回调函数当中,然后在回调函数中利用IAsyncResult.AsyncState获取FileStream对象,最后通过FileStream.EndWrite(IAsyncResult)结束写入。

1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             //把线程池的最大值设置为1000
 6             ThreadPool.SetMaxThreads(1000, 1000);
 7             ThreadPoolMessage("Start");
 8 
 9             //新立文件File.sour
10             FileStream stream = new FileStream("File.sour", FileMode.OpenOrCreate, 
11                                        FileAccess.ReadWrite,FileShare.ReadWrite,1024,true);
12             byte[] bytes = new byte[16384];
13             string message = "An operating-system ThreadId has no fixed relationship........";
14             bytes = Encoding.Unicode.GetBytes(message);
15 
16             //启动异步写入
17             stream.BeginWrite(bytes, 0, (int)bytes.Length,new AsyncCallback(Callback),stream);
18             stream.Flush();
19             
20             Console.ReadKey();
21         }
22 
23         static void Callback(IAsyncResult result)
24         {
25             //显示线程池现状
26             Thread.Sleep(200);
27             ThreadPoolMessage("AsyncCallback");
28             //结束异步写入
29             FileStream stream = (FileStream)result.AsyncState;
30             stream.EndWrite(result);
31             stream.Close();
32         }
33 
34         //显示线程池现状
35         static void ThreadPoolMessage(string data)
36         {
37             int a, b;
38             ThreadPool.GetAvailableThreads(out a, out b);
39             string message = string.Format("{0}\n  CurrentThreadId is {1}\n  "+
40                   "WorkerThreads is:{2}  CompletionPortThreads is :{3}",
41                   data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
42             Console.WriteLine(message);
43         }
44     }      

异步读取

FileStream 中包含 BeginRead 与 EndRead 可以异步调用I/O线程进行读取。

public override IAsyncResult BeginRead ( byte[] array,int offset,int numBytes, AsyncCallback userCallback,Object stateObject)

public override int EndRead(IAsyncResult asyncResult)

其使用方式与BeginWrite和EndWrite相似,AsyncCallback用于绑定回调函数; Object用于传递外部数据。在回调函数只需要使用IAsyncResut.AsyncState就可获取外部数据。EndWrite 方法会返回从流读取到的字节数量。

首先定义 FileData 类,里面包含FileStream对象,byte[] 数组和长度。然后把FileData对象作为外部数据传到回调函数,在回调函数中,把IAsyncResult.AsyncState强制转换为FileData,然后通过FileStream.EndRead(IAsyncResult)结束读取。最后比较一下长度,若读取到的长度与输入的数据长度不一至,则抛出异常。

1      class Program
 2      {
 3          public class FileData
 4          {
 5              public FileStream Stream;
 6              public int Length;
 7              public byte[] ByteData;
 8          }
 9  
10          static void Main(string[] args)
11          {       
12              //把线程池的最大值设置为1000
13              ThreadPool.SetMaxThreads(1000, 1000);
14              ThreadPoolMessage("Start");
15              ReadFile();
16  
17              Console.ReadKey();
18          }
19  
20          static void ReadFile()
21          {
22              byte[] byteData=new byte[80961024];
23              FileStream stream = new FileStream("File1.sour", FileMode.OpenOrCreate, 
24                                      FileAccess.ReadWrite, FileShare.ReadWrite, 1024, true);
25              
26              //把FileStream对象,byte[]对象,长度等有关数据绑定到FileData对象中,以附带属性方式送到回调函数
27              FileData fileData = new FileData();
28              fileData.Stream = stream;
29              fileData.Length = (int)stream.Length;
30              fileData.ByteData = byteData;
31              
32              //启动异步读取
33              stream.BeginRead(byteData, 0, fileData.Length, new AsyncCallback(Completed), fileData);
34          }
35   
36          static void Completed(IAsyncResult result)
37          {
38              ThreadPoolMessage("Completed");
39  
40              //把AsyncResult.AsyncState转换为FileData对象,以FileStream.EndRead完成异步读取
41              FileData fileData = (FileData)result.AsyncState;
42              int length=fileData.Stream.EndRead(result);
43              fileData.Stream.Close();
44  
45              //如果读取到的长度与输入长度不一致,则抛出异常
46              if (length != fileData.Length)
47                  throw new Exception("Stream is not complete!");
48  
49              string data=Encoding.ASCII.GetString(fileData.ByteData, 0, fileData.Length);
50              Console.WriteLine(data.Substring(2,22));
51          }
52  
53          //显示线程池现状
54          static void ThreadPoolMessage(string data)
55          {
56              int a, b;
57              ThreadPool.GetAvailableThreads(out a, out b);
58              string message = string.Format("{0}\n  CurrentThreadId is {1}\n  "+
59                           "WorkerThreads is:{2}  CompletionPortThreads is :{3}",
60                           data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
61              Console.WriteLine(message);      
62          }
63              
64    }