blob: 3a2a933197b5a0168577476360ec3e0ecf38b915 [file] [log] [blame]
/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */
/* See COPYRIGHT */
#include <u.h>
#include <libc.h>
#include <mux.h>
/*
* If you fork off two procs running muxrecvproc and muxsendproc,
* then muxrecv/muxsend (and thus muxrpc) will never block except on
* rendezvous, which is nice when it's running in one thread of many.
*/
/*
 * Receiver proc body; v is the Mux to service.
 *
 * Installs a freshly allocated queue as mux->readq, then calls
 * mux->recv in a loop and forwards every message to that queue.
 * The loop ends when recv returns nil or when the queue rejects a
 * message.  On exit the queue is hung up, any messages still in it
 * are freed, the queue itself is freed and mux->readq is cleared.
 *
 * mux->inlk is held for the whole receive loop.  mux->rpcfork is
 * signalled twice, both times under mux->lk: once after readq is
 * set and once after it is cleared.
 */
void
_muxrecvproc(void *v)
{
	Mux *m;
	Muxqueue *inq;
	void *msg;

	m = v;
	/* NOTE(review): the result is used unchecked -- confirm _muxqalloc cannot return nil. */
	inq = _muxqalloc();

	/* Publish the queue, take inlk, then signal rpcfork, all under mux->lk. */
	qlock(&m->lk);
	m->readq = inq;
	qlock(&m->inlk);
	rwakeup(&m->rpcfork);
	qunlock(&m->lk);

	for(;;){
		msg = m->recv(m);
		if(msg == nil)
			break;
		if(_muxqsend(inq, msg) < 0){
			/* The queue did not take the message, so free it here. */
			free(msg);
			break;
		}
	}
	qunlock(&m->inlk);

	/* Tear the queue down under mux->lk. */
	qlock(&m->lk);
	_muxqhangup(inq);
	for(;;){
		/* Stop on a zero return or on a nil message, whichever comes first. */
		msg = nil;
		if(!_muxnbqrecv(inq, &msg) || msg == nil)
			break;
		free(msg);
	}
	free(inq);
	m->readq = nil;
	rwakeup(&m->rpcfork);
	qunlock(&m->lk);
}
/*
 * Sender proc body; v is the Mux to service.
 *
 * Installs a freshly allocated queue as mux->writeq, then takes
 * messages off that queue with _muxqrecv and hands each one to
 * mux->send.  The loop ends when _muxqrecv returns nil or when
 * mux->send reports an error (< 0).  On exit the queue is hung up,
 * any messages still in it are freed, the queue itself is freed and
 * mux->writeq is cleared.
 *
 * mux->outlk is held for the whole send loop.  mux->rpcfork is
 * signalled twice, both times under mux->lk: once after writeq is
 * set and once after it is cleared.
 */
void
_muxsendproc(void *v)
{
	Muxqueue *q;
	void *p;
	Mux *mux;

	mux = v;
	/* NOTE(review): the result is used unchecked -- confirm _muxqalloc cannot return nil. */
	q = _muxqalloc();

	qlock(&mux->lk);
	mux->writeq = q;
	qlock(&mux->outlk);
	rwakeup(&mux->rpcfork);
	qunlock(&mux->lk);

	while((p = _muxqrecv(q)) != nil)
		if(mux->send(mux, p) < 0)
			break;
	qunlock(&mux->outlk);

	qlock(&mux->lk);
	_muxqhangup(q);
	/*
	 * Drain what is left.  Stop on a nil message as well as on a
	 * zero return, exactly as the drain loop in _muxrecvproc does:
	 * if _muxnbqrecv reports success with a nil message once the
	 * hung-up queue is empty, looping on the return value alone
	 * would spin here forever while holding mux->lk.
	 */
	p = nil;
	while(_muxnbqrecv(q, &p) && p != nil){
		free(p);
		p = nil;
	}
	free(q);
	mux->writeq = nil;
	rwakeup(&mux->rpcfork);
	qunlock(&mux->lk);
}
/*
 * Fetch one incoming message for mux and store it through vp.
 *
 * If mux->readq is set, the message is taken from that queue:
 * with canblock via _muxqrecv (return value 1), otherwise via
 * _muxnbqrecv, whose result is returned unchanged.
 *
 * If no queue is installed, the transport is read directly while
 * holding mux->inlk: with canblock via mux->recv (return value 1),
 * otherwise via mux->nbrecv, whose result is returned.  If the mux
 * has no nbrecv, a nil message is stored with return value 1, which
 * callers are meant to read as EOF rather than "no packet ready".
 */
int
_muxrecv(Mux *mux, int canblock, void **vp)
{
	void *msg;
	int got;

	qlock(&mux->lk);
	if(mux->readq != nil){
		qunlock(&mux->lk);
		/*
		 * NOTE(review): mux->readq is read again after mux->lk has
		 * been dropped, while _muxrecvproc frees and clears it under
		 * that lock -- confirm callers cannot race with its exit.
		 */
		if(!canblock)
			return _muxnbqrecv(mux->readq, vp);
		*vp = _muxqrecv(mux->readq);
		return 1;
	}

	/* No queue: take inlk before releasing lk, then read directly. */
	qlock(&mux->inlk);
	qunlock(&mux->lk);
	if(canblock){
		msg = mux->recv(mux);
		got = 1;
	}else if(mux->nbrecv != nil)
		got = mux->nbrecv(mux, &msg);
	else{
		/* report eof, not "no packet ready" */
		msg = nil;
		got = 1;
	}
	qunlock(&mux->inlk);

	*vp = msg;
	return got;
}
/*
 * Send message p on mux.  Returns 0 on success, -1 on failure.
 *
 * If mux->writeq is set, p is handed to that queue with _muxqsend;
 * if the queue rejects it, p is freed here.
 *
 * Otherwise p is passed straight to mux->send while holding
 * mux->outlk.
 *
 * NOTE(review): on failure the queued path frees p but the direct
 * path does not -- confirm which side owns p after an error.
 */
int
_muxsend(Mux *mux, void *p)
{
	int rc;

	qlock(&mux->lk);
	if(mux->writeq != nil){
		qunlock(&mux->lk);
		/*
		 * NOTE(review): mux->writeq is read again after mux->lk has
		 * been dropped, while _muxsendproc frees and clears it under
		 * that lock -- confirm callers cannot race with its exit.
		 */
		if(_muxqsend(mux->writeq, p) >= 0)
			return 0;
		free(p);
		return -1;
	}

	/* No queue: take outlk before releasing lk, then send directly. */
	qlock(&mux->outlk);
	qunlock(&mux->lk);
	rc = mux->send(mux, p);
	qunlock(&mux->outlk);
	if(rc < 0)
		return -1;
	return 0;
}