作者:肖波
用Remoting做程序間通訊,效率較低,于是做了一個采用管道技術進行程序間通訊的例子,在1.8G 雙核計算機上每秒鐘可以發送180M資料。下面給出源碼
Server端的管道類

using System;

using System.Collections.Generic;

using System.Text;

using System.Threading;

using Pipe.Win32;


namespace Pipe.Server
{
public delegate void ReceiveMessageFunc(System.IO.MemoryStream m);
public delegate void ReceiveMessageErrorFunc(Exception e);
public class PipeServer : IDisposable
{
enum State
{
Idle = 0,
Begining = 1,
Reading = 2,
}
String m_PipeName;
uint m_Handle;
uint m_BufferSize;
State m_State = State.Idle;
const ulong SYNC_HEAD = 0xf8c7a1ca13db307e;
const uint NMPWAIT_USE_DEFAULT_WAIT = 0x00000000;
const int DEFAULT_BUFFER_SIZE = 1024;
ReceiveMessageFunc m_ReceiveMessage;
ReceiveMessageErrorFunc m_ReceiveMessageError;
public ReceiveMessageFunc OnReceiveMessage
get
{
return m_ReceiveMessage;
}
set
m_ReceiveMessage = value;
public ReceiveMessageErrorFunc OnReceiveMessageError
return m_ReceiveMessageError;
m_ReceiveMessageError = value;
public String PipeName
return m_PipeName;
public uint BufferSize
return m_BufferSize;
public String PipeUri
return @"\\.\pipe\" + m_PipeName;
private bool IsSyncHead(byte[] buf, uint len, out int msgLen)
msgLen = 0;
if (len != 12)
return false;
if (SYNC_HEAD != BitConverter.ToUInt64(buf, 0))
msgLen = BitConverter.ToInt32(buf, sizeof(ulong));
if (msgLen < 0)
return true;
private void ProcessMessage(System.IO.MemoryStream m)
if (OnReceiveMessage != null)
m.Position = 0;
OnReceiveMessage(m);
private void ThreadProc()
public PipeServer(String pipeName)
m_PipeName = pipeName;
m_BufferSize = DEFAULT_BUFFER_SIZE;
public PipeServer(String pipeName, uint bufferSize)
m_BufferSize = bufferSize;
public void Listen()
while (true)
try
{
m_Handle = NTKernel.CreateNamedPipe(PipeUri, (uint)FileAccess.PIPE_ACCESS_DUPLEX,
(uint)PipeMode.PIPE_READMODE_MESSAGE | (uint)PipeMode.PIPE_TYPE_MESSAGE | (uint)PipeMode.PIPE_WAIT,
NTKernel.PIPE_UNLIMITED_INSTANCES, m_BufferSize, m_BufferSize, NMPWAIT_USE_DEFAULT_WAIT, new SecurityAttributes());
if (m_Handle == NTKernel.INVAILD_HANDLE)
{
throw new Exception(String.Format("CreateNamedPipe fail, err={0}", NTKernel.GetLastError()));
}
if (!NTKernel.ConnectNamedPipe(m_Handle, IntPtr.Zero))
uint err = NTKernel.GetLastError();
NTKernel.CloseHandle(m_Handle);
throw new Exception(String.Format("ConnectNamedPipe fail, err={0}", err));
byte[] buf = new byte[m_BufferSize];
uint relSize = 0;
int msgLen = 0;
int offset = 0;
System.IO.MemoryStream m = new System.IO.MemoryStream();
while (NTKernel.ReadFile(m_Handle, buf, m_BufferSize, out relSize, IntPtr.Zero))
switch (m_State)
{
case State.Idle:
if (IsSyncHead(buf, relSize, out msgLen))
{
m_State = State.Begining;
}
break;
case State.Begining:
offset = 0;
m = new System.IO.MemoryStream();
m.Write(buf, 0, (int)relSize);
offset += (int)relSize;
if (offset >= msgLen)
m_State = State.Idle;
if (offset == msgLen)
{
ProcessMessage(m);
}
else
if (OnReceiveMessageError != null)
{
OnReceiveMessageError(new Exception("Message overflow!"));
}
else
m_State = State.Reading;
case State.Reading:
}
NTKernel.DisconnectNamedPipe(m_Handle);
NTKernel.CloseHandle(m_Handle);
System.Threading.Thread.Sleep(10);
}
catch (Exception e)
if (OnReceiveMessageError != null)
OnReceiveMessageError(e);
public void Dispose()
lock (this)
if (m_Handle != NTKernel.INVAILD_HANDLE)
m_Handle = NTKernel.INVAILD_HANDLE;
~PipeServer()
Dispose();
}
}

Client 端的管道類




using System.Diagnostics;



namespace Pipe.Client
public class PipeClient : IDisposable
String m_ComputerName;
byte[] m_SendBuf;
Propertys
private void Connect()
int file_not_find_times = 0;
m_Handle = NTKernel.CreateFile(PipeUri, (uint)FileAccess.GENERIC_READ | (uint)FileAccess.GENERIC_WRITE,
0, new SecurityAttributes(), (uint)CreateMode.OPEN_EXISTING, 0, 0);
if (m_Handle == NTKernel.INVAILD_HANDLE)
uint err = NTKernel.GetLastError();
if (err == NTKernel.ERROR_FILE_NOT_FOUND)
if (file_not_find_times++ < 2000)
System.Threading.Thread.Sleep(20);
continue;
if (err == NTKernel.ERROR_PIPE_BUSY)
NTKernel.WaitNamedPipeA(PipeUri, 20);
continue;
else
throw new Exception(String.Format("Create File for pipe fail, err={0}", NTKernel.GetLastError()));
break;
private void WriteBuf(byte[] buf)
uint relSize;
if (!NTKernel.WriteFile(m_Handle, buf, (uint)buf.Length, out relSize, IntPtr.Zero))
throw new Exception(String.Format("Send message to pipe fail, err={0}", NTKernel.GetLastError()));
public void Close()
bool ret = NTKernel.CloseHandle(m_Handle);
public PipeClient(String pipeName, uint bufferSize)
m_Handle = NTKernel.INVAILD_HANDLE;
m_SendBuf = new byte[bufferSize];
Close();
public void Send(byte[] buf)
if (m_Handle == NTKernel.INVAILD_HANDLE)
Connect();
//Build Message Head
byte[] syncHead = BitConverter.GetBytes(SYNC_HEAD);
byte[] length = BitConverter.GetBytes(buf.Length);
byte[] lengthBuf = new byte[syncHead.Length + length.Length];
syncHead.CopyTo(lengthBuf, 0);
for (int i = syncHead.Length; i < lengthBuf.Length; i++)
lengthBuf[i] = length[i - syncHead.Length];
WriteBuf(lengthBuf);
//write content
if (buf.Length < m_BufferSize)
WriteBuf(buf);
else
//the length of buf lardge than m_BufferSize
int offset = 0;
int len = Math.Min((int)m_BufferSize, buf.Length - offset);
byte[] sendbuf;
while (len > 0)
if (len == m_BufferSize)
sendbuf = m_SendBuf;
sendbuf = new byte[len];
System.IO.MemoryStream m = new System.IO.MemoryStream(sendbuf);
m.Write(buf, offset, len);
m.Close();
offset += len;
len = Math.Min((int)m_BufferSize, buf.Length - offset);
WriteBuf(sendbuf);
~PipeClient()

NTKernel.cs
這個程式檔案Client 和 Server 都要,封裝了相應的API函數




using System.Runtime.InteropServices;



namespace Pipe.Win32
Data Structures
public class NTKernel
public const uint PIPE_UNLIMITED_INSTANCES = 255;
public const uint INVAILD_HANDLE = 0xFFFFFFFF;
public const uint ERROR_FILE_NOT_FOUND = 2;
public const uint ERROR_PIPE_BUSY = 231;
internal const uint INFINITE = 0xFFFFFFFF;
[DllImport("kernel32", EntryPoint = "GetLastError", SetLastError = true, CharSet = CharSet.Unicode)]
public static extern uint GetLastError();
[DllImport("kernel32.dll", SetLastError = true)]
public static extern uint CreateNamedPipe(string lpName, uint dwOpenMode,
uint dwPipeMode, uint nMaxInstances, uint nOutBufferSize, uint nInBufferSize,
uint nDefaultTimeOut, SecurityAttributes lpSecurityAttributes);
[DllImport("kernel32.dll")]
public static extern bool ConnectNamedPipe(uint hNamedPipe,
IntPtr lpOverlapped);
public static extern bool DisconnectNamedPipe(uint hNamedPipe);
[DllImport("kernel32.dll", SetLastError=true)]
public static extern int WaitNamedPipeA (string lpNamedPipeName, int nTimeOut);
public static extern bool ReadFile(uint hFile, byte[] lpBuffer,
uint nNumberOfBytesToRead, out uint lpNumberOfBytesRead, IntPtr lpOverlapped);
public static extern bool WriteFile(uint hFile, byte[] lpBuffer,
uint nNumberOfBytesToWrite, out uint lpNumberOfBytesWritten,
public static extern bool CloseHandle(uint hHandle);
[DllImport("kernel32.dll", CharSet = CharSet.Auto, CallingConvention = CallingConvention.StdCall, SetLastError = true)]
public static extern uint CreateFile(
string lpFileName,
uint dwDesiredAccess,
uint dwShareMode,
SecurityAttributes lpSecurityAttributes,
uint dwCreationDisposition,
uint dwFlagsAndAttributes,
int hTemplateFile
);
Mutex
Semaphore
Event
class Mutex : IDisposable
IntPtr m_Handle;
public Mutex(SecurityAttributes lpEventAttributes, bool bInitialOwner, string lpName)
m_Handle = NTKernel.CreateMutex(lpEventAttributes, bInitialOwner, lpName);
if (m_Handle == IntPtr.Zero)
uint err = NTKernel.GetLastError();
throw new Exception(String.Format("Create Event fail, error={0}",
err));
public Mutex(bool bInitialOwner, string lpName)
m_Handle = NTKernel.CreateMutex(null, bInitialOwner, lpName);
public bool WaitOne(uint dwMilliseconds)
WaitForState waitForState = (WaitForState)NTKernel.WaitForSingleObject((uint)m_Handle, dwMilliseconds);
if (waitForState == WaitForState.WAIT_OBJECT_0)
return true;
else if (waitForState == WaitForState.WAIT_TIMEOUT)
throw new System.Threading.AbandonedMutexException();
public bool WaitOne()
return WaitOne(NTKernel.INFINITE);
public void ReleaseMutex()
NTKernel.ReleaseMutex(m_Handle);
if (m_Handle != IntPtr.Zero)
if (NTKernel.CloseHandle((uint)m_Handle))
m_Handle = IntPtr.Zero;
~Mutex()
IDisposable Members
public class Event : IDisposable
public Event()
public Event(SecurityAttributes lpEventAttributes, bool bManualReset, bool bInitialState, string lpName)
m_Handle = NTKernel.CreateEvent(lpEventAttributes, bManualReset, bInitialState, lpName);
public bool Open(EventAccess dwDesiredAccess, bool bInheritHandle, string lpName)
m_Handle = NTKernel.OpenEvent((uint)dwDesiredAccess, bInheritHandle, lpName);
public WaitForState WaitFor(uint dwMilliseconds)
return (WaitForState)NTKernel.WaitForSingleObject((uint)m_Handle, dwMilliseconds);
public WaitForState WaitFor()
return WaitFor(NTKernel.INFINITE);
public void SetEvent()
NTKernel.SetEvent(m_Handle);
public void Release()
NTKernel.ResetEvent(m_Handle);
~Event()

用戶端調用

byte[] buf = new byte[10240];

Pipe.Client.PipeClient client = new Pipe.Client.PipeClient("test", 102400);


for (int i = 0; i < 10000; i++)
client.Send(buf);
Console.WriteLine(e.Message);
finally
伺服器調用

static bool begin = true;

static System.Diagnostics.Stopwatch watch = new System.Diagnostics.Stopwatch();

static int count = 0;

static object lockObj = new object();


static void ReceiveMessage(System.IO.MemoryStream m)
//Console.WriteLine(msg.Event);
lock (lockObj)
if (begin)
watch.Start();
begin = false;
count++;
if (count == 10000)
watch.Stop();
float len = m.Length;
Console.WriteLine(String.Format("{0} MB", (len * 10000 * 1000 / watch.ElapsedMilliseconds) / (1024 * 1024)));
Console.WriteLine(String.Format("{0} ms", watch.ElapsedMilliseconds));


static void ReceiveMessageError(Exception e)
Console.WriteLine(e.Message);


static void Main(string[] args)
Pipe.Server.PipeServer server = new Pipe.Server.PipeServer("Test", 102400);
server.OnReceiveMessage = ReceiveMessage;
server.OnReceiveMessageError = ReceiveMessageError;
server.Listen();