Please note that diffs are not public domain; they are subject to the copyright notices on the relevant files. =================================================================== RCS file: /ftp/cvs/cvsroot/src/lib/librumpclient/rumpclient.c,v rcsdiff: /ftp/cvs/cvsroot/src/lib/librumpclient/rumpclient.c,v: warning: Unknown phrases like `commitid ...;' are present. retrieving revision 1.14 retrieving revision 1.15 diff -u -p -r1.14 -r1.15 --- src/lib/librumpclient/rumpclient.c 2011/01/09 14:10:03 1.14 +++ src/lib/librumpclient/rumpclient.c 2011/01/10 19:49:43 1.15 @@ -1,4 +1,4 @@ -/* $NetBSD: rumpclient.c,v 1.14 2011/01/09 14:10:03 pooka Exp $ */ +/* $NetBSD: rumpclient.c,v 1.15 2011/01/10 19:49:43 pooka Exp $ */ /* * Copyright (c) 2010, 2011 Antti Kantee. All Rights Reserved. @@ -33,6 +33,7 @@ __RCSID("$NetBSD"); #include +#include #include #include @@ -60,6 +61,7 @@ __RCSID("$NetBSD"); int (*host_socket)(int, int, int); int (*host_close)(int); int (*host_connect)(int, const struct sockaddr *, socklen_t); +int (*host_fcntl)(int, int, ...); int (*host_poll)(struct pollfd *, nfds_t, int); int (*host_pollts)(struct pollfd *, nfds_t, const struct timespec *, const sigset_t *); @@ -74,28 +76,14 @@ static struct spclient clispc = { .spc_fd = -1, }; -/* - * This version of waitresp is optimized for single-threaded clients - * and is required by signal-safe clientside rump syscalls. - */ - -static void -releasercvlock(struct spclient *spc) -{ - - pthread_mutex_lock(&spc->spc_mtx); - if (spc->spc_istatus == SPCSTATUS_WANTED) - kickall(spc); - spc->spc_istatus = SPCSTATUS_FREE; -} - +static int kq; static sigset_t fullset; + static int waitresp(struct spclient *spc, struct respwait *rw, sigset_t *mask) { - struct pollfd pfd; - int rv = 0; + pthread_mutex_lock(&spc->spc_mtx); sendunlockl(spc); rw->rw_error = 0; @@ -103,32 +91,41 @@ waitresp(struct spclient *spc, struct re && spc->spc_state != SPCSTATE_DYING){ /* are we free to receive? */ if (spc->spc_istatus == SPCSTATUS_FREE) { + struct kevent kev[8]; + int gotresp, dosig, rv, i; + spc->spc_istatus = SPCSTATUS_BUSY; pthread_mutex_unlock(&spc->spc_mtx); - pfd.fd = spc->spc_fd; - pfd.events = POLLIN; - - switch (readframe(spc)) { - case 0: - releasercvlock(spc); - pthread_mutex_unlock(&spc->spc_mtx); - host_pollts(&pfd, 1, NULL, mask); - pthread_mutex_lock(&spc->spc_mtx); - continue; - case -1: - releasercvlock(spc); - rv = errno; - spc->spc_state = SPCSTATE_DYING; - continue; - default: - break; - } + dosig = 0; + for (gotresp = 0; !gotresp; ) { + switch (readframe(spc)) { + case 0: + rv = kevent(kq, NULL, 0, + kev, __arraycount(kev), NULL); + assert(rv > 0); + for (i = 0; i < rv; i++) { + if (kev[i].filter + == EVFILT_SIGNAL) + dosig++; + } + if (dosig) + goto cleanup; + + continue; + case -1: + spc->spc_state = SPCSTATE_DYING; + goto cleanup; + default: + break; + } - switch (spc->spc_hdr.rsp_class) { + switch (spc->spc_hdr.rsp_class) { case RUMPSP_RESP: case RUMPSP_ERROR: kickwaiter(spc); + gotresp = spc->spc_hdr.rsp_reqno == + rw->rw_reqno; break; case RUMPSP_REQ: handlereq(spc); @@ -136,9 +133,22 @@ waitresp(struct spclient *spc, struct re default: /* panic */ break; + } } - releasercvlock(spc); + cleanup: + pthread_mutex_lock(&spc->spc_mtx); + if (spc->spc_istatus == SPCSTATUS_WANTED) + kickall(spc); + spc->spc_istatus = SPCSTATUS_FREE; + + /* take one for the team */ + if (dosig) { + pthread_mutex_unlock(&spc->spc_mtx); + pthread_sigmask(SIG_SETMASK, mask, NULL); + pthread_sigmask(SIG_SETMASK, &fullset, NULL); + pthread_mutex_lock(&spc->spc_mtx); + } } else { spc->spc_istatus = SPCSTATUS_WANTED; pthread_cond_wait(&rw->rw_cv, &spc->spc_mtx); @@ -148,8 +158,6 @@ waitresp(struct spclient *spc, struct re pthread_mutex_unlock(&spc->spc_mtx); pthread_cond_destroy(&rw->rw_cv); - if (rv) - return rv; if (spc->spc_state == SPCSTATE_DYING) return ENOTCONN; return rw->rw_error; @@ -385,8 +393,9 @@ static struct sockaddr *serv_sa; static int doconnect(void) { + struct kevent kev[NSIG+1]; char banner[MAXBANNER]; - int s, error; + int s, error, flags, i; ssize_t n; s = host_socket(parsetab[ptab_idx].domain, SOCK_STREAM, 0); @@ -421,8 +430,34 @@ doconnect(void) } banner[n] = '\0'; + flags = host_fcntl(s, F_GETFL, 0); + if (host_fcntl(s, F_SETFL, flags | O_NONBLOCK) == -1) { + fprintf(stderr, "rump_sp: cannot set socket fd to nonblock\n"); + errno = EINVAL; + return -1; + } + /* parse the banner some day */ + /* setup kqueue, we want all signals and the fd */ + if ((kq = kqueue()) == -1) { + error = errno; + fprintf(stderr, "rump_sp: cannot setup kqueue"); + errno = error; + return -1; + } + + for (i = 0; i < NSIG; i++) { + EV_SET(&kev[i], i+1, EVFILT_SIGNAL, EV_ADD|EV_ENABLE, 0, 0, 0); + } + EV_SET(&kev[NSIG], s, EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, 0); + if (kevent(kq, kev, NSIG+1, NULL, 0, NULL) == -1) { + error = errno; + fprintf(stderr, "rump_sp: kevent() failed"); + errno = error; + return -1; + } + clispc.spc_fd = s; TAILQ_INIT(&clispc.spc_respwait); pthread_mutex_init(&clispc.spc_mtx, NULL); @@ -455,6 +490,7 @@ rumpclient_init() FINDSYM2(socket,__socket30); FINDSYM(close); FINDSYM(connect); + FINDSYM(fcntl); FINDSYM(poll); FINDSYM(pollts); FINDSYM(read); @@ -522,6 +558,8 @@ rumpclient_fork_init(struct rumpclient_f int error; host_close(clispc.spc_fd); + host_close(kq); + kq = -1; memset(&clispc, 0, sizeof(clispc)); clispc.spc_fd = -1;