代碼片段: 文末附鏈接。 using DataSync.Core; using Furion.Logging.Extensions; using Microsoft.Data.SqlClient; using Microsoft.Extensions.Logging; using System.Da ...
代碼片段: 文末附鏈接。
using DataSync.Core; using Furion.Logging.Extensions; using Microsoft.Data.SqlClient; using Microsoft.Extensions.Logging; using System.Data; namespace DataSync.Application.DataSync.Services { public class DataSyncServices : IDataSyncData, ITransient { private readonly object lockObj = new object(); /// <summary> /// 客戶端向服務端同步 /// </summary> /// <param name="clientConn"></param> /// <param name="serviceConn">目標資料庫</param> /// <returns></returns> public string SyncDataForClient(string clientConn, string serviceConn) { return SyncData(clientConn, serviceConn); } /// <summary> /// 服務端向客戶端同步 /// </summary> /// <param name="serviceConn"></param> /// <param name="clientConn"></param> /// <returns></returns> public string SyncDataForServer(string serviceConn, string clientConn) { return SyncData(serviceConn, clientConn); } /// <summary> /// 數據同步 /// </summary> private string SyncData(string sourceConn, string targetConn) { try { //源資料庫 數據源鏈接 SqlSugarScope sourceDb = new SqlSugarScope(new ConnectionConfig() { DbType = SqlSugar.DbType.SqlServer, ConnectionString = sourceConn, IsAutoCloseConnection = true, AopEvents = new AopEvents { OnLogExecuting = (sql, ps) => { #if DEBUG Log.Information($"語句:{sql},參數:{(ps.Any() ? "[" : string.Empty) + string.Join("|", ps.Select(m => $"{m.ParameterName}={m.Value}")) + (ps.Any() ? "]" : string.Empty)}"); #endif } } }); //目標資料庫 數據源鏈接 SqlSugarScope targetDb = new SqlSugarScope(new ConnectionConfig() { DbType = SqlSugar.DbType.SqlServer, ConnectionString = targetConn, IsAutoCloseConnection = true, AopEvents = new AopEvents { OnLogExecuting = (sql, ps) => { #if DEBUG Log.Information($"語句:{sql},參數:{(ps.Any() ? "[" : string.Empty) + string.Join("|", ps.Select(m => $"{m.ParameterName}={m.Value}")) + (ps.Any() ? "]" : string.Empty)}"); #endif } } }); //使用sqlsugar 初始化目標資料庫 targetDb.DbMaintenance.CreateDatabase(); var tableNames = sourceDb.DbMaintenance.GetTableInfoList(false).Select(t => t.Name).ToList(); // 調用函數獲取所有表名 var syncBlackTable = App.GetConfig<List<string>>("SyncBlackTable"); tableNames = tableNames.Except(syncBlackTable).ToList(); //多線程 Parallel.ForEach(tableNames, tableName => { lock (lockObj) { // 根據表名從源資料庫中獲取數據並存入 DataTable var targetdataTable = DataTableHelper.FetchDataFromTable(tableName, sourceDb); //判斷數據表在目標庫是否存在 var flagTargetTab = targetDb.DbMaintenance.IsAnyTable(tableName); if (!flagTargetTab) { // 創建表的SQL語句 var createTableSql = $"CREATE TABLE {tableName} ("; if (targetdataTable != null && targetdataTable.Rows.Count > 0) { //目標資料庫寫入-先移除數據同步標識 DataBaseInfoService.DatatableRemoveCloumns(targetdataTable); // 遍歷DataTable的列 foreach (DataColumn column in targetdataTable.Columns) { string columnName = column.ColumnName; string dataType = DataBaseInfoService.GetSqlDataType(column.DataType); createTableSql += $"{columnName} {dataType}, "; } createTableSql = createTableSql.TrimEnd(',', ' ') + ")"; // 創建表 targetDb.Ado.ExecuteCommand(createTableSql); ("TargetTable : " + tableName + ",創建成功").LogInformation(); // } //} } } //AppSys if (tableName.ToUpper().Equals("APPSYS")) { AppSysDataSync.SyncData(tableName, sourceDb, targetDb); } var selectCountSql = $"SELECT COUNT(*) FROM {tableName} "; var sourceCount = sourceDb.Ado.GetInt(selectCountSql); var middleCount = targetDb.Ado.GetInt(selectCountSql); //增量 if (sourceCount > middleCount) { // commandTarget.Connection = connTarget; // commandTarget.CommandType = CommandType.Text; //查詢數據 var selectTableSql = $"SELECT * FROM {tableName}"; //創建datatable(源數據) var sourceDataTable = sourceDb.Ado.GetDataTable(selectTableSql); if (sourceDataTable != null && sourceDataTable.Rows.Count > 0) { //新增列 MD5 DataBaseInfoService.DataTableAddColumsMd5(sourceDataTable); } //創建datatable(目標表數據) var targetDataTable = targetDb.Ado.GetDataTable(selectTableSql); if (targetDataTable != null && targetDataTable.Rows.Count > 0) { //新增列 MD5 DataBaseInfoService.DataTableAddColumsMd5(targetDataTable); } // 計算差集 var tempTable = new DataTable(); var tempExceptTable = (from source in sourceDataTable.AsEnumerable() where !(from target in targetDataTable.AsEnumerable() select target.Field<string>("MD5")).Contains( source.Field<string>("MD5")) select source); if (tempExceptTable != null && tempExceptTable.Count() > 0) { tempTable = tempExceptTable.CopyToDataTable(); } //批量插入數據 if (tempTable != null && tempTable.Rows.Count > 0) { //目標資料庫寫入-先移除數據同步標識,MD5標識 DataBaseInfoService.DatatableRemoveCloumns(tempTable); var connTarget = new SqlConnection(targetConn); DataBaseInfoService.DataBulkCopy(connTarget, tableName, tempTable); // TargetDataScope.Db.Fastest<DataTable>().AS(tableName).BulkCopy(tempTable); } } //刪除 else if (sourceCount < middleCount) { //查詢數據 var selectTableSql = $"SELECT * FROM {tableName}"; //創建datatable(源數據) var sourceDataTable = sourceDb.Ado.GetDataTable(selectTableSql); if (sourceDataTable != null && sourceDataTable.Rows.Count > 0) { //新增列 MD5 DataBaseInfoService.DataTableAddColumsMd5(sourceDataTable); } //創建datatable var taergetTable = targetDb.Ado.GetDataTable(selectTableSql); if (taergetTable != null && taergetTable.Rows.Count > 0) { //新增列 MD5 DataBaseInfoService.DataTableAddColumsMd5(taergetTable); } // 計算差集 var tempTable = new DataTable(); var tempExceptTable = (from target in taergetTable.AsEnumerable() where !(from source in sourceDataTable.AsEnumerable() select source.Field<string>("MD5")).Contains( target.Field<string>("MD5")) select target); if (tempExceptTable != null && tempExceptTable.Count() > 0) { tempTable = tempExceptTable.CopyToDataTable(); } if (tempTable != null && tempTable.Rows.Count > 0) { //獲取主鍵欄位 var PrimaryKeyName = targetDb.DbMaintenance.GetPrimaries(tableName); //DataTableHelper.GetPrimaryKeyFieldName(tableName, connTarget); //獲取自增列 var Identities = targetDb.DbMaintenance.GetIsIdentities(tableName); if (PrimaryKeyName != null && PrimaryKeyName.Count > 0) { foreach (DataRow row in tempTable.Rows) { var deleteDataSql = DataTableHelper.ConstructDeleteSql(tableName, PrimaryKeyName, Identities, row); //$"DELETE FROM {tableName} WHERE {PrimaryKeyName} ='{row[PrimaryKeyName[0]]}'"; //目標數據數據操作對象 targetDb.Ado.ExecuteCommand(deleteDataSql); } } } } //更新 else { //判斷是否存在需要更新的記錄 //和目標表比較取差集 //查詢數據 var selectTableSql = $"SELECT * FROM {tableName}"; //創建datatable(源數據) var sourceDataTable = sourceDb.Ado.GetDataTable(selectTableSql); if (sourceDataTable != null && sourceDataTable.Rows.Count > 0) { //新增列 MD5 DataBaseInfoService.DataTableAddColumsMd5(sourceDataTable); } //創建datatable(目標表數據) var targetDataTable = targetDb.Ado.GetDataTable(selectTableSql); if (targetDataTable != null && targetDataTable.Rows.Count > 0) { //新增列 MD5 DataBaseInfoService.DataTableAddColumsMd5(targetDataTable); } // 計算差集 var tempTable = new DataTable(); var tempExceptTable = (from source in sourceDataTable.AsEnumerable() where !(from target in targetDataTable.AsEnumerable() select target.Field<string>("MD5")).Contains( source.Field<string>("MD5")) select source); if (tempExceptTable != null && tempExceptTable.Count() > 0) { tempTable = tempExceptTable.CopyToDataTable(); } if (tempTable != null && tempTable.Rows.Count > 0) { //刪除標識列和MD5列 DataBaseInfoService.DatatableRemoveCloumns(tempTable); //獲取目標表主鍵欄位 var PrimaryKeyName = targetDb.DbMaintenance.GetPrimaries(tableName); //獲取自增列 var Identities = targetDb.DbMaintenance.GetIsIdentities(tableName); //DataTableHelper.GetPrimaryKeyFieldName(tableName, connTarget); foreach (DataRow dataRow in tempTable.Rows) { var updateDataSql = DataTableHelper.ConstructUpdateSql(tableName, PrimaryKeyName, Identities, dataRow); targetDb.Ado.ExecuteCommand(updateDataSql); } } } } }); } catch (Exception ex) { ("Error occurred while connecting to database or fetching data from tables.\n" + ex.Message).LogError(); return "同步失敗。詳見錯誤日誌!"; } return "同步完成!"; } } }
Gitee: https://gitee.com/ltf_free/sync-data.git