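Background
One of the company's internal systems is a commercial product whose database has developed obvious performance problems: the service frequently hangs, and employees often report that data cannot be queried, or cannot be queried in time. The system runs on SQL Server, a commercial database that depends on vendor maintenance, with high maintenance cost, low efficiency, and licensing concerns. We therefore decided to migrate the system's database to PostgreSQL, an open-source database under a BSD-style license with no licensing issues. Some of the company's other systems already use pg, so maintenance cost should also drop considerably.
Migration principle
SQL Server is a commercial database, so unlike MySQL and similar databases, we cannot parse a binlog-style log to replay incremental changes. Taking the application's characteristics into account, we settled on an offline migration: read all table data out of SQL Server, then write it into pg. The drawback of this approach is that the application must stop writing (it can cache some data locally) until the database migration completes, after which the application is switched over to PostgreSQL. The migration method is as follows: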
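The read-everything-then-write idea can be sketched with plain DB-API cursors. This is only a minimal illustration, not the mechanism the article actually uses to move data (that is the Kettle job run in the data migration step). The function name `copy_table`, the `placeholder` parameter, and the batch size are assumptions made for the example. With psycopg2 the placeholder would be `%s`; the demo uses two in-memory SQLite databases as stand-ins so that it runs without any server.

```python
import sqlite3


def copy_table(src_conn, dst_conn, table, columns, placeholder="?", batch_size=1000):
    """Copy all rows of `table` from src_conn to dst_conn in batches.

    Hypothetical helper: `table` and `columns` must be trusted identifiers,
    because they are interpolated directly into the SQL text.
    """
    col_list = ", ".join(columns)
    marks = ", ".join([placeholder] * len(columns))
    select_sql = "SELECT %s FROM %s" % (col_list, table)
    insert_sql = "INSERT INTO %s (%s) VALUES (%s)" % (table, col_list, marks)

    src_cur = src_conn.cursor()
    dst_cur = dst_conn.cursor()
    src_cur.execute(select_sql)
    copied = 0
    while True:
        # Fetch in batches so a large table is never held in memory at once.
        rows = src_cur.fetchmany(batch_size)
        if not rows:
            break
        dst_cur.executemany(insert_sql, rows)
        copied += len(rows)
    dst_conn.commit()
    src_cur.close()
    dst_cur.close()
    return copied


# Demo: SQLite stands in for both the source and the target database.
src = sqlite3.connect(":memory:")
dst = sqlite3.connect(":memory:")
src.execute("CREATE TABLE t (id INTEGER, name TEXT)")
dst.execute("CREATE TABLE t (id INTEGER, name TEXT)")
src.executemany("INSERT INTO t VALUES (?, ?)", [(i, "row%d" % i) for i in range(2500)])
src.commit()

copied = copy_table(src, dst, "t", ["id", "name"], batch_size=1000)
dst_count = dst.execute("SELECT COUNT(*) FROM t").fetchone()[0]
print(copied, dst_count)
```

Because the source must not change while rows are being copied, a loop like this is only safe under the write freeze described above.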
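Table structure migration principle
A table structure consists mainly of columns, indexes, primary keys, foreign keys, and similar information. The open-source tool sqlserver2pgsql is used to convert the table structure.
Table structure conversion
The tool reads the column information of each table from SQL Server and converts the column types. The core conversion code is as follows: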
sub convert_type
{
    my ($sqlstype, $sqlqual, $colname, $tablename, $typname, $schemaname) =
      @_;
    my $rettype;
    if (defined $types{$sqlstype})
    {
        if ((defined $sqlqual and defined($unqual{$types{$sqlstype}}))
            or not defined $sqlqual)
        {
            # This is one of the few types that have to be unqualified (binary type)
            $rettype = $types{$sqlstype};

            # but we might add a check constraint for binary data
            if ($sqlstype =~ 'binary' and defined $sqlqual) {
                print STDERR "convert_type: $sqlstype, $sqlqual, $colname\n";
                my $constraint;
                $constraint->{TYPE} = 'CHECK_BINARY_LENGTH';
                $constraint->{TABLE} = $tablename;
                $constraint->{TEXT} = "octet_length(" . format_identifier($colname) . ") <= $sqlqual";
                push @{$objects->{SCHEMAS}->{$schemaname}->{TABLES}->{$tablename}
                      ->{CONSTRAINTS}}, ($constraint);
            }
        }
        elsif (defined $sqlqual)
        {
            $rettype = ($types{$sqlstype} . "($sqlqual)");
        }
    }

    # A few special cases
    elsif ($sqlstype eq 'bit' and not defined $sqlqual)
    {
        $rettype = "boolean";
    }
    elsif ($sqlstype eq 'ntext' and not defined $sqlqual)
    {
        $rettype = "text";
    }
    # ... (the excerpt ends here; the rest of the sub is not shown)
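The decision logic of the Perl excerpt can be restated as a small Python sketch. This is an illustration of the same branching, not the tool's code: the `TYPES` and `UNQUALIFIED` tables below are a small subset chosen for the example, and the names `convert_type`, `TYPES`, and `UNQUALIFIED` are assumptions. The sketch returns the check constraint text instead of registering it in a shared structure.

```python
# Illustrative subset only; the real tool's mapping table is much larger.
TYPES = {
    "int": "int",
    "varchar": "varchar",
    "nvarchar": "varchar",
    "datetime": "timestamp",
    "binary": "bytea",
    "varbinary": "bytea",
}

# Target types that must not carry a length qualifier.
UNQUALIFIED = {"bytea"}


def convert_type(sqlstype, sqlqual=None, colname="col"):
    """Return (pg_type, check_constraint_or_None) for a SQL Server type."""
    constraint = None
    if sqlstype in TYPES:
        pgtype = TYPES[sqlstype]
        if sqlqual is None or pgtype in UNQUALIFIED:
            # No qualifier, or the target type cannot take one.
            rettype = pgtype
            if "binary" in sqlstype and sqlqual is not None:
                # Keep the original length limit as a CHECK expression.
                constraint = 'octet_length("%s") <= %s' % (colname, sqlqual)
        else:
            rettype = "%s(%s)" % (pgtype, sqlqual)
    elif sqlstype == "bit" and sqlqual is None:
        rettype = "boolean"
    elif sqlstype == "ntext" and sqlqual is None:
        rettype = "text"
    else:
        raise ValueError("no mapping for %s" % sqlstype)
    return rettype, constraint


r1 = convert_type("nvarchar", "50")
r2 = convert_type("varbinary", "16", "payload")
r3 = convert_type("bit")
print(r1, r2, r3)
```

The binary case shows why the qualifier cannot simply be appended: `bytea` takes no length, so the limit survives only as a separate constraint.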
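Foreign key, index, and unique key conversion
The index, foreign key, and related statements in the table structure exported from SQL Server are converted. The core conversion code is as follows: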
while (my ($schema, $refschema) = each %{$objects->{SCHEMAS}})
{
    # Indexes
    # They don't have a schema qualifier. But their table has, and they are in the same schema as their table
    foreach my $table (sort keys %{$refschema->{TABLES}})
    {
        foreach
          my $index (
            sort keys %{$refschema->{TABLES}->{$table}->{INDEXES}})
        {
            my $index_created = 0;
            my $idxref =
              $refschema->{TABLES}->{$table}->{INDEXES}->{$index};
            my $idxdef .= "";
            if ($idxref->{DISABLE})
            {
                $idxdef .= "-- ";
            }
            $idxdef .= "CREATE";
            if ($idxref->{UNIQUE})
            {
                $idxdef .= " UNIQUE";
            }
            if (defined $idxref->{COLS})
            {
                $idxdef .= " INDEX " . format_identifier($index) . " ON " . format_identifier($schema) . '.' . format_identifier($table) . " ("
                  . join(",", map{format_identifier_cols_index($_)} @{$idxref->{COLS}}) . ")";
                if (defined $idxref->{INCLUDE}) {
                    $idxdef .= " INCLUDE (" .
                      join(",", map{format_identifier_cols_index($_)} @{$idxref->{INCLUDE}})
                      . ")";
                }
                if (not defined $idxref->{WHERE} and not defined $idxref->{DISABLE}) {
                    $idxdef .= ";\n";
                    print AFTER $idxdef;
                    # the possible comment would go to after file
                    $index_created = 1;
                }
                # ... (the excerpt ends here; the rest of the loop is not shown)
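The way the excerpt assembles a `CREATE INDEX` statement piece by piece can be sketched in Python as follows. This is a simplified illustration under stated assumptions: `quote_ident` is a hypothetical stand-in for the tool's `format_identifier` helpers, `build_index_ddl` is a name invented here, and the sketch always returns the statement, whereas the Perl excerpt only writes it to the "after" file when the index has no `WHERE` clause and is not disabled.

```python
def quote_ident(name):
    """Hypothetical stand-in for the tool's format_identifier helper."""
    return '"%s"' % name.replace('"', '""')


def build_index_ddl(schema, table, index, idx):
    """Build a CREATE INDEX statement from a dict describing the index.

    Keys read from `idx` (mirroring the excerpt): DISABLE, UNIQUE, COLS, INCLUDE.
    """
    parts = []
    if idx.get("DISABLE"):
        # A disabled index is emitted commented out.
        parts.append("-- ")
    parts.append("CREATE")
    if idx.get("UNIQUE"):
        parts.append(" UNIQUE")
    parts.append(" INDEX %s ON %s.%s (%s)" % (
        quote_ident(index),
        quote_ident(schema),
        quote_ident(table),
        ", ".join(quote_ident(c) for c in idx["COLS"]),
    ))
    if idx.get("INCLUDE"):
        parts.append(" INCLUDE (%s)" % ", ".join(quote_ident(c) for c in idx["INCLUDE"]))
    return "".join(parts) + ";"


ddl = build_index_ddl(
    "public", "orders", "ix_orders_customer",
    {"UNIQUE": True, "COLS": ["customer_id"], "INCLUDE": ["total"]},
)
disabled = build_index_ddl("public", "orders", "ix_old", {"DISABLE": True, "COLS": ["id"]})
print(ddl)
print(disabled)
```

Note that the `INCLUDE` clause requires PostgreSQL 11 or later; the psql path used later in this article suggests a version 12 installation.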
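Data type conversion principle
Data type conversion
Function conversion
Stored procedures
Views must be reworked by hand
Migration method
Table structure conversion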
./sqlserver2pgsql.pl -b before.sql -a after.sql -u unsure.sql -k /opt/data_migration/data-integration/ -sd test -sh 127.0.0.1 -sp 1433 -su user_only -sw 122132321 -pd test -ph 192.168.1.1 -pp 15432 -pu postgres -pw 12345678 -pi 8 -po 8 -f script.sql
Import the table structure into pg
/usr/local/pgsql1201/bin/psql -h 127.0.0.1 -U postgres -p 15432 <before.sql
Data migration
cd /opt/data_migration/data-integration/
sh kitchen.sh -file=migration.kjb -level=detailed >migration.log
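Since the steps above must run in order, and a later step is pointless if an earlier one failed, they could be driven by a small wrapper like the following sketch. `run_steps` is a hypothetical helper, and the demo commands are harmless placeholders that simply exit with a given status; in a real run they would be replaced by the commands shown above.

```python
import subprocess
import sys


def run_steps(steps):
    """Run (name, argv) steps in order and stop at the first non-zero exit.

    Returns the names of the steps that completed successfully.
    """
    done = []
    for name, argv in steps:
        result = subprocess.run(argv)
        if result.returncode != 0:
            print("step failed: %s (exit %d)" % (name, result.returncode))
            break
        done.append(name)
    return done


# Placeholder commands: each one only exits with the given status.
ok = [sys.executable, "-c", "import sys; sys.exit(0)"]
bad = [sys.executable, "-c", "import sys; sys.exit(1)"]

completed = run_steps([
    ("convert schema", ok),
    ("import before.sql", bad),
    ("run Kettle job", ok),
])
print(completed)
```

Commands that rely on shell redirection, such as the psql import, would need to be adapted, for example by passing the file with psql's `-f` option instead of `<`.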
Data comparison
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author:jacker_zhou
@create_time: 2017-04-07
@overview: mssql pg
"""
__author__ = 'jacker_zhou'
__version__ = '0.1'
import psycopg2,pymssql
import time
TableSpace='public.'
class CompareDataBase():
    def __init__(self):
        # pgconn is the PostgreSQL target, msconn is the SQL Server source
        self.pgconn=psycopg2.connect(database="test",host="127.0.0.1",port=15432,user="postgres",password="test")
        self.msconn=pymssql.connect(host="192.168.1.1",user="test",password="test",database="test")
    def commit(self):
        self.pgconn.commit()
    def close(self):
        self.pgconn.close()
        self.msconn.close()
    def rollback(self):
        self.pgconn.rollback()
    def exesyncdb(self):
        # List every user table in SQL Server together with its column count
        mscursor=self.msconn.cursor()
        sql=("SELECT COUNT(COLUMNNAME) AS CT,TABLENAME FROM (SELECT A.NAME AS COLUMNNAME,B.NAME AS TABLENAME FROM SYSCOLUMNS A RIGHT JOIN SYSOBJECTS B ON A.ID=B.ID WHERE B.TYPE='U' AND B.NAME NOT IN ('dtproperties','0626')) A GROUP BY TABLENAME ")
        mscursor.execute(sql)
        table=mscursor.fetchall()
        print ("total table %d"%len(table))
        if(table is None or len(table)<=0):
            return
        else:
            for row in table:
                self.executeTable(row[1],row[0])
                print ("%s is execute success"%row[1])
    def comparedb(self):
        # Same table listing as above, but only row counts are compared
        mscursor=self.msconn.cursor()
        sql=("SELECT COUNT(COLUMNNAME) AS CT,TABLENAME FROM (SELECT A.NAME AS COLUMNNAME,B.NAME AS TABLENAME FROM SYSCOLUMNS A RIGHT JOIN SYSOBJECTS B ON A.ID=B.ID WHERE B.TYPE='U' AND B.NAME NOT IN ('dtproperties','0626')) A GROUP BY TABLENAME ")
        mscursor.execute(sql)
        table=mscursor.fetchall()
        print ("total table %d"%len(table))
        if(table is None or len(table)<=0):
            return
        else:
            for row in table:
                self.compareTable(row[1])
    def executeTable(self,tablename,count):
        # NOTE: initColumn and initPgSql are not included in this listing;
        # they must be supplied before exesyncdb/executeTable can run.
        #print tablename
        sql1="SELECT * FROM %s"%tablename
        print (sql1)
        mscursor=self.msconn.cursor()
        mscursor.execute(sql1)
        table=mscursor.fetchall()
        if(table is None or len(table)<=0):
            mscursor.close()
            return
        lst_result=self.initColumn(table)
        #print "column"
        mscursor.close()
        print ("execute sync %s data to postgresql"%tablename)
        sql2=self.initPgSql(tablename,count)
        pgcursor=self.pgconn.cursor()
        pgcursor.executemany(sql2,lst_result)
        pgcursor.close()
    def compareTable(self,tablename):
        # Run the same COUNT(*) on both sides and compare the results
        #print tablename
        sql1="SELECT count(*) FROM %s"%tablename
        mscursor=self.msconn.cursor()
        mscursor.execute(sql1)
        ms_res=mscursor.fetchall()
        mscursor.close()
        pgcursor=self.pgconn.cursor()
        pgcursor.execute(sql1)
        pg_res=pgcursor.fetchall()
        pgcursor.close()
        res =""
        if ms_res[0][0] == pg_res[0][0]:
            res ="ok"
        else:
            res = "fail"
        print ("execute compare table %s data postgresql: %s mssql:%s result: %s"%(tablename,pg_res[0][0],ms_res[0][0],res))
if __name__=="__main__":
    sdb= CompareDataBase()
    start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    print ("task start time %s"%start_time)
    try:
        sdb.comparedb()
    except Exception as e:
        print (e)
        sdb.rollback()
    else:
        sdb.commit()
    sdb.close()
    end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    print ("task end time %s"%end_time)
    print ("ok........" )
References
https://github.com/dalibo/sqlserver2pgsql
This article is from cnblogs (博客園), author: 古道輕風. When reposting, please cite the original link: https://www.cnblogs.com/88223100/p/Database-SqlServer-Migration-PostgreSql-Practice.html