本文主要介紹fio是如何運行的,並且以單線程、單job為例 fio的入口在fio.c中的main函數,下麵列出了main函數,此處只出示了一些調用的關鍵函數 在main函數中主要調用了兩個關鍵函數,parse_options,顧名思義,就是分析options,也就是fio的參數,而fio_backe ...
本文主要介紹fio是如何運行的,並且以單線程、單job為例
fio的入口在fio.c中的main函數,下麵列出了main函數,此處只出示了一些調用的關鍵函數
1 int main(int argc, char *argv[], char *envp[]) 2 { 3 parse_options(argc, argv); 4 fio_backend(); 5 }
在main函數中主要調用了兩個關鍵函數,parse_options,顧名思義,就是分析options,也就是fio的參數,而fio_backend()函數則是fio進程的入口
fio_backend()函數在backend.c文件中
1 int fio_backend(void) 2 { 3 ...... 4 run_threads(); 5 ...... 6 }
在fio_backend()函數中,初始化一些數據結構之後,調用了run_threads()(backend.c)函數,該函數是fio用來創建譬如I/O, verify線程等。
1 /* 2 * Main function for kicking off and reaping jobs, as needed. 3 */ 4 static void run_threads(void) 5 { 6 ...... 7 todo = thread_number; 8 ...... 9 while (todo) { 10 if (td->o.use_thread) { 11 ...... 12 ret = pthread_create(&td->thread, NULL,thread_main, td); 13 ret = pthread_detach(td->thread); 14 ...... 15 } 16 ...... 17 }
在這個函數中,創建了thread_main線程(backend.c),這個線程功能是,生成I/O,發送並完成I/O,記錄數據等。
1 /* 2 * Entry point for the thread based jobs. The process based jobs end up 3 * here as well, after a little setup. 4 */ 5 static void *thread_main(void *data) 6 { 7 ........ 8 /* 9 * May alter parameters that init_io_u() will use, so we need to 10 * do this first. 11 * 下麵兩個函數的主要功能是生成讀寫的參數,offset,len 12 */ 13 if (init_iolog(td)) 14 goto err; 15 16 if (init_io_u(td)) 17 goto err; 18 ...... 19 while (keep_running(td)) { 20 do_io(td); 21 do_verify(td, verify_bytes);//如果需要verification的話 22 } 23 }
如果不考慮verify的話,下麵主要看do_io(backend.c)函數。
1 /* 2 * Main IO worker function. It retrieves io_u's to process and queues 3 * and reaps them, checking for rate and errors along the way. 4 * 5 * Returns number of bytes written and trimmed. 6 */ 7 static uint64_t do_io(struct thread_data *td) 8 { 9 ...... 10 //下麵是do_io的主迴圈,判斷條件是,io_log里有生成的pos信息,而且已經iuuse的數據小於總數據, 11 while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) || 12 (!flist_empty(&td->trim_list)) || !io_bytes_exceeded(td) || 13 td->o.time_based) { 14 ...... 15 io_u = get_io_u(td);// io_u,是一個io unit,是根據參數生成的io unit 16 ...... 17 ret = td_io_queue(td, io_u); //將io_u提交到隊列中 18 switch (ret) { 19 case FIO_Q_COMPLETED: //處理錯誤,以及同步的操作 20 ...... 21 case FIO_Q_QUEUED://成功入隊 22 bytes_issued += io_u->xfer_buflen; 23 case FIO_Q_BUSY: //隊伍滿了,重新入隊 24 ....... 25 /* 26 * See if we need to complete some commands. Note that we 27 * can get BUSY even without IO queued, if the system is 28 * resource starved. 29 */ 30 full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth); 31 if (full || !td->o.iodepth_batch_complete) { 32 min_evts = min(td->o.iodepth_batch_complete,td->cur_depth); 33 /* 34 * if the queue is full, we MUST reap at least 1 event 35 */ 36 if (full && !min_evts) 37 min_evts = 1; 38 do { 39 ret = io_u_queued_complete(td, min_evts, bytes_done); 40 } while (full && (td->cur_depth > td->o.iodepth_low)); 41 } 42 }
上面do_io函數的關鍵入隊函數td_io_queue(ioengines.c)
1 int td_io_queue(struct thread_data *td, struct io_u *io_u) 2 { 3 ...... 4 ret = td->io_ops->queue(td, io_u); 5 ...... 6 else if (ret == FIO_Q_QUEUED) { 7 int r; 8 if (ddir_rw(io_u->ddir)) { 9 td->io_u_queued++; 10 td->ts.total_io_u[io_u->ddir]++; 11 } 12 if (td->io_u_queued >= td->o.iodepth_batch) { 13 r = td_io_commit(td); 14 if (r < 0) 15 return r; 16 } 17 } 18 } 19 int td_io_commit(struct thread_data *td) 20 { 21 ...... 22 int ret; 23 if (td->io_ops->commit) { 24 ret = td->io_ops->commit(td); 25 } 26 ...... 27 return 0; 28 }
對於td->iops,它是在各個engines中定義了,以libaio(/engines/libaio.c)為例,調用的函數就是相應engines里對應的函數
1 static struct ioengine_ops ioengine = { 2 .name = "libaio", 3 .version = FIO_IOOPS_VERSION, 4 .init = fio_libaio_init, 5 .prep = fio_libaio_prep, 6 .queue = fio_libaio_queue, 7 .commit = fio_libaio_commit, 8 .cancel = fio_libaio_cancel, 9 .getevents = fio_libaio_getevents, 10 .event = fio_libaio_event, 11 .cleanup = fio_libaio_cleanup, 12 .open_file = generic_open_file, 13 .close_file = generic_close_file, 14 .get_file_size = generic_get_file_size, 15 .options = options, 16 .option_struct_size = sizeof(struct libaio_options), 17 };
對於reap流程里的io_u_queued_complete(io_u.c)函數
1 /* 2 * Called to complete min_events number of io for the async engines. 3 */ 4 int io_u_queued_complete(struct thread_data *td, int min_evts, 5 uint64_t *bytes) 6 { 7 ...... 8 9 ret = td_io_getevents(td, min_evts, td->o.iodepth_batch_complete, tvp); 10 ...... 11 } 12 13 int td_io_getevents(struct thread_data *td, unsigned int min, unsigned int max, 14 struct timespec *t) 15 { 16 int r = 0; 17 18 if (min > 0 && td->io_ops->commit) { 19 r = td->io_ops->commit(td); 20 if (r < 0) 21 goto out; 22 } 23 if (max > td->cur_depth) 24 max = td->cur_depth; 25 if (min > max) 26 max = min; 27 28 r = 0; 29 if (max && td->io_ops->getevents) 30 r = td->io_ops->getevents(td, min, max, t); 31 out: 32 ...... 33 return r; 34 }
這裡調用的getevents也是各個engines里定義的函數