| /* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */ |
| /* See COPYRIGHT */ |
| |
| #include <u.h> |
| #include <libc.h> |
| #include <mux.h> |
| |
| /* |
| * If you fork off two procs running muxrecvproc and muxsendproc, |
| * then muxrecv/muxsend (and thus muxrpc) will never block except on |
| * rendevouses, which is nice when it's running in one thread of many. |
| */ |
| void |
| _muxrecvproc(void *v) |
| { |
| void *p; |
| Mux *mux; |
| Muxqueue *q; |
| |
| mux = v; |
| q = _muxqalloc(); |
| |
| qlock(&mux->lk); |
| mux->readq = q; |
| qlock(&mux->inlk); |
| rwakeup(&mux->rpcfork); |
| qunlock(&mux->lk); |
| |
| while((p = mux->recv(mux)) != nil) |
| if(_muxqsend(q, p) < 0){ |
| free(p); |
| break; |
| } |
| qunlock(&mux->inlk); |
| qlock(&mux->lk); |
| _muxqhangup(q); |
| p = nil; |
| while(_muxnbqrecv(q, &p) && p != nil){ |
| free(p); |
| p = nil; |
| } |
| free(q); |
| mux->readq = nil; |
| rwakeup(&mux->rpcfork); |
| qunlock(&mux->lk); |
| } |
| |
| void |
| _muxsendproc(void *v) |
| { |
| Muxqueue *q; |
| void *p; |
| Mux *mux; |
| |
| mux = v; |
| q = _muxqalloc(); |
| |
| qlock(&mux->lk); |
| mux->writeq = q; |
| qlock(&mux->outlk); |
| rwakeup(&mux->rpcfork); |
| qunlock(&mux->lk); |
| |
| while((p = _muxqrecv(q)) != nil) |
| if(mux->send(mux, p) < 0) |
| break; |
| qunlock(&mux->outlk); |
| qlock(&mux->lk); |
| _muxqhangup(q); |
| while(_muxnbqrecv(q, &p)) |
| free(p); |
| free(q); |
| mux->writeq = nil; |
| rwakeup(&mux->rpcfork); |
| qunlock(&mux->lk); |
| return; |
| } |
| |
| int |
| _muxrecv(Mux *mux, int canblock, void **vp) |
| { |
| void *p; |
| int ret; |
| |
| qlock(&mux->lk); |
| if(mux->readq){ |
| qunlock(&mux->lk); |
| if(canblock){ |
| *vp = _muxqrecv(mux->readq); |
| return 1; |
| } |
| return _muxnbqrecv(mux->readq, vp); |
| } |
| |
| qlock(&mux->inlk); |
| qunlock(&mux->lk); |
| if(canblock){ |
| p = mux->recv(mux); |
| ret = 1; |
| }else{ |
| if(mux->nbrecv) |
| ret = mux->nbrecv(mux, &p); |
| else{ |
| /* send eof, not "no packet ready" */ |
| p = nil; |
| ret = 1; |
| } |
| } |
| qunlock(&mux->inlk); |
| *vp = p; |
| return ret; |
| } |
| |
| int |
| _muxsend(Mux *mux, void *p) |
| { |
| qlock(&mux->lk); |
| /* |
| if(mux->state != VtStateConnected){ |
| packetfree(p); |
| werrstr("not connected"); |
| qunlock(&mux->lk); |
| return -1; |
| } |
| */ |
| if(mux->writeq){ |
| qunlock(&mux->lk); |
| if(_muxqsend(mux->writeq, p) < 0){ |
| free(p); |
| return -1; |
| } |
| return 0; |
| } |
| |
| qlock(&mux->outlk); |
| qunlock(&mux->lk); |
| if(mux->send(mux, p) < 0){ |
| qunlock(&mux->outlk); |
| /* vthangup(mux); */ |
| return -1; |
| } |
| qunlock(&mux->outlk); |
| return 0; |
| } |
| |