一、前言 生命不息,折騰不止。近期公司有數據遷移的計劃,從Sqlserver遷移到mysql,雖說網上有很多數據遷移方案,但閑著也是閑著,就自己整一個,權當做是練練手了 二、解決思路 整個遷移過程類似於ETL,將數據從來源端經過抽取(extract)、轉換(transform)、載入(load)至目 ...
一、前言
生命不息,折騰不止。近期公司有數據遷移的計劃,從Sqlserver遷移到mysql,雖說網上有很多數據遷移方案,但閑著也是閑著,就自己整一個,權當做是練練手了
二、解決思路
整個遷移過程類似於ETL,將數據從來源端經過抽取(extract)、轉換(transform)、載入(load)至目的端。讀取並轉換sqlserver庫數據,將數據解析為csv文件,載入文件到mysql。流程如下:
- 抽取、轉換
此過程主要是處理源資料庫與目標資料庫表欄位的映射關係,為了保證程式的通用性,通過配置文件映射欄位關係,解析配置文件並生成資料庫腳本 - 載入
數據遷移的時候最好不要用INSERT語句插入批量插入,這樣數據量稍稍大一點就很慢。sqlserver可通過SqlBulkCopy將DataTable對象快速插入到資料庫,然後mysql並沒有這東西,查閱資料後發現mysql可通過MySqlBulkLoader將csv文件快速導入到資料庫。經測試遷移10K條數據MySqlBulkLoader可在1S內處理完,速度還是相當不錯的
三、實現
- 配置文件
db_caption.xml(資料庫),主要用來存儲表描述文件名,若待遷移的表不存在外鍵關係即遷移時不用考慮先後順序,此配置文件可以不要。其中maxClients參數指的是非同步遷移時,最大併發數。
<?xml version="1.0" encoding="utf-8" ?> <root> <maxClients value="3"></maxClients> <tables> <table filename="t_drawtemplate.xml" caption="抽獎模板"></table> <table filename="t_drawprize.xml" caption="抽獎獎品"></table> <table filename="t_drawrecord.xml" caption="抽獎記錄"></table> <table filename="t_drawwinner.xml" caption="中獎記錄"></table> </tables> </root>
t_table.xml(表),主要用來描述待遷移表信息及欄位描述
<?xml version="1.0" encoding="utf-8" ?> <root> <![CDATA[抽獎記錄]]> <!--是否分頁,預設不分頁就好啦,false_不分頁--> <isPaging value="true"></isPaging> <pageSize value="10000"></pageSize> <!--mssql資料庫表主鍵--> <primaryKey value="DrawRecordId"></primaryKey> <!--mssql資料庫表名--> <msTable value="DrawRecord"></msTable> <!--mysql資料庫表名--> <myTable value="t_drawrecord"></myTable> <!--篩選條件,無特殊情況為空即可--> <filter value="1=1"></filter> <!--欄位映射--> <fields> <field msName ="DrawRecordId" myName="id"></field> <field msName ="FK_MemberId" myName="user_id"></field> <field msName ="Remark" myName="remark"></field> <field msName ="DataStatus" myName="data_status"></field> <field msName ="DrawTime" myName="drawTime"></field> <!--需要調整欄位示例--> <field msName ="CASE WHEN DrawWinnerId >0 THEN DrawWinnerId END" myName="drawwinner_id"></field> </fields> <!--遷移完成後,數據修複腳本,主要用來修正日期類型為0000-00-00 00:00:00問題--> <fixSql></fixSql> </root>
- 創建xml文件映射對象並重寫ToString方法,將對象解析為sql
db_caption.xml映射對象
1 /// <summary> 2 /// 資料庫描述類(db_caption) 3 /// </summary> 4 internal class DBCaptionModel 5 { 6 public DBCaptionModel() 7 { 8 this.Tables = new List<TableModel>(); 9 } 10 11 /// <summary> 12 /// 最大連接數 13 /// </summary> 14 public int MaxClients { get; set; } 15 16 /// <summary> 17 /// 表集合 18 /// </summary> 19 public IList<TableModel> Tables { get; private set; } 20 } 21 22 internal class TableModel 23 { 24 /// <summary> 25 /// 表xml文件名 26 /// </summary> 27 public string FileName { get; set; } 28 29 /// <summary> 30 /// 描述 31 /// </summary> 32 public string Caption { get; set; } 33 34 /// <summary> 35 /// 是否已同步 36 /// </summary> 37 public bool IsSync { get; set; } 38 }
t_table.xml映射對象
1 /// <summary> 2 /// 表描述類 3 /// </summary> 4 internal class TableCaptionModel 5 { 6 public TableCaptionModel() 7 { 8 this.Fields = new List<FieldModel>(); 9 } 10 11 /// <summary> 12 /// 是否分頁 13 /// </summary> 14 public bool IsPaging { get; set; } 15 16 /// <summary> 17 /// 分頁大小 18 /// </summary> 19 public int PageSize { get; set; } 20 21 /// <summary> 22 /// 源數據表表名 23 /// </summary> 24 public string SourceTableName { get; set; } 25 26 /// <summary> 27 /// 目標數據表表名 28 /// </summary> 29 public string TargetTableName { get; set; } 30 31 /// <summary> 32 /// 源數據表主鍵 33 /// </summary> 34 public string PrimaryKey { get; set; } 35 36 /// <summary> 37 /// 過濾條件 38 /// </summary> 39 public string Filter { get; set; } 40 41 /// <summary> 42 /// 欄位集合 43 /// </summary> 44 public List<FieldModel> Fields { get; set; } 45 46 /// <summary> 47 /// 數據遷移完成後,數據修複腳本 48 /// </summary> 49 public string FixSql { get; set; } 50 51 /// <summary> 52 /// ToString 53 /// </summary> 54 /// <returns>sql</returns> 55 public override string ToString() 56 { 57 string sql = GetBaseSql(); 58 string filter = GetFilterSql(); 59 if (!string.IsNullOrWhiteSpace(filter)) 60 { 61 sql += " WHERE " + filter; 62 } 63 64 sql += " ORDER BY " + this.PrimaryKey; 65 return sql; 66 } 67 68 /// <summary> 69 /// 獲取基礎查詢Sql 70 /// </summary> 71 /// <![CDATA[SELECT SourceField AS TargetField,...... FROM table]]> 72 /// <returns></returns> 73 private string GetBaseSql() 74 { 75 StringBuilder sb = new StringBuilder("SELECT"); 76 77 foreach (var item in this.Fields) 78 { 79 sb.AppendFormat(" {0},", item.ToString()); 80 } 81 82 sb = sb.Remove(sb.Length - 1, 1); 83 84 sb.Append(" FROM "); 85 sb.Append(this.SourceTableName); 86 return sb.ToString(); 87 } 88 89 /// <summary> 90 /// 獲取sql查詢條件 91 /// </summary> 92 /// <![CDATA[filter || PrimaryKey NOT IN (SELECT PrimaryKey FORM table WHERE filter)]]> 93 /// <returns></returns> 94 private string GetFilterSql() 95 { 96 if (!this.IsPaging) 97 { 98 return this.Filter; 99 } 100 101 StringBuilder sb = new StringBuilder(); 102 sb.AppendFormat("SELECT ROW_NUMBER() OVER(ORDER BY {0}) RowNo,{0} FROM {1}", this.PrimaryKey, this.SourceTableName); 103 104 if (!string.IsNullOrWhiteSpace(this.Filter)) 105 { 106 sb.Append(" WHERE " + this.Filter); 107 } 108 109 sb.Insert(0, string.Format("SELECT {0} FROM (", this.PrimaryKey)); 110 sb.AppendFormat(") T WHERE RowNo BETWEEN @StartIndex AND @EndIndex"); 111 112 return string.Format("{0} IN ({1})", this.PrimaryKey, sb.ToString()); 113 } 114 } 115 116 /// <summary> 117 /// 欄位類 118 /// </summary> 119 internal class FieldModel 120 { 121 /// <summary> 122 /// 源欄位名 123 /// </summary> 124 public string SourceFieldName { get; set; } 125 126 /// <summary> 127 /// 目標欄位名 128 /// </summary> 129 public string TargetFieldName { get; set; } 130 131 /// <summary> 132 /// ToString 133 /// </summary> 134 /// <returns>'SourceFieldName' AS 'TargetFieldName'" </returns> 135 public override string ToString() 136 { 137 if (this.SourceFieldName.IndexOfAny(new char[] { ' ', '(' }) < 0) 138 { 139 //非表達式 140 return string.Format("[{0}] AS '{1}'", SourceFieldName, TargetFieldName); 141 } 142 else 143 { 144 return string.Format("{0} AS '{1}'", SourceFieldName, TargetFieldName); 145 } 146 } 147 }
- 解析XML文件
XML解析可通過XmlSerializer直接反序列化為對象,此處只是為了溫習XML解析方式,故採用此方法
1 /// <summary> 2 /// 載入資料庫描述xml 3 /// </summary> 4 /// <returns></returns> 5 private static DBCaptionModel LoadDBCaption() 6 { 7 DBCaptionModel model = new DBCaptionModel(); 8 9 XmlDocument doc = new XmlDocument(); 10 doc.Load(CONN_XML_PATH + "db_caption.xml"); 11 12 XmlNode root = doc.SelectSingleNode("root"); 13 //獲取最大連接數 14 model.MaxClients = root.SelectSingleNode("maxClients").GetAttribute<int>("value"); 15 16 //獲取表描述 17 XmlNodeList tables = root.SelectSingleNode("tables").SelectNodes("table"); 18 foreach (XmlNode node in tables) 19 { 20 model.Tables.Add(new TableModel 21 { 22 FileName = node.GetAttribute("filename"), 23 Caption = node.GetAttribute("caption") 24 }); 25 } 26 27 return model; 28 } 29 30 /// <summary> 31 /// 載入表描述xml 32 /// </summary> 33 /// <param name="fileName">表描敘xml文件名</param> 34 /// <returns></returns> 35 private static TableCaptionModel LoadTableCaption(string fileName) 36 { 37 XmlDocument doc = new XmlDocument(); 38 doc.Load(CONN_XML_PATH + fileName); 39 40 TableCaptionModel model = new TableCaptionModel(); 41 42 XmlNode root = doc.SelectSingleNode("root"); 43 model.IsPaging = root.SelectSingleNode("isPaging").GetAttribute<bool>("value"); 44 if (model.IsPaging) 45 { 46 model.PageSize = root.SelectSingleNode("pageSize").GetAttribute<int>("value"); 47 } 48 model.SourceTableName = root.SelectSingleNode("msTable").GetAttribute("value"); 49 model.TargetTableName = root.SelectSingleNode("myTable").GetAttribute("value"); 50 model.PrimaryKey = root.SelectSingleNode("primaryKey").GetAttribute("value"); 51 model.FixSql = root.SelectSingleNode("fixSql").GetAttribute("value"); 52 53 XmlNodeList fields = root.SelectSingleNode("fields").SelectNodes("field"); 54 55 foreach (XmlNode field in fields) 56 { 57 model.Fields.Add(new FieldModel 58 { 59 SourceFieldName = field.GetAttribute("msName"), 60 TargetFieldName = field.GetAttribute("myName") 61 }); 62 } 63 64 return model; 65 }
Node.GetAttribute擴展方法,簡化讀取Node屬性代碼
1 public static class XmlNodeExtension 2 { 3 /// <summary> 4 /// 獲取節點屬性 5 /// </summary> 6 /// <param name="node">當前節點</param> 7 /// <param name="attrName">屬性名稱</param> 8 /// <returns></returns> 9 public static string GetAttribute(this XmlNode node, string attrName) 10 { 11 if (node == null) 12 { 13 return null; 14 } 15 return ((XmlElement)node).GetAttribute(attrName); 16 } 17 18 /// <summary> 19 /// 獲取節點屬性 20 /// </summary> 21 /// <param name="node">當前節點</param> 22 /// <param name="attrName">屬性名稱</param> 23 /// <returns></returns> 24 public static T GetAttribute<T>(this XmlNode node, string attrName) where T : struct 25 { 26 if (node == null) 27 { 28 return default(T); 29 } 30 string value = GetAttribute(node, attrName); 31 return (T)Convert.ChangeType(value, typeof(T)); 32 } 33 }
- 實現數據遷移幫助方法
FileHelper,將DataTable解析為CSV文件
1 public class FileHelper 2 { 3 /// <summary> 4 /// 將DataTable寫入CSV 5 /// </summary> 6 /// <param name="dataTable"></param> 7 /// <param name="fileFullPath"></param> 8 public static void WriteDataTableToCSVFile(DataTable dataTable, string fileFullPath) 9 { 10 WriteDataTableToCSVFile(dataTable, fileFullPath, Encoding.UTF8); 11 } 12 13 /// <summary> 14 /// 將DataTable寫入CSV 15 /// </summary> 16 /// <param name="dataTable"></param> 17 /// <param name="fileFullPath"></param> 18 /// <param name="codeType"></param> 19 public static void WriteDataTableToCSVFile(DataTable dataTable, string fileFullPath, Encoding codeType) 20 { 21 using (Stream stream = new FileStream(fileFullPath, FileMode.Create, FileAccess.Write)) 22 using (StreamWriter swriter = new StreamWriter(stream, codeType)) 23 { 24 try 25 { 26 int num = dataTable.Columns.Count; 27 string[] arr = new string[num]; 28 29 //寫標題 30 for (int i = 0; i < num; i++) 31 { 32 arr[i] = dataTable.Columns[i].ColumnName; 33 } 34 WriteArrayToCSVFile(swriter, arr); 35 36 //寫數據 37 foreach (DataRow item in dataTable.Rows) 38 { 39 for (int i = 0; i < num; i++) 40 { 41 arr[i] = Convert.IsDBNull(item[i]) ? "" : item[i].ToString(); 42 } 43 WriteArrayToCSVFile(swriter, arr); 44 } 45 } 46 catch (Exception ex) 47 { 48 throw new IOException(ex.Message); 49 } 50 } 51 } 52 53 /// <summary> 54 /// 將數據寫入CSV文件 55 /// </summary> 56 /// <param name="swriter"></param> 57 /// <param name="arr"></param> 58 private static void WriteArrayToCSVFile(StreamWriter swriter, string[] arr) 59 { 60 for (int i = 0; i < arr.Length; i++) 61 { 62 if (!string.IsNullOrWhiteSpace(arr[i])) 63 { 64 swriter.Write(arr[i]); 65 } 66 67 if (i < arr.Length - 1) 68 { 69 swriter.Write("|||"); 70 } 71 } 72 swriter.Write(swriter.NewLine); 73 } 74 }
MysqlHelper,導入VCS文件到Mysql資料庫
1 public class MySqlDBHelper 2 { 3 private static readonly string tmpBasePath = AppDomain.CurrentDomain.BaseDirectory; 4 private static readonly string tmpCSVFilePattern = "Temp\\{0}.csv"; //0表示文件名稱 5 6 /// <summary> 7 /// DB連接字元串 8 /// </summary> 9 public static string DBConnectionString 10 { 11 get 12 {