天天看點

基于WCF的通道網絡傳輸資料壓縮技術的應用研究

本文及程式不是介紹WCF怎麼用,而是研究如何在WCF通信時的通道兩端自動進行資料壓縮和解壓縮,進而增加分布式資料傳輸速度。

而且,這個過程是完全透明的,使用者及程式設計人員根本不需要知道它的存在,相當于HOOK在兩端的一個元件。可以使用中網絡帶寬較小

的網絡環境中。當WCF在兩個實體間通訊的時候,便自動建立一個資訊通道轉接通訊,這個消息包含資料請求和相應。WCF使用特殊的

編碼器将請求和響應資料轉換成一系列的位元組。

    我所帶的項目裡遇到大檔案分布式傳輸問題,經過分析考慮采用WCF通道壓縮技術來解決此問題。執行這樣的編碼是需要傳輸大檔案(XML格式)由一台機器到另一台機器傳輸,而連接配接有速度限制。經過檢視了一些國外英文網站,發現我不用寫一個特殊的函數邊壓縮和邊解壓,而是配置傳輸通道可以做到這一點,這種方式壓縮可重複使用的任何契約。我發現自己編寫的消息編碼器是最簡單的方式來實作功能,真正的問題是如何編寫資訊編碼器,在MSDN上沒有找到任何關于此應用的執行個體。消息契約編碼器的想法是Hook連接配接兩端發送和接收資訊的管道。程式是采用Microsoft Visual Studio 2008 WCF設計。

                                  圖1 WCF消息通道編碼過程時序圖    

發送方:代碼中加入方法,該方法及其參數的序列化成SOAP消息,消息編碼序列化的資訊将成為一個位元組數組,位元組數組發送傳輸層。

 接收方:傳輸層接收位元組數組,消息編碼器并行化位元組數組到一條消息,該方法及其參數并行化到一個SOAP消息,方法是被監聽的。 當加入壓縮資訊編碼器,該方法要求有一點改變:發送方:代碼中加入方法,該方法及其參數的序列化成SOAP消息,消息契約編碼讓其内在的資訊編碼序列的資訊成為一個位元組數組,消息契約編碼壓縮的位元組數組第二個位元組數組,位元組數組發送傳輸層。

     接收方:傳輸層接收位元組數組,消息契約編碼的位元組數組解壓到第二位元組數組,消息契約編碼讓其内在的資訊編碼化的第二個位元組數組消息,該方法及其參并行化到SOAP消息,方法是被監聽的。

     這個消息契約編碼分為幾個類:

     CompactMessageEncoder //這個類提供了資訊編碼實施。

     CompactMessageEncoderFactory //這個類是負責提供契約資訊編碼執行個體。

     CompactMessageEncodingBindingElement //這個類負責通道的協定限制規範。

     CompactMessageEncodingElement //這個類使資訊編碼通過增加應用程式配置檔案。

                              圖2 消息通道編碼器靜态類圖 

     壓縮方法:契約消息編碼器是使用gzip壓縮的NET Framework範圍内執行的,是調用System.IO.Compression.GZipStream名字空間類中。

     加入引用CompactMessageEncoder.dll,修改app.config檔案引用,應用程式必須要在用戶端和伺服器端。

壓縮緩沖代碼:

private static ArraySegment<byte> CompressBuffer(ArraySegment<byte> buffer, BufferManager bufferManager, int messageOffset)

        {

            // Create a memory stream for the final message

            MemoryStream memoryStream = new MemoryStream();

            // Copy the bytes that should not be compressed into the stream

            memoryStream.Write(buffer.Array, 0, messageOffset);

            // Compress the message into the stream

            using (GZipStream gzStream = new GZipStream(memoryStream, CompressionMode.Compress, true))

            {

                gzStream.Write(buffer.Array, messageOffset, buffer.Count);

            }

            // Convert the stream into a bytes array

            byte[] compressedBytes = memoryStream.ToArray();

            // Allocate a new buffer to hold the new bytes array

            byte[] bufferedBytes = bufferManager.TakeBuffer(compressedBytes.Length);

            // Copy the compressed data into the allocated buffer

            Array.Copy(compressedBytes, 0, bufferedBytes, 0, compressedBytes.Length);

             // Release the original buffer we got as an argument

            bufferManager.ReturnBuffer(buffer.Array);

            // Create a new ArraySegment that points to the new message buffer

            ArraySegment<byte> byteArray = new ArraySegment<byte>(bufferedBytes, messageOffset, compressedBytes.Length - messageOffset);

            return byteArray;

        }

解壓緩沖代碼:

Code

   private static ArraySegment<byte> DecompressBuffer(ArraySegment<byte> buffer, BufferManager bufferManager)

            // Create a new memory stream, and copy into it the buffer to decompress

            MemoryStream memoryStream = new MemoryStream(buffer.Array, buffer.Offset, buffer.Count);

            // Create a memory stream to store the decompressed data

            MemoryStream decompressedStream = new MemoryStream();

            // The totalRead stores the number of decompressed bytes

            int totalRead = 0;

            int blockSize = 1024;

            // Allocate a temporary buffer to use with the decompression

            byte[] tempBuffer = bufferManager.TakeBuffer(blockSize);

            // Uncompress the compressed data

            using (GZipStream gzStream = new GZipStream(memoryStream, CompressionMode.Decompress))

                while (true)

                {

                    // Read from the compressed data stream

                    int bytesRead = gzStream.Read(tempBuffer, 0, blockSize);

                    if (bytesRead == 0)

                        break;

                    // Write to the decompressed data stream

                    decompressedStream.Write(tempBuffer, 0, bytesRead);

                    totalRead += bytesRead;

                }

            // Release the temporary buffer

            bufferManager.ReturnBuffer(tempBuffer);

            // Convert the decompressed data stream into bytes array

            byte[] decompressedBytes = decompressedStream.ToArray();

            // Allocate a new buffer to store the message 

            byte[] bufferManagerBuffer = bufferManager.TakeBuffer(decompressedBytes.Length + buffer.Offset);

            // Copy the bytes that comes before the compressed message in the buffer argument

            Array.Copy(buffer.Array, 0, bufferManagerBuffer, 0, buffer.Offset);

            // Copy the decompressed data

            Array.Copy(decompressedBytes, 0, bufferManagerBuffer, buffer.Offset, decompressedBytes.Length);

            ArraySegment<byte> byteArray = new ArraySegment<byte>(bufferManagerBuffer, buffer.Offset, decompressedBytes.Length);

            // Release the original message buffer

改變服務端配置

   加入消息契約編碼器之前app.config的執行個體:

<?xml version="1.0" encoding="utf-8" ?>

<configuration> 

    <system.serviceModel> 

        <services> 

            <service name="Server.MyService">

                <endpoint 

                    address="net.tcp://localhost:1234/MyService" 

                    binding="netTcpBinding"

                    contract="Server.IMyService" /> 

            </service> 

        </services> 

    </system.serviceModel>

</configuration>

加入消息契約編碼器後app.config的例子:

            <!-- Set the binding of the endpoint to customBinding -->                 

            <endpoint 

                    binding="customBinding"

        <!-- Defines a new customBinding that contains the compactMessageEncoding -->         

        <bindings> 

            <customBinding> 

                <binding name="compactBinding"> 

                    <compactMessageEncoding>

                <!-- Defines the inner message encoder as binary encoder -->                        

                <binaryMessageEncoding /> 

                    </compactMessageEncoding> 

                    <tcpTransport /> 

                </binding> 

            </customBinding> 

        </bindings> 

    <!-- Adds the extension dll so the WCF can find the compactMessageEncoding -->

        <extensions> 

            <bindingElementExtensions> 

                <add name="compactMessageEncoding" type="Amib.WCF.CompactMessageEncodingElement, CompactMessageEncoder, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null" />

            </bindingElementExtensions> 

        </extensions> 

     </system.serviceModel>

用戶端配置變化

加入消息契約編碼器之前app.config的執行個體:

基于WCF的通道網絡傳輸資料壓縮技術的應用研究
基于WCF的通道網絡傳輸資料壓縮技術的應用研究

這種壓縮方法,消息堵塞的幾率很小。使用CompactMessageEncoder在同一台機器運作用戶端和伺服器上可能會降低效率。

繼續閱讀