阻塞I/O
先用一个图来描述它:
实际上,之前我们使用的套接口I/O编程都是用该模型,针对上面的图进行说明一下:一旦套接口连接成功之后,就可以recv数据了,如下:
会向系统发起请求来接收数据,而这个recv请求是阻塞的,那什么时候解除阻塞呢,直到对方等方数据过来,填充了recv这个套接口所对应的接收缓冲区,才会解除,也就是说如图:
也就是说,接收缓冲区之前是“没有数据的”,一旦对等方发送数据过来,将接收缓冲区数据填充,这时候就会将数据从内核空间(也就是套接口接收缓冲区)复制到用户空间,如下:
因为我们在recv的时候,会提供一个buffer,如下:
一旦拷贝完成,recv函数就返回了,这时就可以进行数据处理了,如下:
非阻塞I/O【用得很少,做了解既可】
这个模式也是调用recv函数进行数据接收,但是会将其设置为非阻塞模式(用fcntl(fd, F_SETFL, flag|O_NONBLOCK),之前有学过),这时recv函数既可没有数据到来,也不会阻塞,如果内核中没有数据时,它不会阻塞,会返回一个错误,错误代码为-1,会返回:EWOULDBLOCK,如下:
如果说还想获取数据,则需要再次判断如果等于-1,错误码等于EWOULDBLOCK,再次提交请求:
直到有数据到来,这时的动作跟阻塞模式差不多,会将数据从内核空间拷贝到用户空间,然后recv返回,然后就可以数据处理了。 但是这种模型应用是很少的,因为在数据没有到来的过程当中,我们需要不停地循环接收,直到数据到来,而这个循环接收并没有阻塞,实际上是对CPU资源的极大的浪费,对于这种循环接收,可以称为忙等待(需要等待数据,但是数据又没有到来,而又需要占用CPU时间片),这种不推荐使用。
I/O复用(select和poll)【这个是主要研究的对象】
这种模型主要是通过select来实现的,该模式的一个思想是:用select来管理多个文件描述符,一旦有一个文件描述符检测到了有数据到来,这时select就会返回,这时recv就不会阻塞了,就可以将数据从内核空间拷到用户空间,其中阻塞提前到了select了,这个模型是这次主要研究的对象,先大致知道了解一下。
信号驱动I/O【用得很少,做了解既可】
在用户空间中,利用sigaction安装一个SIGIO的信号处理程序,这时程序就可以做其它事情了,一旦有数据到来,就会以信号的方式来通知应用程序,然后处理程序就可以调用recv来接收数据,紧接着就是从内核空间到用户空间的数据拷贝过程,这时候recv函数是不会阻塞的,可见这种方式是通用信号来通知应用程序的,所以应用进程需要在信号处理程序中去处理接收数据的细节,那么这样的话就使得异步处理成为可能,所以信号是异步处理的一种方式,这里了解一下既可。
异步I/O
关于这个模形,由于也没有得到推广,所以这里就略过,我们把重点花在有意义的事上。
对于上面列的五种I/O模型, 接下来主要研究I/O复用模型中的select模型,对于它,上面介绍该模型时也说过,它可以管理多个文件描述符,或者说可以管理多个I/O,它的函数原形如下:
一旦某个I/O检测到了我们所感兴趣的事件就立刻返回,如果有多个I/O,当发生事件时,将返回到的一些事件,填充到对应的集合当中(readfds),并且返回一个事件的个数,这时候就可以轮循事件,一个个处理它,而这时候的事件是不会阻塞的,因为select已经提前阻塞了,它的返回意味着事件已经到来了,为了更好的理解select的用法,我们首先来看一下上次我们所实现的回射客户/服务器程序具有什么样的问题,来做一个回顾,然后再用select函数来解决这个问题:
这时候,将服务端关才掉,这里用kill命令来模拟:
这时再来查看下状态:
而根据TCP关闭连接的状态来看:
如果客户端read返回为0时,则应该会调close,进而服务端最终状态为TIME_WAIT状态,为啥没有进入此状态呢?简略的说就是由于客户端阻塞在这个位置了:
本质的原因是从键盘接收数据跟网络接收数据这两个事件没有办法同时进行处理,这时可以用select来进行管理,管理fgets标准输入的I/O和sock套接口I/O,一旦其中一个或者多个产生可读事件,则进行处理,也就是这个时候,既可以在接收键盘数据的同时,也可以检测到网络数据的到来,这时就可以进行相应的处理,因为select函数能够解除阻塞,所以,接下来,利用它来改进回射客户端程序。
理解参数:
select可以看成是一个管理者,可以管理多个I/O,一旦其中的一个I/0检测到我们感兴趣的事件,select函数就返回,返回值为检测到的事件个数,并且返回哪些I/O发生了事件,遍历这些事件,进而处理事件,根据这些理论,对于其函数的参数就比较容易理解了:
①、fd_set *readfds:这时一个集合,表示一个读的集合,也是最常用的一个集合,表示如果检测到有读的套接口则放到这个集合中,一旦数据可读,select就可以返回。
②、fd_set *writefds【这次学习先用不上,可以直接填空】:这个从单词上来看,就很容易理解,表示可写的集合。
③、fd_set *exceptfds【这次学习先用不上,可以直接填空】:异常的集合。
④、struct timeval *timeout:这表示超时时间,如果填写NULL,则不会超时,一定要检测到事件后才会返回;如果指定是超时时间,则在超时时间到来的时候还没有检测到事件,也会返回,这时返回的事件个数就等于0,另外select返回失败为-1。
⑤、nfds:它表示存放在集合中(readfds、writefds、exceptfds)的这些描述符的最大值+1,比如:readfds集合中存放了描述符3、5、8,而writefds集合中存放了描述符4、9,那么这个参数就是集合中最大描述符9+1=10。
另对,对于返回值,是返回哪些I/O发生了事件,这是什么意思呢,假如readfds集合提交了3、4、5,我要关心这三个I/O的可读事件,这时如果3跟5发生了可读事件,我如何标识它呢?实际上就是将readfds这个集合改变,集合的内容改成了3、5,这是返回的准备到的个数为2,用图来表示如下:
从图中描述可以看出,readfds是输出输出参数,同理,writefds、exceptfds、timeout也是输出输出参数(比如指定的是2s的时间,但是1s内就返回了,这时后它的值就为剩余的时间)。
与select这些集合操作相配合的有四个宏进行操作,下面来简述一下,之后都会用到:
将文件描述符fd从集合set当中移除。
判定文件描述符fd是否在集合set中,注意:这里的set不是输入输出参数,也就是只读的。
将文件描述符fd添加到集合set当中。
清空集合。
好了,下面就用上面的一些理论来改进程序,来更好的理解select函数的用法,对于之前的代码,只需要对客户端这个函数的实现进行改进,如下:
首先先将函数的实现注释掉,然后一步步用select来改造,根据select的参数来编写:
第一步,首先获得最大的文件描述符,也是第一个填充第一个参数:
另外可以将sock和stdin两个文件描述符加入到集合中:
接下来,由于事件成功返回了,那就可以判断标准输入fd_stdin、sock是否在rset集合里,如果在集合中就证明已经检测到了事件,然后就可以分别进行判断处理了:
对于套接口产生事件,应该将原来的代码挪进来:
实现如下:
对于键盘的输入事件,也应该将原来的代码挪过来,如下:
下面编译运行看下之前的问题有没有解决:
这些状态都比较好理解,下面到了关键验证步骤,就是先将服务端关闭掉:
从中可以看到,服务端变为了TIME_WAIT状态,也就是向前进常迈进了,这次就不会因为fgets阻塞造成无法进入此正常状态了,而客户端这时就变为CLOSED状态了,当然通过命令就看不到此状态了,如下:
最后再贴一下经过改用select修改的客户服务回射的完整代码如下:
echosrv.c:
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
ssize_t readn(int fd, void *buf, size_t count)
{
size_t nleft = count;
ssize_t nread;
char *bufp = (char*)buf;
while (nleft > 0)
{
if ((nread = read(fd, bufp, nleft)) < 0)
{
if (errno == EINTR)
continue;
return -1;
}
else if (nread == 0)
return count - nleft;
bufp += nread;
nleft -= nread;
}
return count;
}
ssize_t writen(int fd, const void *buf, size_t count)
{
size_t nleft = count;
ssize_t nwritten;
char *bufp = (char*)buf;
while (nleft > 0)
{
if ((nwritten = write(fd, bufp, nleft)) < 0)
{
if (errno == EINTR)
continue;
return -1;
}
else if (nwritten == 0)
continue;
bufp += nwritten;
nleft -= nwritten;
}
return count;
}
ssize_t recv_peek(int sockfd, void *buf, size_t len)
{
while (1)
{
int ret = recv(sockfd, buf, len, MSG_PEEK);
if (ret == -1 && errno == EINTR)
continue;
return ret;
}
}
ssize_t readline(int sockfd, void *buf, size_t maxline)
{
int ret;
int nread;
char *bufp = buf;
int nleft = maxline;
while (1)
{
ret = recv_peek(sockfd, bufp, nleft);
if (ret < 0)
return ret;
else if (ret == 0)
return ret;
nread = ret;
int i;
for (i=0; i<nread; i++)
{
if (bufp[i] == '\n')
{
ret = readn(sockfd, bufp, i+1);
if (ret != i+1)
exit(EXIT_FAILURE);
return ret;
}
}
if (nread > nleft)
exit(EXIT_FAILURE);
nleft -= nread;
ret = readn(sockfd, bufp, nread);
if (ret != nread)
exit(EXIT_FAILURE);
bufp += nread;
}
return -1;
}
void echo_srv(int conn)
{
char recvbuf[1024];
while (1)
{
memset(recvbuf, 0, sizeof(recvbuf));
int ret = readline(conn, recvbuf, 1024);
if (ret == -1)
ERR_EXIT("readline");
if (ret == 0)
{
printf("client close\n");
break;
}
fputs(recvbuf, stdout);
writen(conn, recvbuf, strlen(recvbuf));
}
}
void handle_sigchld(int sig)
{
/* wait(NULL);*/
while (waitpid(-1, NULL, WNOHANG) > 0)
;
}
int main(void)
{
/* signal(SIGCHLD, SIG_IGN);*/
signal(SIGCHLD, handle_sigchld);
int listenfd;
if ((listenfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
/* if ((listenfd = socket(PF_INET, SOCK_STREAM, 0)) < 0)*/
ERR_EXIT("socket");
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(5188);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
/*servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");*/
/*inet_aton("127.0.0.1", &servaddr.sin_addr);*/
int on = 1;
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
ERR_EXIT("setsockopt");
if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
ERR_EXIT("bind");
if (listen(listenfd, SOMAXCONN) < 0)
ERR_EXIT("listen");
struct sockaddr_in peeraddr;
socklen_t peerlen = sizeof(peeraddr);
int conn;
pid_t pid;
while (1)
{
if ((conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen)) < 0)
ERR_EXIT("accept");
printf("ip=%s port=%d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
pid = fork();
if (pid == -1)
ERR_EXIT("fork");
if (pid == 0)
{
close(listenfd);
echo_srv(conn);
exit(EXIT_SUCCESS);
}
else
close(conn);
}
return 0;
}
复制代码
echocli.c:
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
ssize_t readn(int fd, void *buf, size_t count)
{
size_t nleft = count;
ssize_t nread;
char *bufp = (char*)buf;
while (nleft > 0)
{
if ((nread = read(fd, bufp, nleft)) < 0)
{
if (errno == EINTR)
continue;
return -1;
}
else if (nread == 0)
return count - nleft;
bufp += nread;
nleft -= nread;
}
return count;
}
ssize_t writen(int fd, const void *buf, size_t count)
{
size_t nleft = count;
ssize_t nwritten;
char *bufp = (char*)buf;
while (nleft > 0)
{
if ((nwritten = write(fd, bufp, nleft)) < 0)
{
if (errno == EINTR)
continue;
return -1;
}
else if (nwritten == 0)
continue;
bufp += nwritten;
nleft -= nwritten;
}
return count;
}
ssize_t recv_peek(int sockfd, void *buf, size_t len)
{
while (1)
{
int ret = recv(sockfd, buf, len, MSG_PEEK);
if (ret == -1 && errno == EINTR)
continue;
return ret;
}
}
ssize_t readline(int sockfd, void *buf, size_t maxline)
{
int ret;
int nread;
char *bufp = buf;
int nleft = maxline;
while (1)
{
ret = recv_peek(sockfd, bufp, nleft);
if (ret < 0)
return ret;
else if (ret == 0)
return ret;
nread = ret;
int i;
for (i=0; i<nread; i++)
{
if (bufp[i] == '\n')
{
ret = readn(sockfd, bufp, i+1);
if (ret != i+1)
exit(EXIT_FAILURE);
return ret;
}
}
if (nread > nleft)
exit(EXIT_FAILURE);
nleft -= nread;
ret = readn(sockfd, bufp, nread);
if (ret != nread)
exit(EXIT_FAILURE);
bufp += nread;
}
return -1;
}
void echo_cli(int sock)
{
/*
char sendbuf[1024] = {0};
char recvbuf[1024] = {0};
while (fgets(sendbuf, sizeof(sendbuf), stdin) != NULL)
{
writen(sock, sendbuf, strlen(sendbuf));
int ret = readline(sock, recvbuf, sizeof(recvbuf));
if (ret == -1)
ERR_EXIT("readline");
else if (ret == 0)
{
printf("client close\n");
break;
}
fputs(recvbuf, stdout);
memset(sendbuf, 0, sizeof(sendbuf));
memset(recvbuf, 0, sizeof(recvbuf));
}
close(sock);
*/
fd_set rset;//声明一个可读的集合
FD_ZERO(&rset);//将集合清空
int nready;//检测到的事件个数
//获得最大的文件描述符
int maxfd;
int fd_stdin = fileno(stdin);
if (fd_stdin > sock)
maxfd = fd_stdin;
else
maxfd = sock;
char sendbuf[1024] = {0};
char recvbuf[1024] = {0};
while (1)
{
FD_SET(fd_stdin, &rset);
FD_SET(sock, &rset);
nready = select(maxfd+1, &rset, NULL, NULL, NULL);
if (nready == -1)
ERR_EXIT("select");
if (nready == 0)
continue;
if (FD_ISSET(sock, &rset))
{//套接口产生了事件
int ret = readline(sock, recvbuf, sizeof(recvbuf));
if (ret == -1)
ERR_EXIT("readline");
else if (ret == 0)
{
printf("server close\n");
break;
}
fputs(recvbuf, stdout);
memset(recvbuf, 0, sizeof(recvbuf));
}
if (FD_ISSET(fd_stdin, &rset))
{//标准输入产生的事件
if (fgets(sendbuf, sizeof(sendbuf), stdin) == NULL)
break;
writen(sock, sendbuf, strlen(sendbuf));
memset(sendbuf, 0, sizeof(sendbuf));
}
}
close(sock);
}
void handle_sigpipe(int sig)
{
printf("recv a sig=%d\n", sig);
}
int main(void)
{
/*
signal(SIGPIPE, handle_sigpipe);
*/
signal(SIGPIPE, SIG_IGN);
int sock;
if ((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
ERR_EXIT("socket");
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(5188);
servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
if (connect(sock, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
ERR_EXIT("connect");
struct sockaddr_in localaddr;
socklen_t addrlen = sizeof(localaddr);
if (getsockname(sock, (struct sockaddr*)&localaddr, &addrlen) < 0)
ERR_EXIT("getsockname");
printf("ip=%s port=%d\n", inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port));
echo_cli(sock);
return 0;
}
复制代码
上面是函数的原形,下面用理论阐述一下它,它可以看成是一个中心管理器,能够统一管理多个I/O,一旦其中的一个或多个I/O产生了我们所感兴趣的事件,就会被select检测到并返回,再来回顾一下参数,第一个参数是nfds,表示感兴趣的文件描述符的最大值+1,这里多解释一下,为什么是最大值+1呢?实际上select函数是在遍历它所感兴趣的文件描述符是否产生了事件,是从0开始遍历,至nfds的一个[0,nfds)的一个闭开区间,而通常我们写for循环时都会这样写:
for(i=0; i<n; i++){
//TODO
}
正好这也是一个闭开区间,所以第一个参数nfds为最大描述符+1,
第二个参数readfds:可读事件集合
第三个参数writefds:可写事件集合
第四个参数exceptfds:异常事件集合
第五个参数timeout:超时时间
一旦检测到了多个事件,就需要一个一个遍历它,一一处理这些I/O事件,通常将用select实现的服务器称之为并发服务器。为啥叫并发服务器呢?因为当我们检测到多个I/O事件之后,实际上是无法并行处理这些I/O事件。实际上select处理事件是按顺序执行的,比如产生了三个事件,则是先执行第一个事件,然后再执行第二个事件,以此类推,所以说它不是并行的,而是并发,为啥是并发,是因为处理这些事件时间也不能太长,也就是说select无法实现并行处理,也就是无法充分利用多核CPU的特点,实际上对于单核的cpu来说,是根据没有并行可言的,而对于多核cpu,select是无法充分利用的,那这时该怎么办呢?可以采用多进程或多线程,关于并发与并行处理,这个之后会研究,这里先大致了解一下概念既可。
上节中用select改进了回射客户端的问题,程序可以同时监测两种事件,一种是标准输入I/O事件,还一种是网络I/O,而不至于因为程序阻塞在标准输入I/O,而同时网络I/O也已经到达了而不能处理,这就是使用select的好处。
上节中只使用了读条件了,这次会对其它事件也进行学习。
以上是可读事件产生的四种情况。
下面,就用select函数来改进回射服务器程序,上节只是改进了回射客户端程序。
先来回顾一下目前的服务器程序,是每连接成功一个客户端,就会创建一个子进程出来进行处理:
这种服务器也叫做并发服务器,通过创建一个进程来达到并发的目的,当有多个客户端连接时,就会有多个进程,那有没有可能用一个进程来实现并发呢?当然是可以做到的,也就是用select,其最终原因是因为它能管理多个I/O,实际上,对于单核CPU来说,select处理并发并不会比多进程效率低,因为多进程在单核的情况下实际上还是按顺序来进行处理的,所以,下面则正式进行修改:
首先将这些代码注释掉,因为是需要改成用select实现的:
编写方法基本跟上节当中的客户端的差不多:
【提示】:记得先记住这个allset,之后随着不断加入代码逻辑,就会自然而然显现它的作用了。
接下来,由于事件成功返回了,那就可以判断标准输入listenfd是否在rset集合里,如果在集合中就证明已经检测到了事件,然后就可以分别进行判断处理了:
另外,由于这一次是单进程的实现方式,当有多个客户端连接时,其conn客户端连接信息是需要用户个数组来保存的,
int main(void)
{
/* signal(SIGCHLD, SIG_IGN);*/
signal(SIGCHLD, handle_sigchld);
int listenfd;
if ((listenfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
/* if ((listenfd = socket(PF_INET, SOCK_STREAM, 0)) < 0)*/
ERR_EXIT("socket");
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(5188);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
/*servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");*/
/*inet_aton("127.0.0.1", &servaddr.sin_addr);*/
int on = 1;
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
ERR_EXIT("setsockopt");
if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
ERR_EXIT("bind");
if (listen(listenfd, SOMAXCONN) < 0)
ERR_EXIT("listen");
struct sockaddr_in peeraddr;
socklen_t peerlen = sizeof(peeraddr);
int conn;
/*
pid_t pid;
while (1)
{
if ((conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen)) < 0)
ERR_EXIT("accept");
printf("ip=%s port=%d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
pid = fork();
if (pid == -1)
ERR_EXIT("fork");
if (pid == 0)
{
close(listenfd);
echo_srv(conn);
exit(EXIT_SUCCESS);
}
else
close(conn);
}
*/
int client[FD_SETSIZE];//定义一个数组用来保存conn,其中FD_SETSIZE为最大文件描述符个数,不能超过它
int i;
for (i=0; i<FD_SETSIZE; i++)//对里面的数组都初使化为-1
client[i] = -1;
int nready;//检测到的事件个数
int maxfd = listenfd;//获取最大的文件描述符,目前listenfd最大
fd_set rset;//声明一个可读的集合
fd_set allset;
//以下两句是将集合清空
FD_ZERO(&rset);
FD_ZERO(&allset);
FD_SET(listenfd, &allset);//将监听套接口放到allset当中
while (1)
{
rset = allset;
nready = select(maxfd+1, &rset, NULL, NULL, NULL);
if (nready == -1)
{
if (errno == EINTR)
continue;
ERR_EXIT("select");
}
if (nready == 0)
continue;
if (FD_ISSET(listenfd, &rset))
{
peerlen = sizeof(peeraddr);
conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen);
if (conn == -1)
ERR_EXIT("accept");
for (i=0; i<FD_SETSIZE; i++)//将conn放到数组当中
{
if (client[i] < 0)
{
client[i] = conn;
break;
}
}
if (i == FD_SETSIZE)//没有找到空闲的位置,也就是连接数已经达到了上线,则给出提示
{
fprintf(stderr, "too many clients\n");
exit(EXIT_FAILURE);
}
  printf("ip=%s port=%d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));//打印输出对方的IP和端口信息
}
}
return 0;
}
复制代码
接下来要做一件事,这时已经得到了conn的套接口,下一次再调用select时我们也需要关心它的可读事件,这时需要做如下处理:
那思考一下为什么要用到allset这个变量?这时因为rset会被select函数所改变,所以对于所有感兴趣的事件需要存放在allset当中,
接下来,则处理已连接套接口事件了,对于这个套接口会有很多个,因为可以连接很多客户端,所以处理如下:
int main(void)
{
/* signal(SIGCHLD, SIG_IGN);*/
signal(SIGCHLD, handle_sigchld);
int listenfd;
if ((listenfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
/* if ((listenfd = socket(PF_INET, SOCK_STREAM, 0)) < 0)*/
ERR_EXIT("socket");
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(5188);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
/*servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");*/
/*inet_aton("127.0.0.1", &servaddr.sin_addr);*/
int on = 1;
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
ERR_EXIT("setsockopt");
if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
ERR_EXIT("bind");
if (listen(listenfd, SOMAXCONN) < 0)
ERR_EXIT("listen");
struct sockaddr_in peeraddr;
socklen_t peerlen = sizeof(peeraddr);
int conn;
int client[FD_SETSIZE];//定义一个数组用来保存conn,其中FD_SETSIZE为最大文件描述符个数,不能超过它
int i;
for (i=0; i<FD_SETSIZE; i++)//对里面的数组都初使化为-1
client[i] = -1;
int nready;//检测到的事件个数
int maxfd = listenfd;//获取最大的文件描述符,目前listenfd最大
fd_set rset;//声明一个可读的集合
fd_set allset;
//以下两句是将集合清空
FD_ZERO(&rset);
FD_ZERO(&allset);
FD_SET(listenfd, &allset);//将监听套接口放到allset当中
while (1)
{
rset = allset;
nready = select(maxfd+1, &rset, NULL, NULL, NULL);
if (nready == -1)
{
if (errno == EINTR)
continue;
ERR_EXIT("select");
}
if (nready == 0)
continue;
if (FD_ISSET(listenfd, &rset))
{
peerlen = sizeof(peeraddr);
conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen);
if (conn == -1)
ERR_EXIT("accept");
for (i=0; i<FD_SETSIZE; i++)//将conn放到数组当中
{
if (client[i] < 0)
{
client[i] = conn;
break;
}
}
if (i == FD_SETSIZE)//没有找到空闲的位置,也就是连接数已经达到了上线,则给出提示
{
fprintf(stderr, "too many clients\n");
exit(EXIT_FAILURE);
}
printf("ip=%s port=%d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
FD_SET(conn, &allset);
if (--nready <= 0)
continue;
}
for (i=0; i<=FD_SETSIZE; i++)
{
conn = client[i];
if (conn == -1)
continue;
if (FD_ISSET(conn, &rset))
{
char recvbuf[1024] = {0};
int ret = readline(conn, recvbuf, 1024);
if (ret == -1)
ERR_EXIT("readline");
if (ret == 0)
{//对方关闭
printf("client close\n");
FD_CLR(conn, &allset);//从集合中将此已连接接口清除
client[i] = -1;//并且还原默认标识
}
fputs(recvbuf, stdout);
writen(conn, recvbuf, strlen(recvbuf));
if (--nready <= 0)
break;
}
}
}
return 0;
}
复制代码
另外,还需要关心一下最大描述符maxfd,当产生了新的连接套接口时,是需要将其进行更新的,于是修改代码如下:
好了,下面来编译运行一下:
可见,用单进程的方式也实现了多个客户端并发的处理。
另外,此处程序还有可优化的地方,就是处理已连接事件的时候,总是遍历FD_SETSIZE,可以再加一个变量,用来记录最大的不空闲的i值,修改代码如下:
int main(void)
{
/* signal(SIGCHLD, SIG_IGN);*/
signal(SIGCHLD, handle_sigchld);
int listenfd;
if ((listenfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
/* if ((listenfd = socket(PF_INET, SOCK_STREAM, 0)) < 0)*/
ERR_EXIT("socket");
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(5188);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
/*servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");*/
/*inet_aton("127.0.0.1", &servaddr.sin_addr);*/
int on = 1;
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
ERR_EXIT("setsockopt");
if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
ERR_EXIT("bind");
if (listen(listenfd, SOMAXCONN) < 0)
ERR_EXIT("listen");
struct sockaddr_in peeraddr;
socklen_t peerlen = sizeof(peeraddr);
int conn;
int client[FD_SETSIZE];//定义一个数组用来保存conn,其中FD_SETSIZE为最大文件描述符个数,不能超过它
int maxi = 0;//用来记录最大的可连接套接口存放的位置
int i;
for (i=0; i<FD_SETSIZE; i++)//对里面的数组都初使化为-1
client[i] = -1;
int nready;//检测到的事件个数
int maxfd = listenfd;//获取最大的文件描述符,目前listenfd最大
fd_set rset;//声明一个可读的集合
fd_set allset;
//以下两句是将集合清空
FD_ZERO(&rset);
FD_ZERO(&allset);
FD_SET(listenfd, &allset);//将监听套接口放到allset当中
while (1)
{
rset = allset;
nready = select(maxfd+1, &rset, NULL, NULL, NULL);
if (nready == -1)
{
if (errno == EINTR)
continue;
ERR_EXIT("select");
}
if (nready == 0)
continue;
if (FD_ISSET(listenfd, &rset))
{
peerlen = sizeof(peeraddr);
conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen);
if (conn == -1)
ERR_EXIT("accept");
for (i=0; i<FD_SETSIZE; i++)//将conn放到数组当中
{
if (client[i] < 0)
{
client[i] = conn;
if (i > maxi)
maxi = i;
break;
}
}
if (i == FD_SETSIZE)//没有找到空闲的位置,也就是连接数已经达到了上线,则给出提示
{
fprintf(stderr, "too many clients\n");
exit(EXIT_FAILURE);
}
printf("ip=%s port=%d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
FD_SET(conn, &allset);
if (conn > maxfd)
maxfd = conn;
if (--nready <= 0)
continue;
}
for (i=0; i<=maxi; i++)//这时就会减少循环的次数
{
conn = client[i];
if (conn == -1)
continue;
if (FD_ISSET(conn, &rset))
{
char recvbuf[1024] = {0};
int ret = readline(conn, recvbuf, 1024);
if (ret == -1)
ERR_EXIT("readline");
if (ret == 0)
{//对方关闭
printf("client close\n");
FD_CLR(conn, &allset);//从集合中将此已连接接口清除
client[i] = -1;//并且还原默认标识
}
fputs(recvbuf, stdout);
writen(conn, recvbuf, strlen(recvbuf));
if (--nready <= 0)
break;
}
}
}
return 0;
}
复制代码
最后再来看一下整个服务端的代码如下:
echosrv.c:
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
ssize_t readn(int fd, void *buf, size_t count)
{
size_t nleft = count;
ssize_t nread;
char *bufp = (char*)buf;
while (nleft > 0)
{
if ((nread = read(fd, bufp, nleft)) < 0)
{
if (errno == EINTR)
continue;
return -1;
}
else if (nread == 0)
return count - nleft;
bufp += nread;
nleft -= nread;
}
return count;
}
ssize_t writen(int fd, const void *buf, size_t count)
{
size_t nleft = count;
ssize_t nwritten;
char *bufp = (char*)buf;
while (nleft > 0)
{
if ((nwritten = write(fd, bufp, nleft)) < 0)
{
if (errno == EINTR)
continue;
return -1;
}
else if (nwritten == 0)
continue;
bufp += nwritten;
nleft -= nwritten;
}
return count;
}
ssize_t recv_peek(int sockfd, void *buf, size_t len)
{
while (1)
{
int ret = recv(sockfd, buf, len, MSG_PEEK);
if (ret == -1 && errno == EINTR)
continue;
return ret;
}
}
ssize_t readline(int sockfd, void *buf, size_t maxline)
{
int ret;
int nread;
char *bufp = buf;
int nleft = maxline;
while (1)
{
ret = recv_peek(sockfd, bufp, nleft);
if (ret < 0)
return ret;
else if (ret == 0)
return ret;
nread = ret;
int i;
for (i=0; i<nread; i++)
{
if (bufp[i] == '\n')
{
ret = readn(sockfd, bufp, i+1);
if (ret != i+1)
exit(EXIT_FAILURE);
return ret;
}
}
if (nread > nleft)
exit(EXIT_FAILURE);
nleft -= nread;
ret = readn(sockfd, bufp, nread);
if (ret != nread)
exit(EXIT_FAILURE);
bufp += nread;
}
return -1;
}
void echo_srv(int conn)
{
char recvbuf[1024];
while (1)
{
memset(recvbuf, 0, sizeof(recvbuf));
int ret = readline(conn, recvbuf, 1024);
if (ret == -1)
ERR_EXIT("readline");
if (ret == 0)
{
printf("client close\n");
break;
}
fputs(recvbuf, stdout);
writen(conn, recvbuf, strlen(recvbuf));
}
}
void handle_sigchld(int sig)
{
/* wait(NULL);*/
while (waitpid(-1, NULL, WNOHANG) > 0)
;
}
int main(void)
{
/* signal(SIGCHLD, SIG_IGN);*/
signal(SIGCHLD, handle_sigchld);
int listenfd;
if ((listenfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
/* if ((listenfd = socket(PF_INET, SOCK_STREAM, 0)) < 0)*/
ERR_EXIT("socket");
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(5188);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
/*servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");*/
/*inet_aton("127.0.0.1", &servaddr.sin_addr);*/
int on = 1;
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
ERR_EXIT("setsockopt");
if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
ERR_EXIT("bind");
if (listen(listenfd, SOMAXCONN) < 0)
ERR_EXIT("listen");
struct sockaddr_in peeraddr;
socklen_t peerlen = sizeof(peeraddr);
int conn;
int client[FD_SETSIZE];//定义一个数组用来保存conn,其中FD_SETSIZE为最大文件描述符个数,不能超过它
int maxi = 0;//用来记录最大的可连接套接口存放的位置
int i;
for (i=0; i<FD_SETSIZE; i++)//对里面的数组都初使化为-1
client[i] = -1;
int nready;//检测到的事件个数
int maxfd = listenfd;//获取最大的文件描述符,目前listenfd最大
fd_set rset;//声明一个可读的集合
fd_set allset;
//以下两句是将集合清空
FD_ZERO(&rset);
FD_ZERO(&allset);
FD_SET(listenfd, &allset);//将监听套接口放到allset当中
while (1)
{
rset = allset;
nready = select(maxfd+1, &rset, NULL, NULL, NULL);
if (nready == -1)
{
if (errno == EINTR)
continue;
ERR_EXIT("select");
}
if (nready == 0)
continue;
if (FD_ISSET(listenfd, &rset))
{
peerlen = sizeof(peeraddr);
conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen);
if (conn == -1)
ERR_EXIT("accept");
for (i=0; i<FD_SETSIZE; i++)//将conn放到数组当中
{
if (client[i] < 0)
{
client[i] = conn;
if (i > maxi)
maxi = i;
break;
}
}
if (i == FD_SETSIZE)//没有找到空闲的位置,也就是连接数已经达到了上线,则给出提示
{
fprintf(stderr, "too many clients\n");
exit(EXIT_FAILURE);
}
printf("ip=%s port=%d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
FD_SET(conn, &allset);
if (conn > maxfd)
maxfd = conn;
if (--nready <= 0)
continue;
}
for (i=0; i<=maxi; i++)
{
conn = client[i];
if (conn == -1)
continue;
if (FD_ISSET(conn, &rset))
{
char recvbuf[1024] = {0};
int ret = readline(conn, recvbuf, 1024);
if (ret == -1)
ERR_EXIT("readline");
if (ret == 0)
{//对方关闭
printf("client close\n");
FD_CLR(conn, &allset);//从集合中将此已连接接口清除
client[i] = -1;//并且还原默认标识
}
fputs(recvbuf, stdout);
writen(conn, recvbuf, strlen(recvbuf));
if (--nready <= 0)
break;
}
}
}
return 0;
}
复制代码
echocli.c:
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
ssize_t readn(int fd, void *buf, size_t count)
{
size_t nleft = count;
ssize_t nread;
char *bufp = (char*)buf;
while (nleft > 0)
{
if ((nread = read(fd, bufp, nleft)) < 0)
{
if (errno == EINTR)
continue;
return -1;
}
else if (nread == 0)
return count - nleft;
bufp += nread;
nleft -= nread;
}
return count;
}
ssize_t writen(int fd, const void *buf, size_t count)
{
size_t nleft = count;
ssize_t nwritten;
char *bufp = (char*)buf;
while (nleft > 0)
{
if ((nwritten = write(fd, bufp, nleft)) < 0)
{
if (errno == EINTR)
continue;
return -1;
}
else if (nwritten == 0)
continue;
bufp += nwritten;
nleft -= nwritten;
}
return count;
}
ssize_t recv_peek(int sockfd, void *buf, size_t len)
{
while (1)
{
int ret = recv(sockfd, buf, len, MSG_PEEK);
if (ret == -1 && errno == EINTR)
continue;
return ret;
}
}
ssize_t readline(int sockfd, void *buf, size_t maxline)
{
int ret;
int nread;
char *bufp = buf;
int nleft = maxline;
while (1)
{
ret = recv_peek(sockfd, bufp, nleft);
if (ret < 0)
return ret;
else if (ret == 0)
return ret;
nread = ret;
int i;
for (i=0; i<nread; i++)
{
if (bufp[i] == '\n')
{
ret = readn(sockfd, bufp, i+1);
if (ret != i+1)
exit(EXIT_FAILURE);
return ret;
}
}
if (nread > nleft)
exit(EXIT_FAILURE);
nleft -= nread;
ret = readn(sockfd, bufp, nread);
if (ret != nread)
exit(EXIT_FAILURE);
bufp += nread;
}
return -1;
}
void echo_cli(int sock)
{
/*
char sendbuf[1024] = {0};
char recvbuf[1024] = {0};
while (fgets(sendbuf, sizeof(sendbuf), stdin) != NULL)
{
writen(sock, sendbuf, strlen(sendbuf));
int ret = readline(sock, recvbuf, sizeof(recvbuf));
if (ret == -1)
ERR_EXIT("readline");
else if (ret == 0)
{
printf("client close\n");
break;
}
fputs(recvbuf, stdout);
memset(sendbuf, 0, sizeof(sendbuf));
memset(recvbuf, 0, sizeof(recvbuf));
}
close(sock);
*/
fd_set rset;
FD_ZERO(&rset);
int nready;
int maxfd;
int fd_stdin = fileno(stdin);
if (fd_stdin > sock)
maxfd = fd_stdin;
else
maxfd = sock;
char sendbuf[1024] = {0};
char recvbuf[1024] = {0};
while (1)
{
FD_SET(fd_stdin, &rset);
FD_SET(sock, &rset);
nready = select(maxfd+1, &rset, NULL, NULL, NULL);
if (nready == -1)
ERR_EXIT("select");
if (nready == 0)
continue;
if (FD_ISSET(sock, &rset))
{
int ret = readline(sock, recvbuf, sizeof(recvbuf));
if (ret == -1)
ERR_EXIT("readline");
else if (ret == 0)
{
printf("server close\n");
break;
}
fputs(recvbuf, stdout);
memset(recvbuf, 0, sizeof(recvbuf));
}
if (FD_ISSET(fd_stdin, &rset))
{
if (fgets(sendbuf, sizeof(sendbuf), stdin) == NULL)
break;
writen(sock, sendbuf, strlen(sendbuf));
memset(sendbuf, 0, sizeof(sendbuf));
}
}
close(sock);
}
void handle_sigpipe(int sig)
{
printf("recv a sig=%d\n", sig);
}
int main(void)
{
/*
signal(SIGPIPE, handle_sigpipe);
*/
signal(SIGPIPE, SIG_IGN);
int sock;
if ((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
ERR_EXIT("socket");
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(5188);
servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
if (connect(sock, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
ERR_EXIT("connect");
struct sockaddr_in localaddr;
socklen_t addrlen = sizeof(localaddr);
if (getsockname(sock, (struct sockaddr*)&localaddr, &addrlen) < 0)
ERR_EXIT("getsockname");
printf("ip=%s port=%d\n", inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port));
echo_cli(sock);
return 0;
}
复制代码
①close终止了数据传送的两个方向。
②shutdown可以有选择的终止某个方向的数据传送或者终止数据传送的两个方向。 但是,在实际应用中,可能会遇到这样一个情况,既使我们关闭了其中的某一端应用,但对于先前发送出去的数据能够得到对方的应答,下面可以用图来表示一下:
进一步说明一下"close终止了数据传送的两个方向":
查看一下man帮助:
①、SHUT_RD=1代表不能向管道中读取数据了。
②、SHUT_WR代表不能向管道中写入数据了。
③、SHUT_RDWR代表不能向管道中读写数据了。
所以正好反映了,"shutdown可以有选择的终止某个方向的数据传送或者终止数据传送的两个方向"。 【说明】:
③shutdown how=1就可以保证对等方接收到一个EOF字符【也就是向对方发送了FIN TCP段】,而不管其他进程是否已经打开了套接字。而close不能保证,直到套接字引用计数减为0时才发送。也就是说直到所有的进程都关闭了套接字。
对于上面提到的,要想让client只终止写的一端,可以用shutdown设置成SHUT_WR,也就是how=1:
从上面的解释中来看:"close不能保证,直到套接字引用计数减为0时才发送。也就是说直到所有的进程都关闭了套接字",并不是只要调用了close就会立马向对方发送FIN tcp段,而是需要引用计数减为0时才发送,回想一下原来用fork()实现服务端的代码:
而如果改用shutdown:
说了一系列理论过后,下面用实验来进一步加深对shutdown的理解,也就是用shutdown来改写客户端。
目前我们的回射客户/服务端程序都已经改用select函数来实现了,下面我们来做一个这样的实验:
echocli.c:
echosrv.c:
下面来看下效果:
可见,并没回将数据回射回客户端,并且客户端这边报了一个错:
这是哪打印出来的呢?分析一下代码流程:
而且客户端的这个错误,还会导致服务端也崩溃掉了。
那如果这样处理是否可以避免这个报错呢?
编译运行:
可以看出,客户端并没有报错了,但是服务端还是崩溃了,这是为什么呢?
所以,解决服务端崩溃很简单,处理如下:
再次编译运行:
这时,可以看到服务端打印了"client close",而且没有崩溃了,这是由于执行到了这个流程:
但是目前残留在管道当中的数据都无法回显给客户端,所以接下来用shutdown来进行改进,让它在客户端关闭的情况下还能回射回来:
编译如下:
从上面可以看到,客户端成功回显了,但是,发现有个小问题,就是当客户端回显数据之后,服务端没有把客户端给关闭掉,所以,程序应该有个bug,检查一下服务端的代码:
所以,修改如下:
编译运行:
这时问题就成功解决,此时的客户端就相对要完善一些了,实际上在编写TCP程序时是需要考虑到这一点的,实际上客户端程序还有一个地方是需要注意的:
所以可以加入一个flag进行处理:
当然程序运行效果是一样的,只是程序更严谨一些。
①、alarm【不常用,了解既可】
它的实现思路是这样的:
但是这种方案有一定的问题,因为闹钟可能会作为其它的用途,这时所设置的闹钟跟其它用途的闹钟会产生冲突,而这些冲突的解决,会比较麻烦,这里就不多讨论了,因为不使用它,仅了解既可,是不会用闹钟的方式来实现超时的。
②、套接字选项【不常用,了解既可】
SO_SNDTIMEO:发送的超时时间
SO_RCVTIMEO:接收的超时时间
复制代码
具体实现思路是这样的:
但是,也不会用这种方式,因为存在移植兼容的问题。
③、select【常用,这次学习的重点】
read_timeout函数封装:
下面会仔细分析是如何封装的,在封装之后,先看下函数原形:
所以,先不关心它是如何实现的,依照这个函数原形,其使用方法如下【伪代码】:
另外,如果想按照正常的方式来处理,可以将超时参数传0既可,如下:
/**
* read_timeout - 读超时检测函数,不含读操作
* @fd: 文件描述符
* @wait_seconds: 等待超时秒数,如果为0表示不检测超时
* 成功(未超时)返回0,失败返回-1,超时返回-1并且errno = ETIMEDOUT
*/
int read_timeout(int fd, unsigned int wait_seconds)
{
int ret = 0;//默认返回值为0,也就是未超时
if (wait_seconds > 0)
{//如果当传过来的超时时间大于0时才做select超时处理
fd_set read_fdset;
struct timeval timeout;//超时参数
FD_ZERO(&read_fdset);
FD_SET(fd, &read_fdset);//将描述符加入到可读集合中
//设置超时
timeout.tv_sec = wait_seconds;//只关心秒
timeout.tv_usec = 0;//不关心微秒
do
{
ret = select(fd + 1, &read_fdset, NULL, NULL, &timeout);//这时传入超时时间
} while (ret < 0 && errno == EINTR/** 如果是中断信号,则忽略 **/);
if (ret == 0)
{//表示已经超时
ret = -1;
errno = ETIMEDOUT;
}
else if (ret == 1)//表示没有超时,成功产生了可读事件
ret = 0;
}
return ret;
}
复制代码
【说明】:对于这个工具方法,重在理解是如何封装的,不一定要自己完全写,之后可以直接拿过来用。
write_timeout函数封装:
当理解了read_timeout函数的实现,对于写函数的实现就不难了,下面直接贴出来,基本类似,就不多说了:
/**
* write_timeout - 读超时检测函数,不含写操作
* @fd: 文件描述符
* @wait_seconds: 等待超时秒数,如果为0表示不检测超时
* 成功(未超时)返回0,失败返回-1,超时返回-1并且errno = ETIMEDOUT
*/
int write_timeout(int fd, unsigned int wait_seconds)
{
int ret = 0;
if (wait_seconds > 0)
{
fd_set write_fdset;//只是这时变成了写的集合
struct timeval timeout;
FD_ZERO(&write_fdset);
FD_SET(fd, &write_fdset);//将入到写集合中
timeout.tv_sec = wait_seconds;
timeout.tv_usec = 0;
do
{
ret = select(fd + 1, NULL, NULL, &write_fdset, &timeout);
} while (ret < 0 && errno == EINTR);
if (ret == 0)
{
ret = -1;
errno = ETIMEDOUT;
}
else if (ret == 1)
ret = 0;
}
return ret;
}
复制代码
accept_timeout函数封装:
关于这个函数的封装也不是太难理解,下面也以注释的方式贴出来:
/**
* accept_timeout - 带超时的accept
* @fd: 套接字
* @addr: 输出参数,返回对方地址
* @wait_seconds: 等待超时秒数,如果为0表示正常模式
* 成功(未超时)返回已连接套接字,超时返回-1并且errno = ETIMEDOUT
*/
int accept_timeout(int fd, struct sockaddr_in *addr, unsigned int wait_seconds)
{
int ret;//返回值
socklen_t addrlen = sizeof(struct sockaddr_in);//定义一个地址的长度
if (wait_seconds > 0)
{//如果超时时间大于0才进行select超时处理,否则不检测超时,直接调用accept
fd_set accept_fdset;//定义一个集合
struct timeval timeout;//定义一个超时结构体
FD_ZERO(&accept_fdset);
FD_SET(fd, &accept_fdset);//加入集合
timeout.tv_sec = wait_seconds;//设置超时时间
timeout.tv_usec = 0;
do
{
ret = select(fd + 1, &accept_fdset, NULL, NULL, &timeout);
} while (ret < 0 && errno == EINTR);
if (ret == -1)//代表select失败了
return -1;
else if (ret == 0)
{//超时了
errno = ETIMEDOUT;
return -1;
}
}
//如果走到这里,证明检测到了事件,则需要对其进行处理;或者是超时时间没有设置也会走到这
if (addr != NULL)//有地址的accept,这时不再阻塞
ret = accept(fd, (struct sockaddr*)addr, &addrlen);//此时返回连接套接字
else//无地址的accept
ret = accept(fd, NULL, NULL);
if (ret == -1)//表示accept失败
ERR_EXIT("accept");
return ret;
}
复制代码
connect_timeout函数封装:这个函数最难~
首先先明白一点,为啥要设置连接超时呢?这里需要从连接建立的三次握手说起,如下图:
下面来看下具体函数的实现,相比前几个,这个要复杂一些,因为不能够直接调用connect(),一旦调用了它,就意味着阻塞了,所以说希望不能阻塞的方式调用,所以需要将文件描述符设置为非阻塞模式,这里封装成了一个方法,如下:
/**
* activate_noblock - 设置I/O为非阻塞模式
* @fd: 文件描符符
*/
void activate_nonblock(int fd)
{
int ret;
int flags = fcntl(fd, F_GETFL);//获得原来的模式
if (flags == -1)
ERR_EXIT("fcntl");
flags |= O_NONBLOCK;//设置非阻塞模式
ret = fcntl(fd, F_SETFL, flags);
if (ret == -1)
ERR_EXIT("fcntl");
}
复制代码
另外,还配对一个清除非阻塞模式的方法:
/**
* deactivate_nonblock - 设置I/O为阻塞模式
* @fd: 文件描符符
*/
void deactivate_nonblock(int fd)
{
int ret;
int flags = fcntl(fd, F_GETFL);
if (flags == -1)
ERR_EXIT("fcntl");
flags &= ~O_NONBLOCK;
ret = fcntl(fd, F_SETFL, flags);
if (ret == -1)
ERR_EXIT("fcntl");
}
复制代码
【说明】:关于上面两个函数的实现,可以参考之前学习的fcntl函数。
/**
* connect_timeout - connect
* @fd: 套接字
* @addr: 要连接的对方地址
* @wait_seconds: 等待超时秒数,如果为0表示正常模式
* 成功(未超时)返回0,失败返回-1,超时返回-1并且errno = ETIMEDOUT
*/
int connect_timeout(int fd, struct sockaddr_in *addr, unsigned int wait_seconds)
{
int ret;
socklen_t addrlen = sizeof(struct sockaddr_in);
if (wait_seconds > 0)
activate_nonblock(fd);//设置套接字为非阻塞模式
ret = connect(fd, (struct sockaddr*)addr, addrlen);
if (ret < 0 && errno == EINPROGRESS)
{//连接正在处理,这时应该用select检测连接的超时
fd_set connect_fdset;//定义一个连接的集合
struct timeval timeout;
FD_ZERO(&connect_fdset);//将连接加入集合中
FD_SET(fd, &connect_fdset);
timeout.tv_sec = wait_seconds;//定义超时时间
timeout.tv_usec = 0;
do
{
/* 一量连接建立,套接字就可写,这里是将关心写的事件 */
ret = select(fd + 1, NULL, &connect_fdset, NULL, &timeout);
} while (ret < 0 && errno == EINTR);
if (ret == 0)
{//表示连接超时了
ret = -1;
errno = ETIMEDOUT;
}
else if (ret < 0)//连接失败了
return -1;
else if (ret == 1)
{//这时检测到有可写事件了
/* ret返回为1,可能有两种情况,一种是连接建立成功,一种是套接字产生错误,*/
/* 此时错误信息不会保存至errno变量中,因此,需要调用getsockopt来获取。 */
int err;
socklen_t socklen = sizeof(err);
int sockoptret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &socklen);//获取套接字的错误
if (sockoptret == -1)
{//表示获取套接字错误
return -1;
}
if (err == 0)
{//连接建立成功
ret = 0;
}
else
{//套接字产生错误
errno = err;
ret = -1;
}
}
}
if (wait_seconds > 0)
{
deactivate_nonblock(fd);//还原套接字为阻塞模式
}
return ret;
}
复制代码
下面用程序来使用一下上面的超时函数,还是用回射服务端/客户程序,但是不是用之前的,而是用一个最简单的,重在测试:
srv.c:
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
int main(void)
{
int listenfd;
if ((listenfd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
ERR_EXIT("socket");
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(5188);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
int on = 1;
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
ERR_EXIT("setsockopt");
if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
ERR_EXIT("bind");
if (listen(listenfd, SOMAXCONN) < 0)
ERR_EXIT("listen");
struct sockaddr_in peeraddr;
socklen_t peerlen;
int conn;
if ((conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen)) < 0)
ERR_EXIT("accept");
printf("ip=%s port=%d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
return 0;
}
复制代码
cli.c:
#include "sysutil.h"
int main(void)
{
int sock;
if ((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
ERR_EXIT("socket");
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(5188);
servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
int ret = connect_timeout(sock, &servaddr, 5);//设置超时为5秒
if (ret == -1 && errno == ETIMEDOUT)
{
printf("timeout...\n");
return 1;
}
else if (ret == -1)
ERR_EXIT("connect_timeout");
struct sockaddr_in localaddr;
socklen_t addrlen = sizeof(localaddr);
if (getsockname(sock, (struct sockaddr*)&localaddr, &addrlen) < 0)
ERR_EXIT("getsockname");
printf("ip=%s port=%d\n", inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port));
return 0;
}
复制代码
为了看清楚connect_timeout内部执行的流程是怎么样,可以在其内部打印一些日志就知道了,加入日志如下:
int connect_timeout(int fd, struct sockaddr_in *addr, unsigned int wait_seconds)
{
int ret;
socklen_t addrlen = sizeof(struct sockaddr_in);
if (wait_seconds > 0)
activate_nonblock(fd);//设置套接字为非阻塞模式
ret = connect(fd, (struct sockaddr*)addr, addrlen);
if (ret < 0 && errno == EINPROGRESS)
{//连接正在处理,这时应该用select检测连接的超时
printf("AAAAA\n");
fd_set connect_fdset;//定义一个连接的集合
struct timeval timeout;
FD_ZERO(&connect_fdset);//将连接加入集合中
FD_SET(fd, &connect_fdset);
timeout.tv_sec = wait_seconds;//定义超时时间
timeout.tv_usec = 0;
do
{
/* 一量连接建立,套接字就可写,这里是将关心写的事件 */
ret = select(fd + 1, NULL, &connect_fdset, NULL, &timeout);
} while (ret < 0 && errno == EINTR);
if (ret == 0)
{//表示连接超时了
ret = -1;
errno = ETIMEDOUT;
}
else if (ret < 0)//连接失败了
return -1;
else if (ret == 1)
{//这时检测到有可写事件了
printf("BBBBB\n");
/* ret返回为1,可能有两种情况,一种是连接建立成功,一种是套接字产生错误,*/
/* 此时错误信息不会保存至errno变量中,因此,需要调用getsockopt来获取。 */
int err;
socklen_t socklen = sizeof(err);
int sockoptret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &socklen);//获取套接字的错误
if (sockoptret == -1)
{//表示获取套接字错误
return -1;
}
if (err == 0)
{//连接建立成功
printf("CCCCCC\n");
ret = 0;
}
else
{//套接字产生错误
printf("DDDDDDD\n");
errno = err;
ret = -1;
}
}
}
if (wait_seconds > 0)
{
deactivate_nonblock(fd);//还原套接字为阻塞模式
}
return ret;
}
复制代码
编译运行:
由于connect本地模拟不了超时效果,因为没有网络拥塞的情况,下面可以演示一个错误,就是在服务端没有运行时,直接运行客户端,如下:
这是在客户端这一段代码报出来的:
由于数据比较少,虽说已经封装好了超时的函数,但是不好演示网络拥塞导致的超时,不过,重在理解代码。
用select实现的并发服务器,能达到的并发数,受两方面限制
①、一个进程能打开的最大文件描述符限制。这可以通过调整内核参数。
那最大文件描述符是多少呢?可以用以下命令查看出来:
其实这个数是可以进行调整的,但是前提得是root用户才有权限调整,如下:
此时再更改:
以上是通过命令调整进程支持的最大描述符个数,那用代码能改么?当然也能,下面来看下如何编写:
获取资源的限制可以通过getrlimit()函数,查看man帮助:
其中第一个参数我们需要设置它:
于是乎编写一个测试程序先来获得最大描述符的个数:
编译运行:
下面来调整一下它的大小:
nofile_limit.c:
#include <signal.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
int main(void)
{
struct rlimit rl;
if (getrlimit(RLIMIT_NOFILE, &rl) < 0)
ERR_EXIT("getrlimit");
printf("%d\n", (int)rl.rlim_max);
rl.rlim_cur = 2048;
rl.rlim_max = 2048;
if (setrlimit(RLIMIT_NOFILE, &rl) < 0)//调整描述符限制
ERR_EXIT("setrlimit");
if (getrlimit(RLIMIT_NOFILE, &rl) < 0)//再次打印修改后的大小
ERR_EXIT("getrlimit");
printf("%d\n", (int)rl.rlim_max);
return 0;
}
复制代码
编译运行:
那么下面先将切至root用户,然后再执行该程序:
但是,它只能改变当前进程的大小,如果用ulimit -n查看,看到的大小是父进程的,还是没有变:
关于调整内核增大描述符个数,是很容易改变select并发数的,但是还有另外一个因素,就不那么容易了,因为得编译内核才行,如下:
关于最大限制,下面用程序来验证下,先编写一个简单客户端,不断循环向服务端连接,看最终连接成功的个数,不加其它逻辑:
conntest.c:
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
int main(void)
{
int count = 0;//用来记数,当客户端与服务端连接成功则加1
while(1)
{
int sock;
if ((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
{
ERR_EXIT("socket");
}
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(5188);
servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
if (connect(sock, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
ERR_EXIT("connect");
struct sockaddr_in localaddr;
socklen_t addrlen = sizeof(localaddr);
if (getsockname(sock, (struct sockaddr*)&localaddr, &addrlen) < 0)
ERR_EXIT("getsockname");
printf("ip=%s port=%d\n", inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port));
printf("count = %d\n", ++count);//将个数打印出来
}
return 0;
}
复制代码
服务端还是用之前的,代码如下:
echosrv.c:
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
ssize_t readn(int fd, void *buf, size_t count)
{
size_t nleft = count;
ssize_t nread;
char *bufp = (char*)buf;
while (nleft > 0)
{
if ((nread = read(fd, bufp, nleft)) < 0)
{
if (errno == EINTR)
continue;
return -1;
}
else if (nread == 0)
return count - nleft;
bufp += nread;
nleft -= nread;
}
return count;
}
ssize_t writen(int fd, const void *buf, size_t count)
{
size_t nleft = count;
ssize_t nwritten;
char *bufp = (char*)buf;
while (nleft > 0)
{
if ((nwritten = write(fd, bufp, nleft)) < 0)
{
if (errno == EINTR)
continue;
return -1;
}
else if (nwritten == 0)
continue;
bufp += nwritten;
nleft -= nwritten;
}
return count;
}
ssize_t recv_peek(int sockfd, void *buf, size_t len)
{
while (1)
{
int ret = recv(sockfd, buf, len, MSG_PEEK);
if (ret == -1 && errno == EINTR)
continue;
return ret;
}
}
ssize_t readline(int sockfd, void *buf, size_t maxline)
{
int ret;
int nread;
char *bufp = buf;
int nleft = maxline;
while (1)
{
ret = recv_peek(sockfd, bufp, nleft);
if (ret < 0)
return ret;
else if (ret == 0)
return ret;
nread = ret;
int i;
for (i=0; i<nread; i++)
{
if (bufp[i] == '\n')
{
ret = readn(sockfd, bufp, i+1);
if (ret != i+1)
exit(EXIT_FAILURE);
return ret;
}
}
if (nread > nleft)
exit(EXIT_FAILURE);
nleft -= nread;
ret = readn(sockfd, bufp, nread);
if (ret != nread)
exit(EXIT_FAILURE);
bufp += nread;
}
return -1;
}
void echo_srv(int conn)
{
char recvbuf[1024];
while (1)
{
memset(recvbuf, 0, sizeof(recvbuf));
int ret = readline(conn, recvbuf, 1024);
if (ret == -1)
ERR_EXIT("readline");
if (ret == 0)
{
printf("client close\n");
break;
}
fputs(recvbuf, stdout);
writen(conn, recvbuf, strlen(recvbuf));
}
}
void handle_sigchld(int sig)
{
/* wait(NULL);*/
while (waitpid(-1, NULL, WNOHANG) > 0)
;
}
void handle_sigpipe(int sig)
{
printf("recv a sig=%d\n", sig);
}
int main(void)
{
signal(SIGPIPE, handle_sigpipe);
signal(SIGCHLD, handle_sigchld);
int listenfd;
if ((listenfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
ERR_EXIT("socket");
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(5188);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
int on = 1;
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
ERR_EXIT("setsockopt");
if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
ERR_EXIT("bind");
if (listen(listenfd, SOMAXCONN) < 0)
ERR_EXIT("listen");
struct sockaddr_in peeraddr;
socklen_t peerlen;
int conn;
int i;
int client[FD_SETSIZE];
int maxi = 0;
for (i=0; i<FD_SETSIZE; i++)
client[i] = -1;
int nready;
int maxfd = listenfd;
fd_set rset;
fd_set allset;
FD_ZERO(&rset);
FD_ZERO(&allset);
FD_SET(listenfd, &allset);
while (1)
{
rset = allset;
nready = select(maxfd+1, &rset, NULL, NULL, NULL);
if (nready == -1)
{
if (errno == EINTR)
continue;
ERR_EXIT("select");
}
if (nready == 0)
continue;
if (FD_ISSET(listenfd, &rset))
{
peerlen = sizeof(peeraddr);
conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen);
if (conn == -1)
ERR_EXIT("accept");
for (i=0; i<FD_SETSIZE; i++)
{
if (client[i] < 0)
{
client[i] = conn;
if (i > maxi)
maxi = i;
break;
}
}
if (i == FD_SETSIZE)
{
fprintf(stderr, "too many clients\n");
exit(EXIT_FAILURE);
}
printf("ip=%s port=%d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
FD_SET(conn, &allset);
if (conn > maxfd)
maxfd = conn;
if (--nready <= 0)
continue;
}
for (i=0; i<=maxi; i++)
{
conn = client[i];
if (conn == -1)
continue;
if (FD_ISSET(conn, &rset))
{
char recvbuf[1024] = {0};
int ret = readline(conn, recvbuf, 1024);
if (ret == -1)
ERR_EXIT("readline");
if (ret == 0)
{
printf("client close\n");
FD_CLR(conn, &allset);
client[i] = -1;
close(conn);
}
fputs(recvbuf, stdout);
writen(conn, recvbuf, strlen(recvbuf));
if (--nready <= 0)
break;
}
}
}
return 0;
}
复制代码
编译运行:
其中异常是报在这一段代码:
为了看到服务端的连接数,也打印一下个数:
echosrv.c:
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
ssize_t readn(int fd, void *buf, size_t count)
{
size_t nleft = count;
ssize_t nread;
char *bufp = (char*)buf;
while (nleft > 0)
{
if ((nread = read(fd, bufp, nleft)) < 0)
{
if (errno == EINTR)
continue;
return -1;
}
else if (nread == 0)
return count - nleft;
bufp += nread;
nleft -= nread;
}
return count;
}
ssize_t writen(int fd, const void *buf, size_t count)
{
size_t nleft = count;
ssize_t nwritten;
char *bufp = (char*)buf;
while (nleft > 0)
{
if ((nwritten = write(fd, bufp, nleft)) < 0)
{
if (errno == EINTR)
continue;
return -1;
}
else if (nwritten == 0)
continue;
bufp += nwritten;
nleft -= nwritten;
}
return count;
}
ssize_t recv_peek(int sockfd, void *buf, size_t len)
{
while (1)
{
int ret = recv(sockfd, buf, len, MSG_PEEK);
if (ret == -1 && errno == EINTR)
continue;
return ret;
}
}
ssize_t readline(int sockfd, void *buf, size_t maxline)
{
int ret;
int nread;
char *bufp = buf;
int nleft = maxline;
while (1)
{
ret = recv_peek(sockfd, bufp, nleft);
if (ret < 0)
return ret;
else if (ret == 0)
return ret;
nread = ret;
int i;
for (i=0; i<nread; i++)
{
if (bufp[i] == '\n')
{
ret = readn(sockfd, bufp, i+1);
if (ret != i+1)
exit(EXIT_FAILURE);
return ret;
}
}
if (nread > nleft)
exit(EXIT_FAILURE);
nleft -= nread;
ret = readn(sockfd, bufp, nread);
if (ret != nread)
exit(EXIT_FAILURE);
bufp += nread;
}
return -1;
}
void echo_srv(int conn)
{
char recvbuf[1024];
while (1)
{
memset(recvbuf, 0, sizeof(recvbuf));
int ret = readline(conn, recvbuf, 1024);
if (ret == -1)
ERR_EXIT("readline");
if (ret == 0)
{
printf("client close\n");
break;
}
fputs(recvbuf, stdout);
writen(conn, recvbuf, strlen(recvbuf));
}
}
void handle_sigchld(int sig)
{
/* wait(NULL);*/
while (waitpid(-1, NULL, WNOHANG) > 0)
;
}
void handle_sigpipe(int sig)
{
printf("recv a sig=%d\n", sig);
}
int main(void)
{
int count = 0;//客户端连接个数累加器
signal(SIGPIPE, handle_sigpipe);
signal(SIGCHLD, handle_sigchld);
int listenfd;
if ((listenfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
ERR_EXIT("socket");
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(5188);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
int on = 1;
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
ERR_EXIT("setsockopt");
if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
ERR_EXIT("bind");
if (listen(listenfd, SOMAXCONN) < 0)
ERR_EXIT("listen");
struct sockaddr_in peeraddr;
socklen_t peerlen;
int conn;
int i;
int client[FD_SETSIZE];
int maxi = 0;
for (i=0; i<FD_SETSIZE; i++)
client[i] = -1;
int nready;
int maxfd = listenfd;
fd_set rset;
fd_set allset;
FD_ZERO(&rset);
FD_ZERO(&allset);
FD_SET(listenfd, &allset);
while (1)
{
rset = allset;
nready = select(maxfd+1, &rset, NULL, NULL, NULL);
if (nready == -1)
{
if (errno == EINTR)
continue;
ERR_EXIT("select");
}
if (nready == 0)
continue;
if (FD_ISSET(listenfd, &rset))
{
peerlen = sizeof(peeraddr);
conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen);
if (conn == -1)
ERR_EXIT("accept");
for (i=0; i<FD_SETSIZE; i++)
{
if (client[i] < 0)
{
client[i] = conn;
if (i > maxi)
maxi = i;
break;
}
}
if (i == FD_SETSIZE)
{
fprintf(stderr, "too many clients\n");
exit(EXIT_FAILURE);
}
printf("ip=%s port=%d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
printf("count = %d\n", ++count);//当客户端连接成功,打印一下个数
FD_SET(conn, &allset);
if (conn > maxfd)
maxfd = conn;
if (--nready <= 0)
continue;
}
for (i=0; i<=maxi; i++)
{
conn = client[i];
if (conn == -1)
continue;
if (FD_ISSET(conn, &rset))
{
char recvbuf[1024] = {0};
int ret = readline(conn, recvbuf, 1024);
if (ret == -1)
ERR_EXIT("readline");
if (ret == 0)
{
printf("client close\n");
FD_CLR(conn, &allset);
client[i] = -1;
close(conn);
}
fputs(recvbuf, stdout);
writen(conn, recvbuf, strlen(recvbuf));
if (--nready <= 0)
break;
}
}
}
return 0;
}
复制代码
编译运行:
下面来解释一下为啥服务端会收到了很多client close:
由于客户端有关闭,所以服务端打印的count=1021个就不准了,下面来解决一下客户端关闭的问题,其很简单的,就是在发送1022个连接时,先休眠一段时间,然后再让进程退出既可,具体如下:
编译运行:
②、select中的fd_set集合容量的限制(FD_SETSIZE,该宏默认是1024) ,这需要重新编译内核。
这是第二个受限的地方,从代码中来解释:
下面要学习一个函数,它没有FD_SETSIZE的限制,如下:
通过man帮助查看下,struct pollfd结构体的结构:
那结构体中的events都可以取哪些值呢,如下:
对比select函数如下:
下面用poll函数来改装我们的代码,主要是改装服务端,来突然FD_SETSIZE大小的限制,下面会一点点对比原来的做法进行改装:
修改如下:
替换如下:
下面当事件产生了,则需要判断一下是哪些文件描述符产生了事件,那poll方式是如何判断的呢?具体如下:
替换如下:
然后接受客户端的连接,并加保存起来,修改如下:
替换如下:
接下来,处理已连接套接口事件,具体修改如下:
替换如下:
接下来编译运行一下:
需要包含头文件,如下:
可见改用poll函数之后的程序运行一切正常,下面再来测试并发数:
下面就来调整描述符最大个数为2048,注意:ulimit调整的是当前进程,所以要想服务端与客户端都突破1024这个限制,都需要切至root用户进行更改,切记切记~
这时再运行看下这时的连接数是否能突破1024呢?
所以,这就是poll针对select的改进,下面贴出服务端,客户端的代码:
echosrv.c:
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
#include <poll.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
ssize_t readn(int fd, void *buf, size_t count)
{
size_t nleft = count;
ssize_t nread;
char *bufp = (char*)buf;
while (nleft > 0)
{
if ((nread = read(fd, bufp, nleft)) < 0)
{
if (errno == EINTR)
continue;
return -1;
}
else if (nread == 0)
return count - nleft;
bufp += nread;
nleft -= nread;
}
return count;
}
ssize_t writen(int fd, const void *buf, size_t count)
{
size_t nleft = count;
ssize_t nwritten;
char *bufp = (char*)buf;
while (nleft > 0)
{
if ((nwritten = write(fd, bufp, nleft)) < 0)
{
if (errno == EINTR)
continue;
return -1;
}
else if (nwritten == 0)
continue;
bufp += nwritten;
nleft -= nwritten;
}
return count;
}
ssize_t recv_peek(int sockfd, void *buf, size_t len)
{
while (1)
{
int ret = recv(sockfd, buf, len, MSG_PEEK);
if (ret == -1 && errno == EINTR)
continue;
return ret;
}
}
ssize_t readline(int sockfd, void *buf, size_t maxline)
{
int ret;
int nread;
char *bufp = buf;
int nleft = maxline;
while (1)
{
ret = recv_peek(sockfd, bufp, nleft);
if (ret < 0)
return ret;
else if (ret == 0)
return ret;
nread = ret;
int i;
for (i=0; i<nread; i++)
{
if (bufp[i] == '\n')
{
ret = readn(sockfd, bufp, i+1);
if (ret != i+1)
exit(EXIT_FAILURE);
return ret;
}
}
if (nread > nleft)
exit(EXIT_FAILURE);
nleft -= nread;
ret = readn(sockfd, bufp, nread);
if (ret != nread)
exit(EXIT_FAILURE);
bufp += nread;
}
return -1;
}
void echo_srv(int conn)
{
char recvbuf[1024];
while (1)
{
memset(recvbuf, 0, sizeof(recvbuf));
int ret = readline(conn, recvbuf, 1024);
if (ret == -1)
ERR_EXIT("readline");
if (ret == 0)
{
printf("client close\n");
break;
}
fputs(recvbuf, stdout);
writen(conn, recvbuf, strlen(recvbuf));
}
}
void handle_sigchld(int sig)
{
/* wait(NULL);*/
while (waitpid(-1, NULL, WNOHANG) > 0)
;
}
void handle_sigpipe(int sig)
{
printf("recv a sig=%d\n", sig);
}
int main(void)
{
int count = 0;
signal(SIGPIPE, handle_sigpipe);
signal(SIGCHLD, handle_sigchld);
int listenfd;
if ((listenfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
ERR_EXIT("socket");
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(5188);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
int on = 1;
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
ERR_EXIT("setsockopt");
if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
ERR_EXIT("bind");
if (listen(listenfd, SOMAXCONN) < 0)
ERR_EXIT("listen");
struct sockaddr_in peeraddr;
socklen_t peerlen;
int conn;
int i;
struct pollfd client[2048];
int maxi = 0;
for (i=0; i<2048; i++)
client[i].fd = -1;
int nready;
client[0].fd = listenfd;//第一个事件是监听套接字
client[0].events = POLLIN;//代表可读事件
while (1)
{
nready = poll(client, maxi+1, -1);
if (nready == -1)
{
if (errno == EINTR)
continue;
ERR_EXIT("select");
}
if (nready == 0)
continue;
if (client[0].revents & POLLIN)
{
peerlen = sizeof(peeraddr);
conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen);
if (conn == -1)
ERR_EXIT("accept");
for (i=0; i<2048; i++)
{
if (client[i].fd < 0)
{
client[i].fd = conn;
if (i > maxi)
maxi = i;
break;
}
}
if (i == 2048)
{
fprintf(stderr, "too many clients\n");
exit(EXIT_FAILURE);
}
printf("ip=%s port=%d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
printf("count = %d\n", ++count);
client[i].events = POLLIN;
if (--nready <= 0)
continue;
}
for (i=1; i<=maxi; i++)
{
conn = client[i].fd;
if (conn == -1)
continue;
if (client[i].events & POLLIN)
{
char recvbuf[1024] = {0};
int ret = readline(conn, recvbuf, 1024);
if (ret == -1)
ERR_EXIT("readline");
if (ret == 0)
{
printf("client close\n");
client[i].fd = -1;
close(conn);
}
fputs(recvbuf, stdout);
writen(conn, recvbuf, strlen(recvbuf));
if (--nready <= 0)
break;
}
}
}
return 0;
}
复制代码
contest.c:
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
int main(void)
{
int count = 0;//用来记数,当客户端与服务端连接成功则加1
while(1)
{
int sock;
if ((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
{
sleep(4);
ERR_EXIT("socket");
}
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(5188);
servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
if (connect(sock, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
ERR_EXIT("connect");
struct sockaddr_in localaddr;
socklen_t addrlen = sizeof(localaddr);
if (getsockname(sock, (struct sockaddr*)&localaddr, &addrlen) < 0)
ERR_EXIT("getsockname");
printf("ip=%s port=%d\n", inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port));
printf("count = %d\n", ++count);//将个数打印出来
}
return 0;
}
复制代码