1.前言 hi,大家好,我是三合。我是怎麼想起寫一篇關於資料庫快速批量插入的博客的呢?事情起源於我們工作中的一個需求,簡單來說,就是有一個定時任務,從資料庫里獲取大量數據,在應用層面經過處理後再把結果批量插入回到資料庫里。這個任務每十分鐘執行一次,但是有的時候數據量太大,迴圈插入資料庫的時候會超時, ...
1.前言
hi,大家好,我是三合。我是怎麼想起寫一篇關於資料庫快速批量插入的博客的呢?事情起源於我們工作中的一個需求,簡單來說,就是有一個定時任務,從資料庫里獲取大量數據,在應用層面經過處理後再把結果批量插入回到資料庫里。這個任務每十分鐘執行一次,但是有的時候數據量太大,迴圈插入資料庫的時候會超時,導致任務失敗,所以這個時候我就開始研究怎麼快速批量插入資料庫,因為我們用的資料庫是Oracle,所以我首先研究了Oracle的快速批量插入,後面我一想那其他類型的資料庫肯定也有這樣的需求,於是我在找了很多資料,並且反覆實驗後,終於完美解決了mysql,sqlServer以及Oracle的快速批量插入,sqlite自身不支持,所以沒有sqlite,特地整理成這篇文章,分享給大家。
2.測試前準備
添加一個具有絕大多數類型屬性的實體類,用來完整測驗批量插入效果,該實體類用於mysql和sqlserver的測試。
public class NullableTable
{
[DatabaseGenerated(DatabaseGeneratedOption.Identity)]
[Key]
public int Id { get; set; }
[Description("Int2")]
public int? Int2 { get; set; }
[Description("Long2")]
public long? Long2 { get; set; }
public float? Float2 { get; set; }
public double? Double2 { get; set; }
public decimal? Decimal2 { get; set; }
[DecimalPrecision(20,4)]
public decimal? Decimal3 { get; set; }
public Guid? Guid2 { get; set; }
public short? Short2 { get; set; }
public DateTime? DateTime2 { get; set; }
public bool? Bool2 { get; set; }
public TimeSpan? TimeSpan2 { get; set; }
public byte? Byte2 { get; set; }
[StringLength(100)]
public string String2 { get; set; }
public string String3 { get; set; }
public Enum2? Enum2 { get; set; }
[Column("TestInt3")]
[Description("Int2")]
public int? Int3 { get; set; }
}
public enum Enum2
{
x,
y
}
因為oracle資料庫我們習慣於表名和欄位名大寫,所以oracle的測試實體類定義如下:
[Table("NULLABLETABLE")]
[Description("NullableTable")]
public class NullableTable
{
[DatabaseGenerated(DatabaseGeneratedOption.Identity)]
[Key]
[Column("ID")]
public int Id { get; set; }
[Description("Int2")]
[Column("INT2")]
public int? Int2 { get; set; }
[Description("Long2")]
[Column("LONG2")]
public long? Long2 { get; set; }
[Column("FLOAT2")]
public float? Float2 { get; set; }
[Column("DOUBLE2")]
public double? Double2 { get; set; }
[Column("DECIMAL2")]
public decimal? Decimal2 { get; set; }
[Column("DECIMAL3")]
[DecimalPrecision(20,4)]
public decimal? Decimal3 { get; set; }
[Column("GUID2")]
public Guid? Guid2 { get; set; }
[Column("SHORT2")]
public short? Short2 { get; set; }
[Column("DATETIME2")]
public DateTime? DateTime2 { get; set; }
[Column("BOOL2")]
public bool? Bool2 { get; set; }
[Column("TIMESPAN2")]
public TimeSpan? TimeSpan2 { get; set; }
[Column("BYTE2")]
public byte? Byte2 { get; set; }
[Column("STRING2")]
[StringLength(100)]
public string String2 { get; set; }
[Column("STRING3")]
public string String3 { get; set; }
[Column("ENUM2")]
public Enum2? Enum2 { get; set; }
[Column("TESTINT3")]
[Description("Int2")]
public int? Int3 { get; set; }
}
實驗我們採用的是code first,先利用SummerBoot框架的可用於依賴註入的,資料庫表和c#實體類互相轉換的介面實現功能從實體類生成相應的資料庫表,本次實驗批量插入2w條數據來對比時間,定義一個列表,用迴圈的方式給這個列表添加2w條數據。
var nullableTableList3 = new List<NullableTable>();
var now = DateTime.Now;
for (int i = 0; i < 20000; i++)
{
var a = new NullableTable()
{
Int2 = 2,
Bool2 = true,
Byte2 = 1,
DateTime2 = now,
Decimal2 = 1m,
Decimal3 = 1.1m,
Double2 = 1.1,
Float2 = (float)1.1,
Guid2 = Guid.NewGuid(),
Id = 0,
Short2 = 1,
TimeSpan2 = TimeSpan.FromHours(1),
String2 = "sb",
String3 = "sb",
Long2 = 2,
Enum2 = Model.Enum2.y,
Int3 = 4
};
nullableTableList3.Add(a);
}
資料庫驅動上的選擇是這樣的,sqlserver採用微軟官方驅動System.Data.SqlClient,oracle採用官方驅動Oracle.ManagedDataAccess.Core,mysql採用社區驅動MySqlConnector(為啥mysql不採用官方的驅動呢?因為官方的驅動封裝的太差了,社區的驅動支持列名映射,同時項目里官方驅動和社區驅動可以共存)。
同時快速批量插入均支持非同步同步,這裡僅演示同步,非同步的實現基本一樣。
3.sqlserver快速批量插入
sqlserver官方提供的批量插入方式是SqlBulkCopy,參數為一個dataTable對象,原生的批量插入代碼如下,採用StopWatch類進行計時,測試前都會用DELETE from NullableTable 語句清空表,測試里迴圈跑5次,獲取總時間後除以5獲取平均值,合計插入10w條數據。
var sw = new Stopwatch();
sw.Start();
for (int i = 0; i < 5; i++)
{
using (var dbConnection = new SqlConnection(connectionString))
{
dbConnection.Open();
SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(dbConnection, SqlBulkCopyOptions.KeepIdentity,
null);
sqlBulkCopy.BatchSize = 20000;
sqlBulkCopy.DestinationTableName = "NullableTable";
//針對列名做一下映射
sqlBulkCopy.ColumnMappings.Add("Int2", "Int2");
sqlBulkCopy.ColumnMappings.Add("Bool2", "Bool2");
sqlBulkCopy.ColumnMappings.Add("Byte2", "Byte2");
sqlBulkCopy.ColumnMappings.Add("DateTime2", "DateTime2");
sqlBulkCopy.ColumnMappings.Add("Decimal2", "Decimal2");
sqlBulkCopy.ColumnMappings.Add("Decimal3", "Decimal3");
sqlBulkCopy.ColumnMappings.Add("Double2", "Double2");
sqlBulkCopy.ColumnMappings.Add("Float2", "Float2");
sqlBulkCopy.ColumnMappings.Add("Guid2", "Guid2");
sqlBulkCopy.ColumnMappings.Add("Short2", "Short2");
sqlBulkCopy.ColumnMappings.Add("TimeSpan2", "TimeSpan2");
sqlBulkCopy.ColumnMappings.Add("String2", "String2");
sqlBulkCopy.ColumnMappings.Add("String3", "String3");
sqlBulkCopy.ColumnMappings.Add("Long2", "Long2");
sqlBulkCopy.ColumnMappings.Add("Enum2", "Enum2");
sqlBulkCopy.ColumnMappings.Add("Int3", "TestInt3");
//將實體類列表轉換成dataTable
var table = nullableTableList3.ToDataTable();
sqlBulkCopy.WriteToServer(table);
}
}
sw.Stop();
var totalTime= sw.ElapsedMilliseconds;
var avgValue = totalTime / 5;
實驗結果如下,sql server中:
採用快速批量插入10w條數據,時間合計1858毫秒,平均插入2w條數據僅需371毫秒。
採用insert into語句,迴圈插入10w條數據,時間合計457606毫秒,平均插入2w條數據需91521毫秒。
4.實體類列表轉dataTable的擴展方法
這裡有一個實體類列表轉dataTable的擴展方法,採用的是表達式樹+構建委托的方式,性能不錯,大家可以參考,代碼實現如下。
public static ConcurrentDictionary<string, object> CacheDictionary = new ConcurrentDictionary<string, object>();
/// <summary>
/// 構建一個object數據轉換成一維數組數據的委托
/// </summary>
/// <param name="objType"></param>
/// <param name="propertyInfos"></param>
/// <returns></returns>
public static Func<T, object[]> BuildObjectGetValuesDelegate<T>(List<PropertyInfo> propertyInfos) where T : class
{
var objParameter = Expression.Parameter(typeof(T), "model");
var selectExpressions = propertyInfos.Select(it => BuildObjectGetValueExpression(objParameter, it));
var arrayExpression = Expression.NewArrayInit(typeof(object), selectExpressions);
var result = Expression.Lambda<Func<T, object[]>>(arrayExpression, objParameter).Compile();
return result;
}
/// <summary>
/// 構建對象獲取單個值得
/// </summary>
/// <param name="modelExpression"></param>
/// <param name="propertyInfo"></param>
/// <returns></returns>
public static Expression BuildObjectGetValueExpression(ParameterExpression modelExpression, PropertyInfo propertyInfo)
{
var propertyExpression = Expression.Property(modelExpression, propertyInfo);
var convertExpression = Expression.Convert(propertyExpression, typeof(object));
return convertExpression;
}
public static DataTable ToDataTable<T>(this IEnumerable<T> source, List<PropertyInfo> propertyInfos = null,bool useColumnAttribute=false) where T : class
{
var table = new DataTable("template");
if (propertyInfos == null || propertyInfos.Count == 0)
{
propertyInfos = typeof(T).GetProperties().Where(it => it.CanRead).ToList();
}
foreach (var propertyInfo in propertyInfos)
{
var columnName=useColumnAttribute?(propertyInfo.GetCustomAttribute<ColumnAttribute>()?.Name?? propertyInfo.Name) : propertyInfo.Name;
table.Columns.Add(columnName, ChangeType(propertyInfo.PropertyType));
}
Func<T, object[]> func;
var key = typeof(T).FullName + propertyInfos.Select(it => it.Name).ToList().StringJoin();
if (CacheDictionary.TryGetValue(key, out var cacheFunc))
{
func = (Func<T, object[]>)cacheFunc;
}
else
{
func = BuildObjectGetValuesDelegate<T>(propertyInfos);
CacheDictionary.TryAdd(key, func);
}
foreach (var model in source)
{
var rowData = func(model);
table.Rows.Add(rowData);
}
return table;
}
private static Type ChangeType(Type type)
{
if (type.IsNullable())
{
type = Nullable.GetUnderlyingType(type);
}
return type;
}
5.oracle快速批量插入
oracle官方提供的批量插入方式是ArrayBindCount,即數組批量插入,原生的批量插入代碼如下,計時方式與sqlserver相同
var total = 20000;
var sw = new Stopwatch();
sw.Start();
for (int i = 0; i < 5; i++)
{
var connection = new OracleConnection(connectionString);
connection.Open();
int?[] Int2 = new int?[total];
bool[] Bool2 = new bool[total];
byte[] Byte2 = new byte[total];
DateTime[] DateTime2 = new DateTime[total];
decimal?[] Decimal2 = new decimal?[total];
decimal[] Decimal3 = new decimal[total];
double[] Double2 = new double[total];
float[] Float2 = new float[total];
Guid?[] Guid2 = new Guid?[total];
short[] Short2 = new short[total];
TimeSpan[] TimeSpan2 = new TimeSpan[total];
string[] String2 = new string[total];
string[] String3 = new string[total];
long[] Long2 = new long[total];
Enum2[] Enum2 = new Enum2[total];
for (int j = 0; j < total; j++)
{
Int2[j] = 2;
Bool2[j] = true;
Byte2[j] = 1;
DateTime2[j] = now;
Decimal2[j] = 1m;
Decimal3[j] = 1.1m;
Double2[j] = 1.1;
Float2[j] = (float) 1.1;
Guid2[j] = Guid.NewGuid();
Short2[j] = 1;
TimeSpan2[j] = TimeSpan.FromHours(1);
String2[j] = "sb";
String3[j] = "sb";
Long2[j] = 2;
Enum2[j] = Model.Enum2.y;
}
var c = (int) Model.Enum2.y;
OracleParameter pInt2 = new OracleParameter();
pInt2.OracleDbType = OracleDbType.Int32;
pInt2.Value = Int2;
OracleParameter pBool2 = new OracleParameter();
pBool2.OracleDbType = OracleDbType.Byte;
pBool2.Value = Bool2;
OracleParameter pByte2 = new OracleParameter();
pByte2.OracleDbType = OracleDbType.Byte;
pByte2.Value = Byte2;
OracleParameter pDateTime2 = new OracleParameter();
pDateTime2.OracleDbType = OracleDbType.TimeStamp;
pDateTime2.Value = DateTime2;
OracleParameter pDecimal2 = new OracleParameter();
pDecimal2.OracleDbType = OracleDbType.Decimal;
pDecimal2.Value = Decimal2;
OracleParameter pDecimal3 = new OracleParameter();
pDecimal3.OracleDbType = OracleDbType.Decimal;
pDecimal3.Value = Decimal3;
OracleParameter pDouble2 = new OracleParameter();
pDouble2.OracleDbType = OracleDbType.Double;
pDouble2.Value = Double2;
OracleParameter pFloat2 = new OracleParameter();
pFloat2.OracleDbType = OracleDbType.BinaryFloat;
pFloat2.Value = Float2;
OracleParameter pGuid2 = new OracleParameter();
pGuid2.OracleDbType = OracleDbType.Raw;
pGuid2.Value = Guid2;
OracleParameter pShort2 = new OracleParameter();
pShort2.OracleDbType = OracleDbType.Int16;
pShort2.Value = Short2;
OracleParameter pTimeSpan2 = new OracleParameter();
pTimeSpan2.OracleDbType = OracleDbType.IntervalDS;
pTimeSpan2.Value = TimeSpan2;
OracleParameter pString2 = new OracleParameter();
pString2.OracleDbType = OracleDbType.Varchar2;
pString2.Value = String2;
OracleParameter pString3 = new OracleParameter();
pString3.OracleDbType = OracleDbType.Varchar2;
pString3.Value = String3;
OracleParameter pLong2 = new OracleParameter();
pLong2.OracleDbType = OracleDbType.Long;
pLong2.Value = Long2;
OracleParameter pEnum2 = new OracleParameter();
pEnum2.OracleDbType = OracleDbType.Byte;
pEnum2.Value = Enum2;
// create command and set properties
OracleCommand cmd = connection.CreateCommand();
cmd.CommandText =
"INSERT INTO NULLABLETABLE (INT2, LONG2, FLOAT2, DOUBLE2, DECIMAL2, DECIMAL3, GUID2, SHORT2, DATETIME2, BOOL2, TIMESPAN2, BYTE2, STRING2, STRING3,ENUM2) VALUES(:1,:2,:3,:4,:5,:6,:7,:8,:9,:10,:11,:12,:13,:14,:15)";
cmd.ArrayBindCount = total;
cmd.Parameters.Add(pInt2);
cmd.Parameters.Add(pLong2);
cmd.Parameters.Add(pFloat2);
cmd.Parameters.Add(pDouble2);
cmd.Parameters.Add(pDecimal2);
cmd.Parameters.Add(pDecimal3);
cmd.Parameters.Add(pGuid2);
cmd.Parameters.Add(pShort2);
cmd.Parameters.Add(pDateTime2);
cmd.Parameters.Add(pBool2);
cmd.Parameters.Add(pTimeSpan2);
cmd.Parameters.Add(pByte2);
cmd.Parameters.Add(pString2);
cmd.Parameters.Add(pString3);
cmd.Parameters.Add(pEnum2);
cmd.ExecuteNonQuery();
}
sw.Stop();
var totalTime = sw.ElapsedMilliseconds;
var avgValue = totalTime / 5;
實驗結果如下,oracle中:
採用快速批量插入10w條數據,時間合計2323毫秒,平均插入2w條數據僅需464毫秒。
採用insert into語句,迴圈插入10w條數據,時間合計462837毫秒,平均插入2w條數據僅需92567毫秒。
6.mysql快速批量插入
mysql社區驅動MySqlConnector提供的批量插入方式是SqlBulkCopy,基於mysql自身的文件上傳機制進行批量插入,參數為一個dataTable對象,原生的批量插入代碼如下,計時方式與sqlserver相同,同時,mysql的連接字元串里要添加";AllowLoadLocalInfile=true",即連接字元串的形式應該是"Server=
var sw = new Stopwatch();
sw.Start();
for (int j = 0; j < 5; j++)
{
using (var dbConnection = new MySqlConnection(connectionString))
{
dbConnection.Open();
MySqlBulkCopy sqlBulkCopy = new MySqlBulkCopy(dbConnection, null);
sqlBulkCopy.DestinationTableName = "NullableTable";
var propertys = typeof(NullableTable).GetProperties()
.Where(it => it.CanRead && it.GetCustomAttribute<NotMappedAttribute>() == null).ToList();
for (int i = 0; i < propertys.Count; i++)
{
var property = propertys[i];
var columnName = property.GetCustomAttribute<ColumnAttribute>()?.Name ?? property.Name;
if (property.PropertyType.GetUnderlyingType() == typeof(Guid))
{
sqlBulkCopy.ColumnMappings.Add(new MySqlBulkCopyColumnMapping(i, "@tmp",
$"{columnName} =unhex(@tmp)"));
}
else
{
sqlBulkCopy.ColumnMappings.Add(new MySqlBulkCopyColumnMapping(i, columnName));
}
}
var table = nullableTableList3.ToDataTable();
SbUtil.ReplaceDataTableColumnType<Guid, byte[]>(table, guid1 => guid1.ToByteArray());
var c = sqlBulkCopy.WriteToServer(table);
}
}
sw.Stop();
var totalTime = sw.ElapsedMilliseconds;
var avgValue = totalTime / 5;
實驗結果如下,mysql中:
採用快速批量插入10w條數據,時間合計2350毫秒,平均插入2w條數據僅需470毫秒。
採用insert into語句,迴圈插入10w條數據,時間合計414700毫秒,平均插入2w條數據需82940毫秒。
在mysql中c#的guid對應的mysql欄位類型為varbinary(16),所以table里的guid要轉換為位元組數組,否則插入資料庫後,guid的值就會變成亂碼,位元組數組傳遞到mysql服務端後利用unhex函數進行解析,即可正常保存guid類型。 將table里guid的值轉為位元組數組的方法-SbUtil.ReplaceDataTableColumnType的代碼實現如下:
/// <summary>
/// 替換dataTable里的列類型
/// </summary>
/// <param name="dt"></param>
public static void ReplaceDataTableColumnType<OldType,NewType>(DataTable dt,Func<OldType, NewType> replaceFunc)
{
var needUpdateColumnIndexList = new List<int>();
var needUpdateColumnNameList = new List<string>();
for (int i = 0; i < dt.Columns.Count; i++)
{
var column = dt.Columns[i];
if (column.DataType.GetUnderlyingType() == typeof(OldType))
{
needUpdateColumnIndexList.Add(i);
needUpdateColumnNameList.Add(column.ColumnName);
}
}
if (needUpdateColumnIndexList.Count == 0)
{
return;
}
var nameMapping = new Dictionary<string, string>();
for (int i = 0; i < needUpdateColumnIndexList.Count; i++)
{
var oldColumnName = needUpdateColumnNameList[i];
var newColumnName = Guid.NewGuid().ToString("N");
nameMapping.Add(newColumnName, oldColumnName);
dt.Columns.Add(newColumnName, typeof(byte[])).SetOrdinal(needUpdateColumnIndexList[i]);
for (int j = 0; j < dt.Rows.Count; j++)
{
var c = (dt.Rows[j][oldColumnName]);
dt.Rows[j][newColumnName] = replaceFunc((OldType)(dt.Rows[j][oldColumnName]));
}
dt.Columns.Remove(oldColumnName);
}
for (int i = 0; i < dt.Columns.Count; i++)
{
var columnName = dt.Columns[i].ColumnName;
if (nameMapping.ContainsKey(columnName))
{
dt.Columns[i].ColumnName = nameMapping[columnName];
}
}
}
7.SummerBoot對各資料庫快速批量插入的封裝
基於以上各種資料庫對於快速批量插入的原生寫法過於複雜難記,SummerBoot對其進行了封裝,在聲明式編程的理念下,封裝後僅需3步即可快速批量插入,這裡以sqlserver舉例。
7.1在StartUp.cs中添加summerBoot的服務支持
services.AddSummerBoot();
services.AddSummerBootRepository(it =>
{
it.DbConnectionType = typeof(SqlConnection);
it.ConnectionString = connectionString;
});
7.2添加倉儲介面
[AutoRepository]
public interface INullableTableRepository : IBaseRepository<NullableTable>
{
}
7.3註入倉儲介面後直接調用FastBatchInsert方法
var sw = new Stopwatch();
sw.Start();
for (int i = 0; i < 5; i++)
{
nullableTableRepository.FastBatchInsert(nullableTableList3);
}
sw.Stop();
var totalTime= sw.ElapsedMilliseconds;
var avgValue = totalTime / 5;
實驗結果如下,sql server中:
採用SummerBoot統一封裝後快速批量插入10w條數據,時間合計3926(原生快速批量寫法1858)毫秒,平均插入2w條數據僅需785(原生快速批量寫法371)毫秒。從對比可以看出,經過SummerBoot封裝後,快速批量插入所花費的時間有所增加,但是對於這麼大數據量而言,這點多消耗的時間和節省的開發量對比,不值一提。
寫在最後
SummerBoot是一款聲明式編程框架,專註於”做什麼”而不是”如何去做”,更多用法,可參考SummerBoot文檔,也可以加入QQ群:799648362反饋建議。同時各位看官,如果你覺得這篇文章還不錯的話,請幫忙一鍵三連哦(推薦+關註+github star)