應用場景
在通訊中,收發資料都應該先放到隊列中,避免堵塞通訊,最好做持久化,避免因隊列滿或程式退出造成丢數,就像RabbitMQ這類中間件那樣,這裡我們手搓一個用于做對象持久化到檔案的輔助類。
實作思路
開辟一個檔案,同時支援讀和寫,設定大小上限,當檔案達到上限時從頭覆寫寫入,如此往複,如果檔案大小足夠大,就能夠在無法轉發那段時間把資料都緩存起來,并記錄轉發的斷點,當通訊恢複時繼續轉發。
關鍵點
1. 在一個FileStream中同時記錄讀位置和寫位置,而流本身隻有一個Position,是以算法需要小心處理讀寫位置的計算;
2.不允許讀位置先于寫位置,即當讀位置等于寫位置時停止讀;
3.當寫覆寫了未讀資料時,要将讀位置向前移動,即丢棄未讀的舊資料,發生這種情況意味着檔案大小不足以緩存無法轉發的所有資料;
4.關閉流之前,在檔案的末尾儲存讀位置和寫位置。
實作代碼
using System;
using System.IO;
namespace Arim.DataX.Core
{
/// <summary>
/// 固定大小的檔案,當寫到底時從頭開始覆寫.
/// </summary>
public class FixLengthFile
{
private readonly long maxLength;
private readonly string fileName;
private readonly object lockthis;
private System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter;
public FixLengthFile(string name, long length)
{
fileName = name;
maxLength = length;
lockthis = new object();
formatter = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();
}
//讀寫位置.
private BinaryWriter writer;
private BinaryReader reader;
private bool opened = false;
private const int TAIL16 = 16;//檔案尾保留16位元組存儲readpos和writepos.
private long readPos = 0, writePos = 0;
public void Open()
{
if (!opened)
{
lock (lockthis)
{
var stream = new FileStream(fileName, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite);
writer = new BinaryWriter(stream);
reader = new BinaryReader(stream);
var length = reader.BaseStream.Length;
if (length > TAIL16)
{
reader.BaseStream.Seek(-TAIL16, SeekOrigin.End);
readPos = reader.ReadInt64();
writePos = reader.ReadInt64();
writer.BaseStream.Seek(writePos, SeekOrigin.Begin);//跳轉到上次寫入的位置.
writer.BaseStream.SetLength(length - TAIL16);//丢棄之前儲存的讀寫位置.
}
}
opened = true;
}
}
private readonly byte[] BYTE4 = new byte[4];
public void Write(object obj)
{
if (obj == null) return;
if (!opened) Open();
using (MemoryStream stream = new MemoryStream())
{
stream.Write(BYTE4, 0, 4);//預留4個位元組的長度.
formatter.Serialize(stream, obj);
var size = (int)stream.Position - 4;
stream.Seek(0, SeekOrigin.Begin);
stream.Write(BitConverter.GetBytes(size), 0, 4);//覆寫前4個長度位元組.
size += 4;
var arr = stream.GetBuffer();
lock (lockthis)
{
var remainspace = maxLength - TAIL16 - writePos; //到檔案尾部剩餘的可寫空間.
var remainlength = size - remainspace;//到檔案尾部還寫不下的部分長度.
if (remainlength > 0)
{
moveReadPos(writePos, maxLength - TAIL16, 0, remainlength);
writer.BaseStream.Position = writePos;
writer.Write(arr, 0, (int)remainspace);
writer.BaseStream.Seek(0, SeekOrigin.Begin);
writer.Write(arr, (int)remainspace, (int)remainlength);
}//寫滿檔案後将剩餘位元組從頭寫入.
else
{
moveReadPos(writePos, writePos + size);
writer.BaseStream.Position = writePos;
writer.Write(arr, 0, size);
}
writePos = writer.BaseStream.Position;
}
}
}
//如果讀取位置被覆寫,移動到下一個不被覆寫的資料位置.
private void moveReadPos(long start, long end)
{
while (readPos > start && readPos <= end)
{
doMoveReadPos();
}
}
private void moveReadPos(long start1, long end1, long start2, long end2)
{
while ((readPos > start1 && readPos <= end1) || (readPos >= start2 && readPos <= end2))
{
doMoveReadPos();
}
}
private void doMoveReadPos()
{
var length = writer.BaseStream.Length;
var pos = readPos;//readInt32會改變readPos.
var size = readInt32(length) + 4;
//seek
var remain = pos + size - length;
if (remain <= 0)
{
pos += size;
}//檔案不滿.
else
{
pos = remain;
}
readPos = pos;
}
public void Read(Func<object, bool> handle)
{
if (handle != null)
{
lock (lockthis)
{
if (writePos != readPos)
{
var oldreadpos = readPos;
var length = writer.BaseStream.Length;
var size = readInt32(length);
var stream = readBytes(length, size);
var result = formatter.Deserialize(stream);
stream.Close();
if (!handle(result))
{
reader.BaseStream.Seek(oldreadpos, SeekOrigin.Begin);//重置到讀取之前的位置.
}
}//有可讀資料.
}
}
}
private Stream readBytes(long filelength, int size)
{
reader.BaseStream.Position = readPos;
var result = new MemoryStream(size);
var remain = readPos + size - filelength;
if (remain <= 0)
{
var arr = reader.ReadBytes(size);
readPos += size;
result.Write(arr, 0, arr.Length);
}//檔案不滿.
else
{
var firstPart = reader.ReadBytes((int)(filelength - readPos));
result.Write(firstPart, 0, firstPart.Length);
reader.BaseStream.Seek(0, SeekOrigin.Begin);
var secondPart = reader.ReadBytes((int)remain);
readPos = remain;
result.Write(secondPart, 0, secondPart.Length);
}
result.Seek(0, SeekOrigin.Begin);
return result;
}
private int readInt32(long filelength)
{
reader.BaseStream.Position = readPos;
var remain = readPos + 4 - filelength;
if (remain <= 0)
{
var result = reader.ReadInt32();
readPos += 4;
return result;
}//檔案不滿.
else
{
var firstPart = reader.ReadBytes((int)(filelength - readPos));
Array.Copy(firstPart, 0, BYTE4, 0, firstPart.Length);
reader.BaseStream.Seek(0, SeekOrigin.Begin);
var secondPart = reader.ReadBytes((int)remain);
Array.Copy(secondPart, 0, BYTE4, firstPart.Length, secondPart.Length);
readPos = remain;
return BitConverter.ToInt32(BYTE4, 0);
}
}
public void Close()
{
if (opened)
{
lock (lockthis)
{
writer.Seek(0, SeekOrigin.End);
writer.Write(readPos);//記錄上一次讀取位置.
writer.Write(writePos);//記錄上一次寫入位置.
writer.Flush();
writer.Close();
reader.Close();
}
}
opened = false;
}
}
}
測試代碼
随機寫入或讀取100次數字,列印寫入和讀出的數字。
private static void testFixLengthFile()
{
var file = new Arim.DataX.Core.FixLengthFile("serialize1.bin", 1024);
file.Open();
Random r = new Random();
int j = 0;
for (int i=0;i<100;i++)
{
if(r.Next()%2 == 0)
{
file.Write(j);
Console.WriteLine($"寫{j}");
j++;
}
else
{
file.Read((o) =>
{
Console.WriteLine($" 讀{o.ToString()}");
return true;
});
}
}
file.Close();
}
運作結果:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnLzYTY2UWOhZTNzUGOiJDOhhDZjRjYjRDZ1MTOmFmM4UzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
儲存檔案大小為給定的上限1024位元組:
使用須知
Write方法傳入的自定義對象必須标注了[Serializable]。