1 #include <sys/types.h> 2 #include <sys/socket.h> 3 #include <sys/epoll.h> 4 #include <netdb.h> 5 #include <string.h> 6 #include <stdio.h> 7 #include ...
1 #include <sys/types.h> 2 #include <sys/socket.h> 3 #include <sys/epoll.h> 4 #include <netdb.h> 5 #include <string.h> 6 #include <stdio.h> 7 #include <unistd.h> 8 #include <fcntl.h> 9 #include <stdlib.h> 10 #include <errno.h> 11 #include <sys/wait.h> 12 #include <unistd.h> 13 #include <semaphore.h> 14 #include <sys/shm.h> 15 #define IP "127.0.0.1" 16 #define PORT 8888 17 #define PROCESS_NUM 4 18 #define MAXEVENTS 64 19 20 static int 21 create_and_bind () 22 { 23 int fd = socket (PF_INET, SOCK_STREAM, 0); 24 struct sockaddr_in serveraddr; 25 serveraddr.sin_family = AF_INET; 26 inet_pton (AF_INET, IP, &serveraddr.sin_addr); 27 serveraddr.sin_port = htons (PORT); 28 bind (fd, (struct sockaddr *) &serveraddr, sizeof (serveraddr)); 29 return fd; 30 } 31 32 static int 33 make_socket_non_blocking (int sfd) 34 { 35 int flags, s; 36 flags = fcntl (sfd, F_GETFL, 0); 37 if (flags == -1) 38 { 39 perror ("fcntl"); 40 return -1; 41 } 42 flags |= O_NONBLOCK; 43 s = fcntl (sfd, F_SETFL, flags); 44 if (s == -1) 45 { 46 perror ("fcntl"); 47 return -1; 48 } 49 return 0; 50 } 51 52 void 53 worker (int sfd, int efd, struct epoll_event *events, int k, sem_t * sem) 54 { 55 /* The event loop */ 56 struct epoll_event event; 57 // struct epoll_event *events; 58 efd = epoll_create (MAXEVENTS); 59 if (efd == -1) 60 { 61 perror ("epoll_create"); 62 abort (); 63 } 64 int epoll_lock = 0; 65 while (1) 66 { 67 int n, i; 68 int s; 69 event.data.fd = sfd; 70 event.events = EPOLLIN; 71 if (0 == sem_trywait (sem)) 72 { 73 //拿到鎖的進程將listen 描述符加入epoll 74 if (!epoll_lock) 75 { 76 fprintf (stderr, "%d >>>get lock\n", k); 77 s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event); 78 if (s == -1) 79 { 80 perror ("epoll_ctl"); 81 abort (); 82 } 83 epoll_lock = 1; 84 } 85 } 86 else 87 { 88 fprintf (stderr, "%d not lock\n", k); 89 //沒有拿到鎖的進程 將lisfd 從epoll 中去掉 90 if (epoll_lock) 91 { 92 fprintf (stderr, "worker %d return from epoll_wait!\n", k); 93 if (-1 == epoll_ctl (efd, EPOLL_CTL_DEL, sfd, &event)) 94 { 95 if (errno == ENOENT) 96 { 97 fprintf (stderr, "EPOLL_CTL_DEL\n"); 98 } 99 } 100 epoll_lock = 0; 101 } 102 } 103 //epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event); 104 // fprintf(stderr, "ok\n"); 105 //不能設置為-1 為了能讓拿不到鎖的進程再次拿到鎖 106 n = epoll_wait (efd, events, MAXEVENTS, 300); 107 for (i = 0; i < n; i++) 108 { 109 if (sfd == events[i].data.fd) 110 { 111 /* We have a notification on the listening socket, which means one or more incoming connections. */ 112 struct sockaddr in_addr; 113 socklen_t in_len; 114 int infd; 115 char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; 116 in_len = sizeof in_addr; 117 while ((infd = accept (sfd, &in_addr, &in_len)) > 0) 118 { 119 fprintf(stderr, "get one\n"); 120 close (infd); 121 } 122 } 123 } 124 if (epoll_lock) 125 { 126 //這裡將鎖釋放 127 sem_post (sem); 128 epoll_lock = 0; 129 epoll_ctl (efd, EPOLL_CTL_DEL, sfd, &event); 130 } 131 } 132 } 133 134 int 135 main (int argc, char *argv[]) 136 { 137 int shmid; 138 sem_t *acctl; 139 //建立共用記憶體 140 shmid = shmget (IPC_PRIVATE, sizeof (sem_t), 0600); 141 acctl = (sem_t *) shmat (shmid, 0, 0600); 142 //進程間信號量初始化 要用到上面的共用記憶體 143 sem_init (acctl, 1, 1); 144 int sfd, s; 145 int efd; 146 // struct epoll_event event; 147 // struct epoll_event *events; 148 sfd = create_and_bind (); 149 if (sfd == -1) 150 { 151 abort (); 152 } 153 s = make_socket_non_blocking (sfd); 154 if (s == -1) 155 { 156 abort (); 157 } 158 s = listen (sfd, SOMAXCONN); 159 if (s == -1) 160 { 161 perror ("listen"); 162 abort (); 163 } 164 efd = 0; 165 int k; 166 for (k = 0; k < PROCESS_NUM; k++) 167 { 168 printf ("Create worker %d\n", k + 1); 169 int pid = fork (); 170 if (pid == 0) 171 { 172 struct epoll_event *events; 173 events = calloc (MAXEVENTS, sizeof (struct epoll_event)); 174 worker (sfd, efd, events, k, acctl); 175 break; 176 } 177 } 178 int status; 179 wait (&status); 180 close (sfd); 181 return EXIT_SUCCESS; 182 } 183 /* 184 * 這裡處理驚群 用到了進程的鎖(信號量, 共用記憶體), 根據試驗的結果多個進程時accept接收客戶端連接的效率並沒有提高太多 185 * 但是處理其他可讀可寫(非監聽描述符)時, 要比單個進程要快很多。 186 */ 187 188View Code
在早期的kernel中, 多線程或多進程調用accept就會出現如下情況, 當前多個進程阻塞在accept中, 此時有客戶端連接時, 內核就會通知阻塞在accept的所有進程, 這時就會造成驚群現象, 也就是所有accept都會返回 但是只有一個能拿到有效的文件描述符, 其他進程最後都會返回無效描述符。但在linux kernel 版本2.6 以上時, accept驚群的問題已經解決, 大致方案就是選一個阻塞在accept的進程返回。
但是在IO復用中, select/poll/epoll 還是存在這種現象,其原因就是這些阻塞函數造成了以上同樣的問題。這裡就給出了類似Nginx的解決方案, 給監聽描述符競爭枷鎖, 保證只有一個進程處理監聽描述符。 這裡還可以控制鎖的頻率,如果一個進程接受連接的數量達到一定數量就不再申請鎖。 這裡要註意的是epoll_create的位置要在fork之後的子進程中, 這是因為若在父進程中create 那麼fork之後子進程保留這個描述符副本,epoll_create其實是內核中建立的文件系統 保存在內核中, 那麼其子進程都會共用這個文件系統, 那麼多任務的epoll_ctl就會出現問題。子進程中建立各自的epoll fd 就可以避免這種情況。