天天看點

C# 用一個檔案實作對象的二進制序列化和緩存

應用場景

    在通訊中,收發資料都應該先放到隊列中,避免堵塞通訊,最好做持久化,避免因隊列滿或程式退出造成丢數,就像RabbitMQ這類中間件那樣,這裡我們手搓一個用于做對象持久化到檔案的輔助類。

實作思路

    開辟一個檔案,同時支援讀和寫,設定大小上限,當檔案達到上限時從頭覆寫寫入,如此往複,如果檔案大小足夠大,就能夠在無法轉發那段時間把資料都緩存起來,并記錄轉發的斷點,當通訊恢複時繼續轉發。

關鍵點

    1. 在一個FileStream中同時記錄讀位置和寫位置,而流本身隻有一個Position,是以算法需要小心處理讀寫位置的計算;

    2.不允許讀位置先于寫位置,即當讀位置等于寫位置時停止讀;

    3.當寫覆寫了未讀資料時,要将讀位置向前移動,即丢棄未讀的舊資料,發生這種情況意味着檔案大小不足以緩存無法轉發的所有資料;

    4.關閉流之前,在檔案的末尾儲存讀位置和寫位置。

實作代碼

using System;
using System.IO;

namespace Arim.DataX.Core
{
    /// <summary>
    /// 固定大小的檔案,當寫到底時從頭開始覆寫.
    /// </summary>
    public class FixLengthFile
    {
        private readonly long maxLength;
        private readonly string fileName;
        private readonly object lockthis;
        private System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter;
        public FixLengthFile(string name, long length)
        {
            fileName = name;
            maxLength = length;
            lockthis = new object();
            formatter = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();
        }

        //讀寫位置.
        private BinaryWriter writer;
        private BinaryReader reader;
        private bool opened = false;
        private const int TAIL16 = 16;//檔案尾保留16位元組存儲readpos和writepos.
        private long readPos = 0, writePos = 0;
        public void Open()
        {
            if (!opened)
            {
                lock (lockthis)
                {
                    var stream = new FileStream(fileName, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite);
                    writer = new BinaryWriter(stream);
                    reader = new BinaryReader(stream);
                    var length = reader.BaseStream.Length;
                    if (length > TAIL16)
                    {
                        reader.BaseStream.Seek(-TAIL16, SeekOrigin.End);
                        readPos = reader.ReadInt64();
                        writePos = reader.ReadInt64();
                        writer.BaseStream.Seek(writePos, SeekOrigin.Begin);//跳轉到上次寫入的位置.
                        writer.BaseStream.SetLength(length - TAIL16);//丢棄之前儲存的讀寫位置.
                    }
                }
                opened = true;
            }
        }

        private readonly byte[] BYTE4 = new byte[4];
        public void Write(object obj)
        {
            if (obj == null) return;

            if (!opened) Open();

            using (MemoryStream stream = new MemoryStream())
            {
                stream.Write(BYTE4, 0, 4);//預留4個位元組的長度.
                formatter.Serialize(stream, obj);
                var size = (int)stream.Position - 4;
                stream.Seek(0, SeekOrigin.Begin);
                stream.Write(BitConverter.GetBytes(size), 0, 4);//覆寫前4個長度位元組.
                size += 4;
                var arr = stream.GetBuffer();
                lock (lockthis)
                {
                    var remainspace = maxLength - TAIL16 - writePos; //到檔案尾部剩餘的可寫空間.
                    var remainlength = size - remainspace;//到檔案尾部還寫不下的部分長度.
                    if (remainlength > 0)
                    {
                        moveReadPos(writePos, maxLength - TAIL16, 0, remainlength);
                        writer.BaseStream.Position = writePos;
                        writer.Write(arr, 0, (int)remainspace);
                        writer.BaseStream.Seek(0, SeekOrigin.Begin);
                        writer.Write(arr, (int)remainspace, (int)remainlength);
                    }//寫滿檔案後将剩餘位元組從頭寫入.
                    else
                    {
                        moveReadPos(writePos, writePos + size);
                        writer.BaseStream.Position = writePos;
                        writer.Write(arr, 0, size);
                    }
                    writePos = writer.BaseStream.Position;
                }
            }
        }

        //如果讀取位置被覆寫,移動到下一個不被覆寫的資料位置.                    
        private void moveReadPos(long start, long end)
        {
            while (readPos > start && readPos <= end)
            {
                doMoveReadPos();
            }
        }

        private void moveReadPos(long start1, long end1, long start2, long end2)
        {
            while ((readPos > start1 && readPos <= end1) || (readPos >= start2 && readPos <= end2))
            {
                doMoveReadPos();
            }
        }

        private void doMoveReadPos()
        {
            var length = writer.BaseStream.Length;
            var pos = readPos;//readInt32會改變readPos.
            var size = readInt32(length) + 4;

            //seek
            var remain = pos + size - length;
            if (remain <= 0)
            {
                pos += size;
            }//檔案不滿.
            else
            {
                pos = remain;
            }
            readPos = pos;
        }

        public void Read(Func<object, bool> handle)
        {
            if (handle != null)
            {
                lock (lockthis)
                {
                    if (writePos != readPos)
                    {
                        var oldreadpos = readPos;
                        var length = writer.BaseStream.Length;
                        var size = readInt32(length);
                        var stream = readBytes(length, size);
                        var result = formatter.Deserialize(stream);
                        stream.Close();
                        if (!handle(result))
                        {
                            reader.BaseStream.Seek(oldreadpos, SeekOrigin.Begin);//重置到讀取之前的位置.
                        }
                    }//有可讀資料.
                }
            }
        }

        private Stream readBytes(long filelength, int size)
        {
            reader.BaseStream.Position = readPos;

            var result = new MemoryStream(size);
            var remain = readPos + size - filelength;
            if (remain <= 0)
            {
                var arr = reader.ReadBytes(size);
                readPos += size;
                result.Write(arr, 0, arr.Length);
            }//檔案不滿.
            else
            {
                var firstPart = reader.ReadBytes((int)(filelength - readPos));
                result.Write(firstPart, 0, firstPart.Length);
                reader.BaseStream.Seek(0, SeekOrigin.Begin);
                var secondPart = reader.ReadBytes((int)remain);
                readPos = remain;
                result.Write(secondPart, 0, secondPart.Length);
            }
            result.Seek(0, SeekOrigin.Begin);
            return result;
        }

        private int readInt32(long filelength)
        {
            reader.BaseStream.Position = readPos;

            var remain = readPos + 4 - filelength;
            if (remain <= 0)
            {
                var result = reader.ReadInt32();
                readPos += 4;
                return result;
            }//檔案不滿.
            else
            {
                var firstPart = reader.ReadBytes((int)(filelength - readPos));
                Array.Copy(firstPart, 0, BYTE4, 0, firstPart.Length);
                reader.BaseStream.Seek(0, SeekOrigin.Begin);
                var secondPart = reader.ReadBytes((int)remain);
                Array.Copy(secondPart, 0, BYTE4, firstPart.Length, secondPart.Length);
                readPos = remain;
                return BitConverter.ToInt32(BYTE4, 0);
            }
        }

        public void Close()
        {
            if (opened)
            {
                lock (lockthis)
                {
                    writer.Seek(0, SeekOrigin.End);
                    writer.Write(readPos);//記錄上一次讀取位置.
                    writer.Write(writePos);//記錄上一次寫入位置.
                    writer.Flush();
                    writer.Close();
                    reader.Close();
                }
            }
            opened = false;
        }
    }
}
           

測試代碼

    随機寫入或讀取100次數字,列印寫入和讀出的數字。

private static void testFixLengthFile()
        {
            var file = new Arim.DataX.Core.FixLengthFile("serialize1.bin", 1024);
            file.Open();
            Random r = new Random();
            int j = 0;
            for (int i=0;i<100;i++)
            {
                if(r.Next()%2 == 0)
                {
                    file.Write(j);
                    Console.WriteLine($"寫{j}");
                    j++;
                }
                else
                {
                    file.Read((o) =>
                    {
                        Console.WriteLine($"      讀{o.ToString()}");
                        return true;
                    });
                }
            }
            file.Close();
        }
           

    運作結果: 

C# 用一個檔案實作對象的二進制序列化和緩存

     儲存檔案大小為給定的上限1024位元組:

C# 用一個檔案實作對象的二進制序列化和緩存

使用須知

    Write方法傳入的自定義對象必須标注了[Serializable]。