天天看點

.Net分表分庫動态化處理

介紹

本期主角:

ShardingCore

一款ef-core下高性能、輕量級針對分表分庫讀寫分離的解決方案,具有零依賴、零學習成本、零業務代碼入侵

我不是efcore怎麼辦

這邊肯定有小夥伴要問有沒有不是efcore的,我這邊很确信的和你講有并且适應所有的ADO.NET包括sqlhelper

ShardingConnector

一款基于ado.net下的高性能分表分庫解決方案目前已有demo案例,這個架構你可以認為是.Net版本的

ShardingSphere

但是目前僅實作了

ShardingSphere-JDBC

,後續我将會實作

ShardingSphere-Proxy

希望各位.Neter多多關注

背景

最近有個小夥伴來問我,分表下他有一批資料,這個資料是白天可能會相對比較頻繁資料錄入,但是到了晚上可能基本上就沒有對應的資料了,因為看到了我的架構,本來想以按小時來實作分表但是這麼以來可能會導緻一天有24張表,表多的情況下還導緻了資料分布不均勻,這是一個很嚴重的問題因為可能以24小時制會讓8-17這幾張白天的表資料很多,但是晚上和淩晨的表基本沒有資料,沒有資料其實意味着這些表其實不需要去查詢,基于這個情況想來問我應該如何實作這個自定義的路由。

聽了他的需求,其實我這邊又進行了一次确認,針對這個場景更多的其實是這個小夥伴需要的是按需分片,實時建表,來保證需要的資料進行合理的插入,那麼我們應該如何在

ShardingCore

下實作這麼一個需求呢,廢話不多說直接開始吧~~~

建立項目

本次需求我們以mysql作為測試資料庫,然後使用efcore6作為資料庫驅動orm來實作怎麼處理才能達到這個效果的分表分庫(本次隻涉及分表)。

建立一個項目

添加依賴

//請安裝最新版本第一個版本号6代表efcore的版本号
Install-Package ShardingCore -Version 6.4.3.4

Install-Package Microsoft.EntityFrameworkCore.SqlServer -Version 6.0.1
           

建立一個對象表,配置對應的資料庫映射關系并且關聯到dbcontext

//建立資料庫對象
    public class OrderByHour
    {
        public string Id { get; set; }
        public DateTime CreateTime { get; set; }
        public string Name { get; set; }
    }
//映射對象結構到資料庫
    public class OrderByHourMap:IEntityTypeConfiguration<OrderByHour>
    {
        public void Configure(EntityTypeBuilder<OrderByHour> builder)
        {
            builder.HasKey(o => o.Id);
            builder.Property(o => o.Id).IsRequired().HasMaxLength(50);
            builder.Property(o => o.Name).IsRequired().HasMaxLength(128);
            builder.ToTable(nameof(OrderByHour));
        }
    }
//建立dbcontext為efcore所用上下文
    public class DefaultDbContext:AbstractShardingDbContext,IShardingTableDbContext
    {
        public DefaultDbContext(DbContextOptions<DefaultDbContext> options) : base(options)
        {
        }

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            base.OnModelCreating(modelBuilder);
            modelBuilder.ApplyConfiguration(new OrderByHourMap());
        }

        public IRouteTail RouteTail { get; set; }
    }
           

到這邊其實隻需要啟動時候依賴注入

services.AddDbContext<DefaultDbContext>(o=>o.UseMySql(xxxx));
           

那麼efcore就可以運作了,這麼一看其實并沒有很複雜而且

IEntityTypeConfiguration

也不是必須的,efcore允許使用attribute來實作

當然

DefaultDbContext:AbstractShardingDbContext,IShardingTableDbContext

這一部分在原生efcore中應該是

DefaultDbContext:DbContext

建立分片路由

首先我們來看一下

ShardingCore

針對分片路由的自定義情況的分析,通過文檔我們可以了解到,如果想要使用自定義路由那麼你隻需要自己建立一個路由并且繼承實作

AbstractShardingOperatorVirtualTableRoute

,當然這是分表的,分庫是

AbstractShardingOperatorVirtualDataSourceRoute

.

接下來我們建立一個路由并且實作分表操作。

public class orderByHourRoute : AbstractShardingOperatorVirtualTableRoute<OrderByHour, DateTime>
    {
        public override string ShardingKeyToTail(object shardingKey)
        {
            throw new NotImplementedException();
        }

        public override List<string> GetAllTails()
        {
            throw new NotImplementedException();
        }

        public override void Configure(EntityMetadataTableBuilder<OrderByHour> builder)
        {
            throw new NotImplementedException();
        }

        public override Expression<Func<string, bool>> GetRouteToFilter(DateTime shardingKey, ShardingOperatorEnum shardingOperator)
        {
            throw new NotImplementedException();
        }
    }
           

接下來我們依次來實作并且說明各個接口。

  • ShardingKeyToTail:将你的對象轉成資料庫的字尾尾巴,比如你是按月分片,那麼你的分片值大機率是datetime,那麼隻需要

    datetime.ToString("yyyyMM")

    就可以擷取到分片字尾
  • GetAllTails:傳回集合,集合是資料庫現有的目前表的所有字尾,僅程式啟動時被調用,這個接口就是需要你傳回目前資料庫中目前表在系統裡面有多少張表,然後傳回這些表的字尾
  • Configure:配置目前對象按什麼字段分片
  • GetRouteToFilter:因為

    ShardingCore

    記憶體有目前所有表的字尾,假設字尾為list集合,傳回的

    Expression<Func<string, bool>>

    在經過

    And

    Or

    後的組合進行

    Compile()

    ,然後對

    list.Where(expression.Compile()).ToList()

    就可以傳回對應的本次查詢的字尾資訊

廢話不多說針對這個條件我們直接開始操作完成路由的實作

路由的編寫

1.ShardingKeyToTail:因為我們是按小時分表是以格式化值字尾我們采用日期格式化

//因為分片建是DateTime類型是以直接強轉
        public override string ShardingKeyToTail(object shardingKey)
        {
            var dateTime = (DateTime)shardingKey;
            return ShardingKeyFormat(dateTime);
        }
        private string ShardingKeyFormat(DateTime dateTime)
        {
            var tail = $"{dateTime:yyyyMMddHH}";

            return tail;
        }
           

2.Configure:分表配置

public override void Configure(EntityMetadataTableBuilder<OrderByHour> builder)
        {
            builder.ShardingProperty(o => o.CreateTime);
        }
           

3.GetRouteToFilter:路由比較,因為是時間字元串的字尾具有和按年,按月等相似的屬性是以我們直接參考預設路由來實作

public override Expression<Func<string, bool>> GetRouteToFilter(DateTime shardingKey, ShardingOperatorEnum shardingOperator)
        {
            var t = ShardingKeyFormat(shardingKey);
            switch (shardingOperator)
            {
                case ShardingOperatorEnum.GreaterThan:
                case ShardingOperatorEnum.GreaterThanOrEqual:
                    return tail => String.Compare(tail, t, StringComparison.Ordinal) >= 0;
                case ShardingOperatorEnum.LessThan:
                {
                    var currentHourBeginTime = new DateTime(shardingKey.Year,shardingKey.Month,shardingKey.Day,shardingKey.Hour,0,0);
                    //處于臨界值 不應該被傳回
                    if (currentHourBeginTime == shardingKey)
                        return tail => String.Compare(tail, t, StringComparison.Ordinal) < 0;
                    return tail => String.Compare(tail, t, StringComparison.Ordinal) <= 0;
                }
                case ShardingOperatorEnum.LessThanOrEqual:
                    return tail => String.Compare(tail, t, StringComparison.Ordinal) <= 0;
                case ShardingOperatorEnum.Equal: return tail => tail == t;
                default:
                {
                    return tail => true;
                }
            }
        }
           

4.

GetAllTails

:比較特殊我們因為并不是連續生成的是以沒辦法使用起始時間然後一直推到目前時間來實作字尾的傳回,隻能依靠ado.net的能力讀取資料庫然後傳回對應的表字尾,當然你也可以使用redis等三方工具來存儲

//1.構造函數注入 IVirtualDataSourceManager<DefaultDbContext> virtualDataSourceManager

//2/mysql的ado.net讀取資料庫表(sqlserver和mysql有差異自行百度或者檢視ShardingCore的SqlServerTableEnsureManager類)
        private const string CurrentTableName = nameof(OrderByHour);
        private const string Tables = "Tables";
        private const string TABLE_SCHEMA = "TABLE_SCHEMA";
        private const string TABLE_NAME = "TABLE_NAME";

        private readonly ConcurrentDictionary<string, object?> _tails = new ConcurrentDictionary<string, object?>();
        /// <summary>
        /// 如果你是非mysql資料庫請自行實作這個方法傳回目前類在資料庫已經存在的字尾
        /// 僅啟動時調用
        /// </summary>
        /// <returns></returns>
        public override List<string> GetAllTails()
        {
            //啟動尋找有哪些表字尾
            using (var connection = new MySqlConnection(_virtualDataSourceManager.GetCurrentVirtualDataSource().DefaultConnectionString))
            {
                connection.Open();
                var database = connection.Database;
                
                using (var dataTable = connection.GetSchema(Tables))
                {
                    for (int i = 0; i < dataTable.Rows.Count; i++)
                    {
                        var schema = dataTable.Rows[i][TABLE_SCHEMA];
                        if (database.Equals($"{schema}", StringComparison.OrdinalIgnoreCase))
                        {
                            var tableName = dataTable.Rows[i][TABLE_NAME]?.ToString()??string.Empty;
                            if (tableName.StartsWith(CurrentTableName, StringComparison.OrdinalIgnoreCase))
                            {
                                //如果沒有下劃線那麼需要CurrentTableName.Length有下劃線就要CurrentTableName.Length+1
                                _tails.TryAdd(tableName.Substring(CurrentTableName.Length),null);
                            }
                        }
                    }
                }
            }
            return _tails.Keys.ToList();
        }
           

動态建立添加表

到目前為止我們已經完成了路由的靜态分片的處理,但是還有一點需要處理就是如何在插入值得時候判斷目前有沒有對應的資料庫表是否需要建立等操作

檢視

AbstractShardingOperatorVirtualTableRoute

分表抽象類的父類我們發現目前抽象類有兩個地方會調用路由的擷取判斷方法

  • DoRouteWithPredicate:使用條件路由也就是where後面的表達式
  • RouteWithValue:使用值路由也就是我們的新增和修改整個對象的時候會被調用

是以通過上述流程的梳理我們可以知道隻需要在

RouteWithValue

處進行動手腳即可,又因為我們需要動态建表是以我們可以參考預設路由的自動建表的代碼進行參考

AbstractShardingAutoCreateOperatorVirtualTableRoute

下的

ExecuteAsync

private readonly IVirtualDataSourceManager<DefaultDbContext> _virtualDataSourceManager;
        private readonly IVirtualTableManager<DefaultDbContext> _virtualTableManager;
        private readonly IShardingTableCreator<DefaultDbContext> _shardingTableCreator;
        private readonly ConcurrentDictionary<string, object?> _tails = new ConcurrentDictionary<string, object?>();
        private readonly object _lock = new object();

        public OrderByHourRoute(IVirtualDataSourceManager<DefaultDbContext> virtualDataSourceManager,IVirtualTableManager<DefaultDbContext> virtualTableManager, IShardingTableCreator<DefaultDbContext> shardingTableCreator)
        {
            _virtualDataSourceManager = virtualDataSourceManager;
            _virtualTableManager = virtualTableManager;
            _shardingTableCreator = shardingTableCreator;
        }

        public override IPhysicTable RouteWithValue(List<IPhysicTable> allPhysicTables, object shardingKey)
        {
            var shardingKeyToTail = ShardingKeyToTail(shardingKey);

            if (!_tails.TryGetValue(shardingKeyToTail,out var _))
            {
                lock (_lock)
                {
                    if (!_tails.TryGetValue(shardingKeyToTail,out var _))
                    {
                        var virtualTable = _virtualTableManager.GetVirtualTable(typeof(OrderByHour));
//必須先執行AddPhysicTable在進行CreateTable
                        _virtualTableManager.AddPhysicTable(virtualTable, new DefaultPhysicTable(virtualTable, shardingKeyToTail));
                        try
                        {
                            _shardingTableCreator.CreateTable<OrderByHour>(_virtualDataSourceManager.GetCurrentVirtualDataSource().DefaultDataSourceName, shardingKeyToTail);
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine("嘗試添加表失敗" + ex);
                        }

                        _tails.TryAdd(shardingKeyToTail,null);
                    }
                }
            }

            var needRefresh = allPhysicTables.Count != _tails.Count;
            if (needRefresh)
            {
                var virtualTable = _virtualTableManager.GetVirtualTable(typeof(OrderByHour));
                //修複可能導緻疊代器周遊時添加的bug
                var keys = _tails.Keys.ToList();
                foreach (var tail in keys)
                {
                    var hashSet = allPhysicTables.Select(o=>o.Tail).ToHashSet();
                    if (!hashSet.Contains(tail))
                    {
                        var tables = virtualTable.GetAllPhysicTables();
                        var physicTable = tables.FirstOrDefault(o=>o.Tail==tail);
                        if (physicTable!= null)
                        {
                            allPhysicTables.Add(physicTable);
                        }
                    }
                }
            }
            var physicTables = allPhysicTables.Where(o => o.Tail== shardingKeyToTail).ToList();
            if (physicTables.IsEmpty())
            {
                throw new ShardingCoreException($"sharding key route not match {EntityMetadata.EntityType} -> [{EntityMetadata.ShardingTableProperty.Name}] ->【{shardingKey}】 all tails ->[{string.Join(",", allPhysicTables.Select(o=>o.FullName))}]");
            }

            if (physicTables.Count > 1)
                throw new ShardingCoreException($"more than one route match table:{string.Join(",", physicTables.Select(o => $"[{o.FullName}]"))}");
            return physicTables[0];
        }
           

通過和父類的比較我們隻是在對應的根據值判斷目前系統是否存在xx表如果不存在就在

ShardingCore

上插入

AddPhysicTable

然後

CreateTable

最後

_tails.TryAdd(shardingKeyToTail,null);

needRefresh

處的代碼需要針對如果目前需要和傳入的全量表進行比對因為新加的表字尾不在全量表裡面是以需要先進行對其的處理然後再進行執行

啟動配置必不可少

ILoggerFactory efLogger = LoggerFactory.Create(builder =>
{
    builder.AddFilter((category, level) => category == DbLoggerCategory.Database.Command.Name && level == LogLevel.Information).AddConsole();
});

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.

builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddShardingDbContext<DefaultDbContext>()
    .AddEntityConfig(o =>
    {
        o.ThrowIfQueryRouteNotMatch = false;
        o.CreateShardingTableOnStart = true;
        o.EnsureCreatedWithOutShardingTable = true;
        o.AddShardingTableRoute<OrderByHourRoute>();
    })
    .AddConfig(o =>
    {
        o.ConfigId = "c1";
        o.AddDefaultDataSource("ds0", "server=127.0.0.1;port=3306;database=shardingTest;userid=root;password=root;");
        o.UseShardingQuery((conn, b) =>
        {
            b.UseMySql(conn, new MySqlServerVersion(new Version())).UseLoggerFactory(efLogger);
        });
        o.UseShardingTransaction((conn, b) =>
        {
            b.UseMySql(conn, new MySqlServerVersion(new Version())).UseLoggerFactory(efLogger);
        });
        o.ReplaceTableEnsureManager(sp=>new MySqlTableEnsureManager<DefaultDbContext>());
    }).EnsureConfig();
var app = builder.Build();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}
app.Services.GetRequiredService<IShardingBootstrapper>().Start();
app.UseAuthorization();

app.MapControllers();

app.Run();
           

最後我們直接啟動運作調試代碼

.Net分表分庫動态化處理

當我們插入一個沒有的時間對應的架構會幫我們對應的建立表并且插入資料

.Net分表分庫動态化處理

這個思路就是可以保證需要的時候就建立表不需要就不建立

.Net分表分庫動态化處理

最後的最後

demo位址 https://github.com/dotnetcore/sharding-core/tree/main/samples/Sample.AutoCreateIfPresent

您都看到這邊了确定不點個star或者贊嗎,一款.Net不得不學的分庫分表解決方案,簡單了解為sharding-jdbc在.net中的實作并且支援更多特性和更優秀的資料聚合,擁有原生性能的97%,并且無業務侵入性,支援未分片的所有efcore原生查詢

  • github位址 https://github.com/xuejmnet/sharding-core