最近看了slave IO的源碼,發現slave IO的寫relay log貌似是單線程單連接的,這讓我有點小失望。 slave IO的主函數是handle_slave_io,處理流程如下: 圖1 handle_slave_io處理流程 我們這次主要要完成safe_connect以及try_to_re ...
最近看了slave IO的源碼,發現slave IO的寫relay log貌似是單線程單連接的,這讓我有點小失望。
slave IO的主函數是handle_slave_io,處理流程如下:
圖1 handle_slave_io處理流程
我們這次主要要完成safe_connect以及try_to_reconnet用到的核心函數 mysql_real_connect流程的探索。
一、mysql_real_connect流程
在這之前我們需要弄明白連接mysql需要那幾步操作,參考自官網的文檔(http://dev.mysql.com/doc/internals/en/plain-handshake.html),據說連接時需要以下操作:
圖2 mysql_real_connect操作流程
1.建立與mysql的連接
對於需要連接的建立一個監聽埠,然後建立與鏈表中的所有服務端建立連接,並綁定到監聽埠
1 if (!net->vio && 2 (!mysql->options.protocol || 3 mysql->options.protocol == MYSQL_PROTOCOL_SOCKET) && 4 (unix_socket || mysql_unix_port) && 5 (!host || !strcmp(host,LOCAL_HOST))) 6 { 7 my_socket sock= socket(AF_UNIX, SOCK_STREAM, 0); 8 DBUG_PRINT("info", ("Using socket")); 9 if (sock == SOCKET_ERROR) 10 { 11 set_mysql_extended_error(mysql, CR_SOCKET_CREATE_ERROR, 12 unknown_sqlstate, 13 ER(CR_SOCKET_CREATE_ERROR), 14 socket_errno); 15 goto error; 16 } 17 18 net->vio= vio_new(sock, VIO_TYPE_SOCKET, 19 VIO_LOCALHOST | VIO_BUFFERED_READ); 20 if (!net->vio) 21 { 22 DBUG_PRINT("error",("Unknow protocol %d ", mysql->options.protocol)); 23 set_mysql_error(mysql, CR_CONN_UNKNOW_PROTOCOL, unknown_sqlstate); 24 closesocket(sock); 25 goto error; 26 } 27 28 host= LOCAL_HOST; 29 if (!unix_socket) 30 unix_socket= mysql_unix_port; 31 host_info= (char*) ER(CR_LOCALHOST_CONNECTION); 32 DBUG_PRINT("info", ("Using UNIX sock '%s'", unix_socket)); 33 34 memset(&UNIXaddr, 0, sizeof(UNIXaddr)); 35 UNIXaddr.sun_family= AF_UNIX; 36 strmake(UNIXaddr.sun_path, unix_socket, sizeof(UNIXaddr.sun_path)-1); 37 38 if (vio_socket_connect(net->vio, (struct sockaddr *) &UNIXaddr, 39 sizeof(UNIXaddr), get_vio_connect_timeout(mysql))) 40 { 41 DBUG_PRINT("error",("Got error %d on connect to local server", 42 socket_errno)); 43 set_mysql_extended_error(mysql, CR_CONNECTION_ERROR, 44 unknown_sqlstate, 45 ER(CR_CONNECTION_ERROR), 46 unix_socket, socket_errno); 47 vio_delete(net->vio); 48 net->vio= 0; 49 goto error; 50 } 51 mysql->options.protocol=MYSQL_PROTOCOL_SOCKET; 52 }View Code
1 for (t_res= res_lst; t_res; t_res= t_res->ai_next) 2 { 3 DBUG_PRINT("info", ("Create socket, family: %d type: %d proto: %d", 4 t_res->ai_family, t_res->ai_socktype, 5 t_res->ai_protocol)); 6 7 sock= socket(t_res->ai_family, t_res->ai_socktype, t_res->ai_protocol); 8 if (sock == SOCKET_ERROR) 9 { 10 DBUG_PRINT("info", ("Socket created was invalid")); 11 /* Try next address if there is one */ 12 saved_error= socket_errno; 13 continue; 14 } 15 16 if (client_bind_ai_lst) 17 { 18 struct addrinfo* curr_bind_ai= NULL; 19 DBUG_PRINT("info", ("Attempting to bind socket to bind address(es)")); 20 21 /* 22 We'll attempt to bind to each of the addresses returned, until 23 we find one that works. 24 If none works, we'll try the next destination host address 25 (if any) 26 */ 27 curr_bind_ai= client_bind_ai_lst; 28 29 while (curr_bind_ai != NULL) 30 { 31 /* Attempt to bind the socket to the given address */ 32 bind_result= bind(sock, 33 curr_bind_ai->ai_addr, 34 curr_bind_ai->ai_addrlen); 35 if (!bind_result) 36 break; /* Success */ 37 38 DBUG_PRINT("info", ("bind failed, attempting another bind address")); 39 /* Problem with the bind, move to next address if present */ 40 curr_bind_ai= curr_bind_ai->ai_next; 41 } 42 43 if (bind_result) 44 { 45 /* 46 Could not bind to any client-side address with this destination 47 Try the next destination address (if any) 48 */ 49 DBUG_PRINT("info", ("All bind attempts with this address failed")); 50 saved_error= socket_errno; 51 closesocket(sock); 52 continue; 53 } 54 DBUG_PRINT("info", ("Successfully bound client side of socket")); 55 } 56 57 /* Create a new Vio object to abstract the socket. */ 58 if (!net->vio) 59 { 60 if (!(net->vio= vio_new(sock, VIO_TYPE_TCPIP, flags))) 61 { 62 set_mysql_error(mysql, CR_OUT_OF_MEMORY, unknown_sqlstate); 63 closesocket(sock); 64 freeaddrinfo(res_lst); 65 if (client_bind_ai_lst) 66 freeaddrinfo(client_bind_ai_lst); 67 goto error; 68 } 69 } 70 /* Just reinitialize if one is already allocated. */ 71 else if (vio_reset(net->vio, VIO_TYPE_TCPIP, sock, NULL, flags)) 72 { 73 set_mysql_error(mysql, CR_UNKNOWN_ERROR, unknown_sqlstate); 74 closesocket(sock); 75 freeaddrinfo(res_lst); 76 if (client_bind_ai_lst) 77 freeaddrinfo(client_bind_ai_lst); 78 goto error; 79 } 80 81 DBUG_PRINT("info", ("Connect socket")); 82 status= vio_socket_connect(net->vio, t_res->ai_addr, 83 (socklen_t)t_res->ai_addrlen, 84 get_vio_connect_timeout(mysql)); 85 /* 86 Here we rely on vio_socket_connect() to return success only if 87 the connect attempt was really successful. Otherwise we would 88 stop trying another address, believing we were successful. 89 */ 90 if (!status) 91 break; 92 93 /* 94 Save either the socket error status or the error code of 95 the failed vio_connection operation. It is necessary to 96 avoid having it overwritten by later operations. 97 */ 98 saved_error= socket_errno; 99 100 DBUG_PRINT("info", ("No success, try next address.")); 101 }View Code
2.讀取初始握手報文
1 if ((pkt_length=cli_safe_read(mysql, NULL)) == packet_error) 2 { 3 if (mysql->net.last_errno == CR_SERVER_LOST) 4 set_mysql_extended_error(mysql, CR_SERVER_LOST, unknown_sqlstate, 5 ER(CR_SERVER_LOST_EXTENDED), 6 "reading initial communication packet", 7 socket_errno); 8 goto error; 9 } 10 pkt_end= (char*)net->read_pos + pkt_length; 11 /* Check if version of protocol matches current one */ 12 mysql->protocol_version= net->read_pos[0]; 13 DBUG_DUMP("packet",(uchar*) net->read_pos,10); 14 DBUG_PRINT("info",("mysql protocol version %d, server=%d", 15 PROTOCOL_VERSION, mysql->protocol_version)); 16 if (mysql->protocol_version != PROTOCOL_VERSION) 17 { 18 set_mysql_extended_error(mysql, CR_VERSION_ERROR, unknown_sqlstate, 19 ER(CR_VERSION_ERROR), mysql->protocol_version, 20 PROTOCOL_VERSION); 21 goto error; 22 } 23 server_version_end= end= strend((char*) net->read_pos+1); 24 mysql->thread_id=uint4korr((uchar*) end + 1); 25 end+=5; 26 /* 27 Scramble is split into two parts because old clients do not understand 28 long scrambles; here goes the first part. 29 */ 30 scramble_data= end; 31 scramble_data_len= AUTH_PLUGIN_DATA_PART_1_LENGTH + 1; 32 scramble_plugin= NULL; 33 end+= scramble_data_len; 34 35 if (pkt_end >= end + 1) 36 mysql->server_capabilities=uint2korr((uchar*) end); 37 if (pkt_end >= end + 18) 38 { 39 /* New protocol with 16 bytes to describe server characteristics */ 40 mysql->server_language=end[2]; 41 mysql->server_status=uint2korr((uchar*) end + 3); 42 mysql->server_capabilities|= uint2korr((uchar*) end + 5) << 16; 43 pkt_scramble_len= end[7]; 44 if (pkt_scramble_len < 0) 45 { 46 set_mysql_error(mysql, CR_MALFORMED_PACKET, 47 unknown_sqlstate); /* purecov: inspected */ 48 goto error; 49 } 50 } 51 end+= 18; 52 53 if (mysql_init_character_set(mysql)) 54 goto error;View Code
3.發送回覆握手報文
通過run_plugin_auth發送回覆握手報文
1 mpvio.mysql_change_user= data_plugin == 0; 2 mpvio.cached_server_reply.pkt= (uchar*)data; 3 mpvio.cached_server_reply.pkt_len= data_len; 4 mpvio.read_packet= client_mpvio_read_packet; 5 mpvio.write_packet= client_mpvio_write_packet; 6 mpvio.info= client_mpvio_info; 7 mpvio.mysql= mysql; 8 mpvio.packets_read= mpvio.packets_written= 0; 9 mpvio.db= db; 10 mpvio.plugin= auth_plugin; 11 12 MYSQL_TRACE(AUTH_PLUGIN, mysql, (auth_plugin->name)); 13 14 res= auth_plugin->authenticate_user((struct st_plugin_vio *)&mpvio, mysql);View Code
1 static int native_password_auth_client(MYSQL_PLUGIN_VIO *vio, MYSQL *mysql) 2 { 3 int pkt_len; 4 uchar *pkt; 5 6 DBUG_ENTER("native_password_auth_client"); 7 8 9 if (((MCPVIO_EXT *)vio)->mysql_change_user) 10 { 11 /* 12 in mysql_change_user() the client sends the first packet. 13 we use the old scramble. 14 */ 15 pkt= (uchar*)mysql->scramble; 16 pkt_len= SCRAMBLE_LENGTH + 1; 17 } 18 else 19 { 20 /* read the scramble */ 21 if ((pkt_len= vio->read_packet(vio, &pkt)) < 0) 22 DBUG_RETURN(CR_ERROR); 23 24 if (pkt_len != SCRAMBLE_LENGTH + 1) 25 DBUG_RETURN(CR_SERVER_HANDSHAKE_ERR); 26 27 /* save it in MYSQL */ 28 memcpy(mysql->scramble, pkt, SCRAMBLE_LENGTH); 29 mysql->scramble[SCRAMBLE_LENGTH] = 0; 30 } 31 32 if (mysql->passwd[0]) 33 { 34 char scrambled[SCRAMBLE_LENGTH + 1]; 35 DBUG_PRINT("info", ("sending scramble")); 36 scramble(scrambled, (char*)pkt, mysql->passwd); 37 if (vio->write_packet(vio, (uchar*)scrambled, SCRAMBLE_LENGTH)) 38 DBUG_RETURN(CR_ERROR); 39 } 40 else 41 { 42 DBUG_PRINT("info", ("no password")); 43 if (vio->write_packet(vio, 0, 0)) /* no password */ 44 DBUG_RETURN(CR_ERROR); 45 } 46 47 DBUG_RETURN(CR_OK); 48 }View Code
先通過read_packet獲得挑戰碼,再通過scramble加密,然後通過write_packet發送回覆握手報文。
client_mpvio_write_packet->send_client_reply_packet,該函數是發送回覆握手報文。
4.讀入認證回覆報文
1 /* read the OK packet (or use the cached value in mysql->net.read_pos */ 2 if (res == CR_OK) 3 pkt_length= (*mysql->methods->read_change_user_result)(mysql); 4 else /* res == CR_OK_HANDSHAKE_COMPLETE */ 5 pkt_length= mpvio.last_read_packet_len;View Code
最後通過cli_read_change_user_result即cli_safe_read讀取ok報文
5.選擇資料庫
1 int STDCALL 2 mysql_select_db(MYSQL *mysql, const char *db) 3 { 4 int error; 5 DBUG_ENTER("mysql_select_db"); 6 DBUG_PRINT("enter",("db: '%s'",db)); 7 8 if ((error=simple_command(mysql,COM_INIT_DB, (const uchar*) db, 9 (ulong) strlen(db),0))) 10 DBUG_RETURN(error); 11 my_free(mysql->db); 12 mysql->db=my_strdup(key_memory_MYSQL, 13 db,MYF(MY_WME)); 14 DBUG_RETURN(0); 15 }View Code
以command報文的形式發送命令數據
二、 登陸階段所用到的報文格式
1.初始握手報文
1 1 [0a] protocol version 2 string[NUL] server version 3 4 connection id 4 string[8] auth-plugin-data-part-1 5 1 [00] filler 6 2 capability flags (lower 2 bytes) 7 if more data in the packet: 8 1 character set 9 2 status flags 10 2 capability flags (upper 2 bytes) 11 if capabilities & CLIENT_PLUGIN_AUTH { 12 1 length of auth-plugin-data 13 } else { 14 1 [00] 15 } 16 string[10] reserved (all [00]) 17 if capabilities & CLIENT_SECURE_CONNECTION { 18 string[$len] auth-plugin-data-part-2 ($len=MAX(13, length of auth-plugin-data - 8)) 19 if capabilities & CLIENT_PLUGIN_AUTH { 20 string[NUL] auth-plugin name 21 }View Code
(1)協議的版本
(2)協議的版本名
(3)連接id其實是線程的id
(4)挑戰碼的第一部分(用於登陸密碼加密)
(5)不用關註
(6)標誌位的最低兩個,該標誌會確定較多信息後面的capabilities就是該標誌為
(8)字元集編號,其實就是採用什麼樣的字元集,如utf8等等
(9)伺服器狀態編碼
(10)標誌位的較高兩位
(12)挑戰碼總長度(用於登陸密碼加密,是一個可選項)
(16)都是0,不用關註
(18)第二段挑戰碼(用於登陸密碼加密,是一個可選項)
(20)挑戰碼生成名(是一個可選項)
2.回覆握手報文
1 4 capability flags, CLIENT_PROTOCOL_41 always set 2 4 max-packet size 3 1 character set 4 string[23] reserved (all [0]) 5 string[NUL] username 6 if capabilities & CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA { 7 lenenc-int length of auth-response 8 string[n] auth-response 9 } else if capabilities & CLIENT_SECURE_CONNECTION { 10 1 length of auth-response 11 string[n] auth-response 12 } else { 13 string[NUL] auth-response 14 } 15 if capabilities & CLIENT_CONNECT_WITH_DB { 16 string[NUL] database 17 } 18 if capabilities & CLIENT_PLUGIN_AUTH { 19 string[NUL] auth plugin name 20 } 21 if capabilities & CLIENT_CONNECT_ATTRS { 22 lenenc-int length of all key-values 23 lenenc-str key 24 lenenc-str value 25 if-more data in 'length of all key-values', more keys and value pairs 26 }View Code
(1)收到的capability flags
(2)最大報文長度,這是與mysql伺服器協商的
(3)字元集
(4)不需要關註
(5)登陸的用戶名
(7)(8)一般選項為此選項,即加密的密碼報文
下麵的報文在本文件發送中沒有用到
3.認證回覆報文
1 Type Name Description 2 int<1> header [00] or [fe] the OK packet header 3 int<lenenc> affected_rows affected rows 4 int<lenenc> last_insert_id last insert-id 5 if capabilities & CLIENT_PROTOCOL_41 { 6 int<2> status_flags Status Flags 7 int<2> warnings number of warnings 8 } elseif capabilities & CLIENT_TRANSACTIONS { 9 int<2> status_flags Status Flags 10 } 11 if capabilities & CLIENT_SESSION_TRACK { 12 string<lenenc> info human readable status information 13 if status_flags & SERVER_SESSION_STATE_CHANGED { 14 string<lenenc> session_state_changes session state info 15 } 16 } else { 17 string<EOF> info human readable status information 18 }View Code