python--同一mysql資料庫下批量遷移數據

来源:http://www.cnblogs.com/TeyGao/archive/2016/10/11/5948934.html
-Advertisement-
Play Games

最近接手些mysql資料庫維護,發現mysql在批量操作方面就是個渣渣啊,比起MS SQL SERVER簡直就是“不可同日而語”。 咨詢了下MySQL的高手,對於數據遷移這種問題,一種處理方式就是直接“一步到位” ,一次性將所有數據查詢插入到另外一個表,然後再刪除原表數據;另外一種處理方式就是使用p ...


最近接手些mysql資料庫維護,發現mysql在批量操作方面就是個渣渣啊,比起MS SQL SERVER簡直就是“不可同日而語”。

咨詢了下MySQL的高手,對於數據遷移這種問題,一種處理方式就是直接“一步到位” ,一次性將所有數據查詢插入到另外一個表,然後再刪除原表數據;另外一種處理方式就是使用pt--archiver工具來歸檔。

 

然並卵,“一步到位”法太刺激,pt--archiver工具用不順手,由於目前大部分的表都以自增id為主鍵,以此為此為前提自己寫個小腳本,厚臉拿出來供各位參考:

# coding: utf-8
import MySQLdb
import time

# common config
EXEC_DETAIL_FILE = 'exec_detail.txt'
DATETIME_FORMAT = '%Y-%m-%d %X'
Default_MySQL_Host = '192.168.166.169'
Default_MySQL_Port = 3358
Default_MySQL_User = "mysql_admin"
Default_MySQL_Password = 'mysql@Admin@Pwd'
Default_MySQL_Charset = "utf8"
Default_MySQL_Connect_TimeOut = 120

# Transfer Config
Transfer_Database_Name = "db001"
Transfer_Source_Table_Name = "tb2001"
Transfer_Target_Table_Name = "tb2001_his"
Transfer_Condition = "dt <'2016-10-01'"
Transfer_Rows_Per_Batch = 10000
Sleep_Second_Per_Batch = 0.5


def get_time_string(dt_time):
    """
    獲取指定格式的時間字元串
    :param dt_time: 要轉換成字元串的時間
    :return: 返回指定格式的字元串
    """
    global DATETIME_FORMAT
    return time.strftime(DATETIME_FORMAT, dt_time)


def get_time_string(dt_time):
    return time.strftime("%Y-%m-%d %X", dt_time)


def highlight(s):
    return "%s[30;2m%s%s[1m" % (chr(27), s, chr(27))


def print_warning_message(message):
    """
    以紅色字體顯示消息內容
    :param message: 消息內容
    :return: 無返回值
    """
    message = str(message)
    print(highlight('') + "%s[31;1m%s%s[0m" % (chr(27), message, chr(27)))


def print_info_message(message):
    """
    以綠色字體輸出提醒性的消息
    :param message: 消息內容
    :return: 無返回值
    """
    message = str(message)
    print(highlight('') + "%s[32;2m%s%s[0m" % (chr(27), message, chr(27)))


def write_file(file_path, message):
    """
    將傳入的message追加寫入到file_path指定的文件中
    請先創建文件所在的目錄
    :param file_path: 要寫入的文件路徑
    :param message: 要寫入的信息
    :return:
    """
    file_handle = open(file_path, 'a')
    file_handle.writelines(message)
    # 追加一個換行以方便瀏覽
    file_handle.writelines(chr(13))
    file_handle.close()


def get_mysql_connection():
    """
    根據預設配置返回資料庫連接
    :return: 資料庫連接
    """
    conn = MySQLdb.connect(
            host=Default_MySQL_Host,
            port=Default_MySQL_Port,
            user=Default_MySQL_User,
            passwd=Default_MySQL_Password,
            connect_timeout=Default_MySQL_Connect_TimeOut,
            charset=Default_MySQL_Charset,
            db=Transfer_Database_Name
    )
    return conn


def mysql_exec(sql_script, sql_param=None):
    """
    執行傳入的腳本,返回影響行數
    :param sql_script:
    :param sql_param:
    :return: 腳本最後一條語句執行影響行數
    """
    try:
        conn = get_mysql_connection()
        print_info_message("在伺服器{0}上執行腳本:{1}".format(
                conn.get_host_info(), sql_script))
        cursor = conn.cursor()
        if sql_param is not None:
            cursor.execute(sql_script, sql_param)
        else:
            cursor.execute(sql_script)
        affect_rows = cursor.rowcount
        conn.commit()
        cursor.close()
        conn.close()
        return affect_rows
    except Exception as ex:
        cursor.close()
        conn.rollback()
        raise Exception(str(ex))


def mysql_exec_many(sql_script_list):
    """
    執行傳入的腳本,返回影響行數
    :param sql_script_list: 要執行的腳本List,List中每個元素為sql_script, sql_param對
    :return: 返回執行每個腳本影響的行數列表
    """
    try:
        conn = get_mysql_connection()
        exec_result_list = []
        for sql_script, sql_param in sql_script_list:
            print_info_message("在伺服器{0}上執行腳本:{1}".format(
                    conn.get_host_info(), sql_script))
            cursor = conn.cursor()
            if sql_param is not None:
                cursor.execute(sql_script, sql_param)
            else:
                cursor.execute(sql_script)
            affect_rows = cursor.rowcount
            exec_result_list.append("影響行數:{0}".format(affect_rows))
        conn.commit()
        cursor.close()
        conn.close()
        return exec_result_list

    except Exception as ex:
        cursor.close()
        conn.rollback()
        raise Exception(str(ex))


def mysql_query(sql_script, sql_param=None):
    """
    執行傳入的SQL腳本,並返回查詢結果
    :param sql_script:
    :param sql_param:
    :return: 返回SQL查詢結果
    """
    try:
        conn = get_mysql_connection()
        print_info_message("在伺服器{0}上執行腳本:{1}".format(
                conn.get_host_info(), sql_script))
        cursor = conn.cursor()
        if sql_param != '':
            cursor.execute(sql_script, sql_param)
        else:
            cursor.execute(sql_script)
        exec_result = cursor.fetchall()
        cursor.close()
        conn.close()
        return exec_result
    except Exception as ex:
        cursor.close()
        conn.close()
        raise Exception(str(ex))


def get_column_info_list(table_name):
    sql_script = """
DESC {0}
""".format(table_name)
    column_info_list = []
    query_result = mysql_query(sql_script=sql_script, sql_param=None)
    for row in query_result:
        column_name = row[0]
        column_key = row[3]
        column_info = column_name, column_key
        column_info_list.append(column_info)
    return column_info_list


def get_id_range():
    """
    按照傳入的表獲取要刪除數據最大ID、最小ID、刪除總行數
    :return: 返回要刪除數據最大ID、最小ID、刪除總行數
    """
    global Transfer_Condition
    global Transfer_Rows_Per_Batch
    sql_script = """
SELECT
MAX(ID) AS MAX_ID,
MIN(ID) AS MIN_ID,
COUNT(1) AS Total_Count
FROM {0}
WHERE {1};
""".format(Transfer_Source_Table_Name, Transfer_Condition)
    query_result = mysql_query(sql_script=sql_script, sql_param=None)
    max_id, min_id, total_count = query_result[0]
    # 此處有一坑,可能出現total_count不為0 但是max_id 和min_id 為None的情況
    # 因此判斷max_id和min_id 是否為NULL
    if (max_id is None) or (min_id is None):
        max_id, min_id, total_count = 0, 0, 0
    return max_id, min_id, total_count


def check_env():
    try:
        source_columns_info_list = get_column_info_list(Transfer_Source_Table_Name)
        target_columns_info_list = get_column_info_list(Transfer_Target_Table_Name)
        if len(source_columns_info_list) != len(target_columns_info_list):
            print_info("源表和目標表的列數不對,不滿足遷移條件")
            return False
        column_count = len(source_columns_info_list)
        id_flag = False
        for column_id in range(column_count):
            source_column_name, source_column_key = source_columns_info_list[column_id]
            target_column_name, target_column_key = target_columns_info_list[column_id]
            if source_column_name != target_column_name:
                print_info("源表和目標表的列名不匹配,不滿足遷移條件")
                return False
            if source_column_name.lower() == 'id' \
                    and source_column_key.lower() == 'pri' \
                    and target_column_name.lower() == 'id' \
                    and target_column_key.lower() == 'pri':
                id_flag = True
        if not id_flag:
            print_info("未找到為主鍵的id列,不滿足遷移條件")
            return False
        return True
    except Exception as ex:
        print_info("執行出現異常,異常為{0}".format(ex.message))
        return False


def main():
    flag = check_env()
    if not flag:
        return
    loop_trans_data()


def trans_data(current_min_id, current_max_id):
    global Transfer_Source_Table_Name
    global Transfer_Target_Table_Name
    global Transfer_Condition
    global Transfer_Rows_Per_Batch
    print_info_message("*" * 70)
    copy_data_script = """
INSERT INTO {0}
SELECT * FROM {1}
WHERE ID>={2}
AND ID<{3}
AND {4} ;
""".format(Transfer_Target_Table_Name, Transfer_Source_Table_Name, current_min_id, current_max_id, Transfer_Condition)
    delete_data_script = """
DELETE FROM {0}
WHERE ID IN (
SELECT ID
FROM {1}
WHERE ID>={2}
AND ID<{3})
AND ID>={4}
AND ID<{5};
""".format(Transfer_Source_Table_Name, Transfer_Target_Table_Name, current_min_id, current_max_id, current_min_id,
           current_max_id)
    sql_script_list = []
    tem_sql_script = copy_data_script, None
    sql_script_list.append(tem_sql_script)
    tem_sql_script = delete_data_script, None
    sql_script_list.append(tem_sql_script)
    exec_result_list = mysql_exec_many(sql_script_list)
    print_info_message("執行結果:")
    for item in exec_result_list:
        print_info_message(item)


def loop_trans_data():
    max_id, min_id, total_count = get_id_range()
    if min_id == max_id:
        print_info_message("無數據需要結轉")
        return

    current_min_id = min_id
    global Transfer_Rows_Per_Batch
    while current_min_id <= max_id:
        current_max_id = current_min_id + Transfer_Rows_Per_Batch
        trans_data(current_min_id, current_max_id)
        current_percent = (current_max_id - min_id) * 100.0 / (max_id - min_id)
        left_rows = max_id - current_max_id
        if left_rows < 0:
            left_rows = 0
        current_percent_str = "%.2f" % current_percent
        info = "當前複製進度{0}/{1},剩餘{2},進度為{3}%".format(current_max_id,
                                                    max_id, left_rows,
                                                    current_percent_str)
        print_info_message(info)
        time.sleep(Sleep_Second_Per_Batch)
        current_min_id = current_max_id
    print_info_message("*" * 70)
    print_info_message("執行完成")


if __name__ == '__main__':
    main()
View Code

 

按照各位場景的,需要修改資料庫連接信息:

還有需要遷移表的信息:

 

生成測試數據的mysql腳本:

CREATE TABLE `tb2001` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `c1` varchar(200) DEFAULT NULL,
  `dt` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

create table tb2001_his like tb2001;

insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM mysql.user;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
View Code

 

最終運行結果如下:

顯示簡單粗暴,有興趣的可以在此基礎上修改!

=================================================================


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • SQLServer事務的隔離級別 資料庫是要被廣大客戶所共用訪問的,那麼在資料庫操作過程中很可能出現以下幾種不確定情況。 更新丟失(Lost update) 兩個事務都同時更新一行數據,但是第二個事務卻中途失敗退出,導致對數據的兩個修改都失效了。這是因為系統沒有執行任何的鎖操作,因此併發事務並沒有被 ...
  • DECLARE @hdoc int DECLARE @doc xml SET @doc = '<CityValueSet> <CityItem> <CityId>201</CityId><CityName>北京</CityName><EngName>beijing</EngName><Level>1 ...
  • 創建表: 表數據的增刪改查: 修改表結構: 去除重覆記錄: 排序: 查詢數據前多少條: 模糊查詢: 通配符:_ 表示任意的單個字元 % 匹配任意多個字元 [] 表示範圍內的單個字元 [^] 不在指定範圍內的單個字元 自定義轉義符:escape 空值判斷: 類型轉換函數:cast(表達式 as 數據類 ...
  • Laxcus大數據管理系統是我們Laxcus大數據實驗室歷時5年,全體系全功能設計研發的產品,目前已經發展到2.1版本,並投入到多個大數據和雲計算項目中使用。Laxcus大數據管理系統採用松耦合架構,整合了大數據和關係資料庫的技術,實現了一站式數據處理,具有易操作、易維護、運行穩定的特點,並行節點數... ...
  • 花點時間整理下sql基礎,溫故而知新,也方便複習查看。文章的demo來自oracle自帶的dept,emp,salgrade三張表。解鎖scott用戶,使用scott用戶登錄就可以看到自帶的表。 #使用oracle用戶登錄linux [oracle@localhost ~]$ sqlplus / a... ...
  • ...
  • 80%的前500強企業就數據管理方面都有一個共性——管理規範,高效輔助流程。 但數據管理並不是一言即成,尤其是處於快速發展和轉型的企業。就數據系統而言,一旦系統增多,相應的數據問題也隨之而來。那麼如何統一有效地管理數據?實現數據可視化?這裡分享某百強集團搭建數據平臺的建設經驗。 ...
  • truncate table page_frame_mst; select setval('page_frame_mst_id_seq', 1, false); select setval('image_group_mst_id_seq', (select max(id) from image_gr ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...