初始化
# 介绍
分库分表介绍
sharding-core
支持自定义分库,流式聚合,支持同表join,数据源并非分表的笛卡尔积而是交集,比如我分库的对象和不分库的对象join结果只会查询不分库对象对应的数据源- 分库下的分布式事务(支持业务逻辑导致的事务出错可以回滚),网络情况下的事务出错直接忽略除非是第一个提交的事务,第二个数据源提交的事务开始就直接忽略异常。
# Demo
本次分库的demo源码:SqlServerShardingAll (opens new window)
# 分表使用
先拟定一个场景目前有用户表SysUser
和订单表Order
,用户我们按用户区域进行分库按用户Id取模分表,订单我们按用户区域分库按订单时间分表
首先创建一个空的aspnetcore web api。
# 安装ShardingCore
# 请对应安装您需要的版本
PM> Install-Package ShardingCore
# 请对应数据库版本
PM> Install-Package Microsoft.EntityFrameworkCore.SqlServer
2
3
4
# 创建对象
public enum OrderStatusEnum
{
NoPay=1,
Paying=2,
Payed=3,
PayFail=4
}
public class Order
{
public string Id { get; set; }
public string Payer { get; set; }
public long Money { get; set; }
public string Area { get; set; }
public OrderStatusEnum OrderStatus { get; set; }
public DateTime CreationTime { get; set; }
}
public class SysUser
{
public string Id { get; set; }
public string Name { get; set; }
public string Area { get; set; }
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 创建DbContext
这样我们就创建好了三张表,接下来我们创建我们的DbContext
,因为需要分表所以我们需要实现IShardingTableDbContext
接口
public class MyDbContext:AbstractShardingDbContext,IShardingTableDbContext
{
public MyDbContext(DbContextOptions<MyDbContext> options) : base(options)
{
}
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
base.OnModelCreating(modelBuilder);
modelBuilder.Entity<Order>(entity =>
{
entity.HasKey(o => o.Id);
entity.Property(o => o.Id).IsRequired().IsUnicode(false).HasMaxLength(50);
entity.Property(o=>o.Payer).IsRequired().IsUnicode(false).HasMaxLength(50);
entity.Property(o => o.Area).IsRequired().IsUnicode(false).HasMaxLength(50);
entity.Property(o => o.OrderStatus).HasConversion<int>();
entity.ToTable(nameof(Order));
});
modelBuilder.Entity<SysUser>(entity =>
{
entity.HasKey(o => o.Id);
entity.Property(o => o.Id).IsRequired().IsUnicode(false).HasMaxLength(50);
entity.Property(o=>o.Name).IsRequired().IsUnicode(false).HasMaxLength(50);
entity.Property(o=>o.Area).IsRequired().IsUnicode(false).HasMaxLength(50);
entity.ToTable(nameof(SysUser));
});
}
public IRouteTail RouteTail { get; set; }
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
自定义标题
- 构造函数必须是
DbContextOptions<MyDbContext>
或者DbContextOptions
OnModelCreating
并不是说分表必须要这样,而是你原先efcore怎么使用就怎么使用,efcore配置对象有两种一种是DbSet
+Attribute
,另外一种是OnModelCreating
+ModelBuilder
,你可以选择你原先在用的任何一种AbstractShardingDbContext
这个对象是可以不继承的,但是如果要使用分表分库你必须实现IShardingTableDbContext
这个接口,因为这个接口实现起来都是一样的所以默认你只需要继承AbstractShardingDbContext
就可以了IShardingTableDbContext
这个接口在你需要支持分表的情况下需要加,如果您只是分库那么就不需要添加这个接口
# 创建虚拟路由
- 订单表按用户区域分库,因为分库系统并没有给我提供默认的分库路由,所以需要我们自行实现,我们先假设我们的数据源为A,B,C三个数据源, 再按订单时间分表
- 用户按区域分库,在按用户Id取模
//订单分库路由
public class OrderVirtualDataSourceRoute : AbstractShardingOperatorVirtualDataSourceRoute<Order, string>
{
private readonly List<string> _dataSources = new List<string>()
{
"A", "B", "C"
};
protected override string ConvertToShardingKey(object shardingKey)
{
return shardingKey?.ToString() ?? string.Empty;
}
//我们设置区域就是数据库
public override string ShardingKeyToDataSourceName(object shardingKey)
{
return ConvertToShardingKey(shardingKey);
}
public override List<string> GetAllDataSourceNames()
{
return _dataSources;
}
public override bool AddDataSourceName(string dataSourceName)
{
if (_dataSources.Any(o => o == dataSourceName))
return false;
_dataSources.Add(dataSourceName);
return true;
}
protected override Func<string, bool> GetRouteToFilter(string shardingKey, ShardingOperatorEnum shardingOperator)
{
var t = ShardingKeyToDataSourceName(shardingKey);
switch (shardingOperator)
{
case ShardingOperatorEnum.Equal: return tail => tail == t;
default:
{
return tail => true;
}
}
}
public override void Configure(EntityMetadataDataSourceBuilder<Order> builder)
{
builder.ShardingProperty(o => o.Area);
}
}
//订单分表路由
public class OrderVirtualTableRoute:AbstractSimpleShardingMonthKeyDateTimeVirtualTableRoute<Order>
{
public override DateTime GetBeginTime()
{
return new DateTime(2021, 1, 1);
}
public override void Configure(EntityMetadataTableBuilder<Order> builder)
{
builder.ShardingProperty(o => o.CreationTime);
}
}
//用户区域分库
public class SysUserVirtualDataSourceRoute : AbstractShardingOperatorVirtualDataSourceRoute<SysUser, string>
{
private readonly List<string> _dataSources = new List<string>()
{
"A", "B", "C"
};
protected override string ConvertToShardingKey(object shardingKey)
{
return shardingKey?.ToString() ?? string.Empty;
}
//我们设置区域就是数据库
public override string ShardingKeyToDataSourceName(object shardingKey)
{
return ConvertToShardingKey(shardingKey);
}
public override List<string> GetAllDataSourceNames()
{
return _dataSources;
}
public override bool AddDataSourceName(string dataSourceName)
{
if (_dataSources.Any(o => o == dataSourceName))
return false;
_dataSources.Add(dataSourceName);
return true;
}
protected override Func<string, bool> GetRouteToFilter(string shardingKey, ShardingOperatorEnum shardingOperator)
{
var t = ShardingKeyToDataSourceName(shardingKey);
switch (shardingOperator)
{
case ShardingOperatorEnum.Equal: return tail => tail == t;
default:
{
return tail => true;
}
}
}
public override void Configure(EntityMetadataDataSourceBuilder<SysUser> builder)
{
builder.ShardingProperty(o => o.Area);
}
}
//用户Id取模
public class SysUserVirtualTableRoute:AbstractSimpleShardingModKeyStringVirtualTableRoute<SysUser>
{
public SysUserVirtualTableRoute() : base(2, 3)
{
}
public override void Configure(EntityMetadataTableBuilder<SysUser> builder)
{
builder.ShardingProperty(o => o.Id);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
注意
因为分库提供默认路由所以需要用户自行实现AbstractShardingOperatorVirtualDataSourceRoute
抽象,分表有默认抽象路由但是如果无法满足需求可以自定义实现AbstractShardingOperatorVirtualTableRoute
抽象
# Startup配置
ConfigureServices(IServiceCollection services)
配置
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
services.AddShardingDbContext<MyDbContext>()
.UseRouteConfig(o =>
{
o.AddShardingDataSourceRoute<OrderVirtualDataSourceRoute>();
o.AddShardingDataSourceRoute<SysUserVirtualDataSourceRoute>();
o.AddShardingTableRoute<SysUserVirtualTableRoute>();
o.AddShardingTableRoute<OrderVirtualTableRoute>();
})
.UseConfig(op =>
{
op.UseShardingQuery((conStr, builder) =>
{
builder.UseSqlServer(conStr).UseLoggerFactory(efLogger);
});
op.UseShardingTransaction((connection, builder) =>
{
builder.UseSqlServer(connection).UseLoggerFactory(efLogger);
});
op.AddDefaultDataSource("A",
"Data Source=localhost;Initial Catalog=EFCoreShardingDataSourceTableDBA;Integrated Security=True;");
op.AddExtraDataSource(sp =>
{
return new Dictionary<string, string>()
{
{
"B","Data Source=localhost;Initial Catalog=EFCoreShardingDataSourceTableDBB;Integrated Security=True;"
},
{
"C","Data Source=localhost;Initial Catalog=EFCoreShardingDataSourceTableDBC;Integrated Security=True;"
},
};
});
}).AddShardingCore();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
重要
!!!MyDbContext
请勿注册成单例、ServiceLifetime.Singleton
!!!
!!!MyDbContext
请勿注册成单例、ServiceLifetime.Singleton
!!!
!!!MyDbContext
请勿注册成单例、ServiceLifetime.Singleton
!!!
新建一个扩展方法用来初始化ShardingCore和初始化种子数据
public static class StartupExtension
{
public static void UseShardingCore(this IApplicationBuilder app)
{
app.ApplicationServices.GetRequiredService<IShardingBootstrapper>().Start();
}
public static void InitSeed(this IApplicationBuilder app)
{
using (var serviceScope = app.ApplicationServices.CreateScope())
{
var myDbContext = serviceScope.ServiceProvider.GetRequiredService<MyDbContext>();
if (!myDbContext.Set<SysUser>().Any())
{
string[] areas = new string[] {"A","B","C" };
List<SysUser> users = new List<SysUser>(10);
for (int i = 0; i < 100; i++)
{
var uer=new SysUser()
{
Id = i.ToString(),
Name = $"MyName{i}",
Area = areas[new Random().Next(0,3)]
};
users.Add(uer);
}
List<Order> orders = new List<Order>(300);
var begin = new DateTime(2021, 1, 1, 3, 3, 3);
for (int i = 0; i < 300; i++)
{
var sysUser = users[i % 100];
var order = new Order()
{
Id = i.ToString(),
Payer = sysUser.Id,
Money = 100+new Random().Next(100,3000),
OrderStatus = (OrderStatusEnum)(i % 4 + 1),
Area = sysUser.Area,
CreationTime = begin.AddDays(i)
};
orders.Add(order);
}
myDbContext.AddRange(users);
myDbContext.AddRange(orders);
myDbContext.SaveChanges();
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
Configure(IApplicationBuilder app, IWebHostEnvironment env)
配置
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
//初始化ShardingCore
app.UseShardingCore();
app.UseRouting();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
//初始化种子数据
app.InitSeed();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# efcore日志(可选)
这边为了方便我们观察efcore的执行sql语句我们这边建议对efcore添加日志
Startup
添加静态数据
public static readonly ILoggerFactory efLogger = LoggerFactory.Create(builder =>
{
builder.AddFilter((category, level) => category == DbLoggerCategory.Database.Command.Name && level == LogLevel.Information).AddConsole();
});
2
3
4
在所有builder.UseSqlServer(conStr);
都改成builder.UseSqlServer(conStr).UseLoggerFactory(efLogger);
启动后我们将可以看到数据库和表会被自动创建,并且会将种子数据进行插入到内部供我们可以查询测试