应用场景
在通讯中,收发数据都应该先放到队列中,避免堵塞通讯,最好做持久化,避免因队列满或程序退出造成丢数,就像RabbitMQ这类中间件那样,这里我们手搓一个用于做对象持久化到文件的辅助类。
实现思路
开辟一个文件,同时支持读和写,设置大小上限,当文件达到上限时从头覆盖写入,如此往复,如果文件大小足够大,就能够在无法转发那段时间把数据都缓存起来,并记录转发的断点,当通讯恢复时继续转发。
关键点
1. 在一个FileStream中同时记录读位置和写位置,而流本身只有一个Position,因此算法需要小心处理读写位置的计算;
2.不允许读位置先于写位置,即当读位置等于写位置时停止读;
3.当写覆盖了未读数据时,要将读位置向前移动,即丢弃未读的旧数据,发生这种情况意味着文件大小不足以缓存无法转发的所有数据;
4.关闭流之前,在文件的末尾保存读位置和写位置。
实现代码
using System;
using System.IO;
namespace Arim.DataX.Core
{
/// <summary>
/// 固定大小的文件,当写到底时从头开始覆盖.
/// </summary>
public class FixLengthFile
{
private readonly long maxLength;
private readonly string fileName;
private readonly object lockthis;
private System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter;
public FixLengthFile(string name, long length)
{
fileName = name;
maxLength = length;
lockthis = new object();
formatter = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();
}
//读写位置.
private BinaryWriter writer;
private BinaryReader reader;
private bool opened = false;
private const int TAIL16 = 16;//文件尾保留16字节存储readpos和writepos.
private long readPos = 0, writePos = 0;
public void Open()
{
if (!opened)
{
lock (lockthis)
{
var stream = new FileStream(fileName, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite);
writer = new BinaryWriter(stream);
reader = new BinaryReader(stream);
var length = reader.BaseStream.Length;
if (length > TAIL16)
{
reader.BaseStream.Seek(-TAIL16, SeekOrigin.End);
readPos = reader.ReadInt64();
writePos = reader.ReadInt64();
writer.BaseStream.Seek(writePos, SeekOrigin.Begin);//跳转到上次写入的位置.
writer.BaseStream.SetLength(length - TAIL16);//丢弃之前保存的读写位置.
}
}
opened = true;
}
}
private readonly byte[] BYTE4 = new byte[4];
public void Write(object obj)
{
if (obj == null) return;
if (!opened) Open();
using (MemoryStream stream = new MemoryStream())
{
stream.Write(BYTE4, 0, 4);//预留4个字节的长度.
formatter.Serialize(stream, obj);
var size = (int)stream.Position - 4;
stream.Seek(0, SeekOrigin.Begin);
stream.Write(BitConverter.GetBytes(size), 0, 4);//覆盖前4个长度字节.
size += 4;
var arr = stream.GetBuffer();
lock (lockthis)
{
var remainspace = maxLength - TAIL16 - writePos; //到文件尾部剩余的可写空间.
var remainlength = size - remainspace;//到文件尾部还写不下的部分长度.
if (remainlength > 0)
{
moveReadPos(writePos, maxLength - TAIL16, 0, remainlength);
writer.BaseStream.Position = writePos;
writer.Write(arr, 0, (int)remainspace);
writer.BaseStream.Seek(0, SeekOrigin.Begin);
writer.Write(arr, (int)remainspace, (int)remainlength);
}//写满文件后将剩余字节从头写入.
else
{
moveReadPos(writePos, writePos + size);
writer.BaseStream.Position = writePos;
writer.Write(arr, 0, size);
}
writePos = writer.BaseStream.Position;
}
}
}
//如果读取位置被覆盖,移动到下一个不被覆盖的数据位置.
private void moveReadPos(long start, long end)
{
while (readPos > start && readPos <= end)
{
doMoveReadPos();
}
}
private void moveReadPos(long start1, long end1, long start2, long end2)
{
while ((readPos > start1 && readPos <= end1) || (readPos >= start2 && readPos <= end2))
{
doMoveReadPos();
}
}
private void doMoveReadPos()
{
var length = writer.BaseStream.Length;
var pos = readPos;//readInt32会改变readPos.
var size = readInt32(length) + 4;
//seek
var remain = pos + size - length;
if (remain <= 0)
{
pos += size;
}//文件不满.
else
{
pos = remain;
}
readPos = pos;
}
public void Read(Func<object, bool> handle)
{
if (handle != null)
{
lock (lockthis)
{
if (writePos != readPos)
{
var oldreadpos = readPos;
var length = writer.BaseStream.Length;
var size = readInt32(length);
var stream = readBytes(length, size);
var result = formatter.Deserialize(stream);
stream.Close();
if (!handle(result))
{
reader.BaseStream.Seek(oldreadpos, SeekOrigin.Begin);//重置到读取之前的位置.
}
}//有可读数据.
}
}
}
private Stream readBytes(long filelength, int size)
{
reader.BaseStream.Position = readPos;
var result = new MemoryStream(size);
var remain = readPos + size - filelength;
if (remain <= 0)
{
var arr = reader.ReadBytes(size);
readPos += size;
result.Write(arr, 0, arr.Length);
}//文件不满.
else
{
var firstPart = reader.ReadBytes((int)(filelength - readPos));
result.Write(firstPart, 0, firstPart.Length);
reader.BaseStream.Seek(0, SeekOrigin.Begin);
var secondPart = reader.ReadBytes((int)remain);
readPos = remain;
result.Write(secondPart, 0, secondPart.Length);
}
result.Seek(0, SeekOrigin.Begin);
return result;
}
private int readInt32(long filelength)
{
reader.BaseStream.Position = readPos;
var remain = readPos + 4 - filelength;
if (remain <= 0)
{
var result = reader.ReadInt32();
readPos += 4;
return result;
}//文件不满.
else
{
var firstPart = reader.ReadBytes((int)(filelength - readPos));
Array.Copy(firstPart, 0, BYTE4, 0, firstPart.Length);
reader.BaseStream.Seek(0, SeekOrigin.Begin);
var secondPart = reader.ReadBytes((int)remain);
Array.Copy(secondPart, 0, BYTE4, firstPart.Length, secondPart.Length);
readPos = remain;
return BitConverter.ToInt32(BYTE4, 0);
}
}
public void Close()
{
if (opened)
{
lock (lockthis)
{
writer.Seek(0, SeekOrigin.End);
writer.Write(readPos);//记录上一次读取位置.
writer.Write(writePos);//记录上一次写入位置.
writer.Flush();
writer.Close();
reader.Close();
}
}
opened = false;
}
}
}
测试代码
随机写入或读取100次数字,打印写入和读出的数字。
private static void testFixLengthFile()
{
var file = new Arim.DataX.Core.FixLengthFile("serialize1.bin", 1024);
file.Open();
Random r = new Random();
int j = 0;
for (int i=0;i<100;i++)
{
if(r.Next()%2 == 0)
{
file.Write(j);
Console.WriteLine($"写{j}");
j++;
}
else
{
file.Read((o) =>
{
Console.WriteLine($" 读{o.ToString()}");
return true;
});
}
}
file.Close();
}
运行结果:

保存文件大小为给定的上限1024字节:
使用须知
Write方法传入的自定义对象必须标注了[Serializable]。