| /* Copyright (C) 2003-2006 Russ Cox, Massachusetts Institute of Technology */ |
| /* See COPYRIGHT */ |
| |
| /* |
| * Generic RPC packet multiplexor. Inspired by but not derived from |
| * Plan 9 kernel. Originally developed as part of Tra, later used in |
| * libnventi, and then finally split out into a generic library. |
| */ |
| |
| #include <u.h> |
| #include <libc.h> |
| #include <mux.h> |
| |
| static int gettag(Mux*, Muxrpc*); |
| static void puttag(Mux*, Muxrpc*); |
| static void enqueue(Mux*, Muxrpc*); |
| static void dequeue(Mux*, Muxrpc*); |
| |
| void |
| muxinit(Mux *mux) |
| { |
| memset(&mux->lk, 0, sizeof(Mux)-offsetof(Mux, lk)); |
| mux->tagrend.l = &mux->lk; |
| mux->rpcfork.l = &mux->lk; |
| mux->sleep.next = &mux->sleep; |
| mux->sleep.prev = &mux->sleep; |
| } |
| |
| static Muxrpc* |
| allocmuxrpc(Mux *mux) |
| { |
| Muxrpc *r; |
| |
| /* must malloc because stack could be private */ |
| r = mallocz(sizeof(Muxrpc), 1); |
| if(r == nil){ |
| werrstr("mallocz: %r"); |
| return nil; |
| } |
| r->mux = mux; |
| r->r.l = &mux->lk; |
| r->waiting = 1; |
| |
| return r; |
| } |
| |
| static int |
| tagmuxrpc(Muxrpc *r, void *tx) |
| { |
| int tag; |
| Mux *mux; |
| |
| mux = r->mux; |
| /* assign the tag, add selves to response queue */ |
| qlock(&mux->lk); |
| tag = gettag(mux, r); |
| /*print("gettag %p %d\n", r, tag); */ |
| enqueue(mux, r); |
| qunlock(&mux->lk); |
| |
| /* actually send the packet */ |
| if(tag < 0 || mux->settag(mux, tx, tag) < 0 || _muxsend(mux, tx) < 0){ |
| werrstr("settag/send tag %d: %r", tag); |
| fprint(2, "%r\n"); |
| qlock(&mux->lk); |
| dequeue(mux, r); |
| puttag(mux, r); |
| qunlock(&mux->lk); |
| return -1; |
| } |
| return 0; |
| } |
| |
| void |
| muxmsgandqlock(Mux *mux, void *p) |
| { |
| int tag; |
| Muxrpc *r2; |
| |
| tag = mux->gettag(mux, p) - mux->mintag; |
| /*print("mux tag %d\n", tag); */ |
| qlock(&mux->lk); |
| /* hand packet to correct sleeper */ |
| if(tag < 0 || tag >= mux->mwait){ |
| fprint(2, "%s: bad rpc tag %ux\n", argv0, tag); |
| /* must leak packet! don't know how to free it! */ |
| return; |
| } |
| r2 = mux->wait[tag]; |
| if(r2 == nil || r2->prev == nil){ |
| fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag); |
| /* must leak packet! don't know how to free it! */ |
| return; |
| } |
| r2->p = p; |
| dequeue(mux, r2); |
| rwakeup(&r2->r); |
| } |
| |
| void |
| electmuxer(Mux *mux) |
| { |
| Muxrpc *rpc; |
| |
| /* if there is anyone else sleeping, wake them to mux */ |
| for(rpc=mux->sleep.next; rpc != &mux->sleep; rpc = rpc->next){ |
| if(!rpc->async){ |
| mux->muxer = rpc; |
| rwakeup(&rpc->r); |
| return; |
| } |
| } |
| mux->muxer = nil; |
| } |
| |
| void* |
| muxrpc(Mux *mux, void *tx) |
| { |
| int tag; |
| Muxrpc *r; |
| void *p; |
| |
| if((r = allocmuxrpc(mux)) == nil) |
| return nil; |
| |
| if((tag = tagmuxrpc(r, tx)) < 0) |
| return nil; |
| |
| qlock(&mux->lk); |
| /* wait for our packet */ |
| while(mux->muxer && mux->muxer != r && !r->p) |
| rsleep(&r->r); |
| |
| /* if not done, there's no muxer: start muxing */ |
| if(!r->p){ |
| if(mux->muxer != nil && mux->muxer != r) |
| abort(); |
| mux->muxer = r; |
| while(!r->p){ |
| qunlock(&mux->lk); |
| _muxrecv(mux, 1, &p); |
| if(p == nil){ |
| /* eof -- just give up and pass the buck */ |
| qlock(&mux->lk); |
| dequeue(mux, r); |
| break; |
| } |
| muxmsgandqlock(mux, p); |
| } |
| electmuxer(mux); |
| } |
| p = r->p; |
| puttag(mux, r); |
| qunlock(&mux->lk); |
| if(p == nil) |
| werrstr("unexpected eof"); |
| return p; |
| } |
| |
| Muxrpc* |
| muxrpcstart(Mux *mux, void *tx) |
| { |
| int tag; |
| Muxrpc *r; |
| |
| if((r = allocmuxrpc(mux)) == nil) |
| return nil; |
| r->async = 1; |
| if((tag = tagmuxrpc(r, tx)) < 0) |
| return nil; |
| return r; |
| } |
| |
| int |
| muxrpccanfinish(Muxrpc *r, void **vp) |
| { |
| void *p; |
| Mux *mux; |
| int ret; |
| |
| mux = r->mux; |
| qlock(&mux->lk); |
| ret = 1; |
| if(!r->p && !mux->muxer){ |
| mux->muxer = r; |
| while(!r->p){ |
| qunlock(&mux->lk); |
| p = nil; |
| if(!_muxrecv(mux, 0, &p)) |
| ret = 0; |
| if(p == nil){ |
| qlock(&mux->lk); |
| break; |
| } |
| muxmsgandqlock(mux, p); |
| } |
| electmuxer(mux); |
| } |
| p = r->p; |
| if(p) |
| puttag(mux, r); |
| qunlock(&mux->lk); |
| *vp = p; |
| return ret; |
| } |
| |
| static void |
| enqueue(Mux *mux, Muxrpc *r) |
| { |
| r->next = mux->sleep.next; |
| r->prev = &mux->sleep; |
| r->next->prev = r; |
| r->prev->next = r; |
| } |
| |
| static void |
| dequeue(Mux *mux, Muxrpc *r) |
| { |
| r->next->prev = r->prev; |
| r->prev->next = r->next; |
| r->prev = nil; |
| r->next = nil; |
| } |
| |
| static int |
| gettag(Mux *mux, Muxrpc *r) |
| { |
| int i, mw; |
| Muxrpc **w; |
| |
| for(;;){ |
| /* wait for a free tag */ |
| while(mux->nwait == mux->mwait){ |
| if(mux->mwait < mux->maxtag-mux->mintag){ |
| mw = mux->mwait; |
| if(mw == 0) |
| mw = 1; |
| else |
| mw <<= 1; |
| w = realloc(mux->wait, mw*sizeof(w[0])); |
| if(w == nil) |
| return -1; |
| memset(w+mux->mwait, 0, (mw-mux->mwait)*sizeof(w[0])); |
| mux->wait = w; |
| mux->freetag = mux->mwait; |
| mux->mwait = mw; |
| break; |
| } |
| rsleep(&mux->tagrend); |
| } |
| |
| i=mux->freetag; |
| if(mux->wait[i] == 0) |
| goto Found; |
| for(; i<mux->mwait; i++) |
| if(mux->wait[i] == 0) |
| goto Found; |
| for(i=0; i<mux->freetag; i++) |
| if(mux->wait[i] == 0) |
| goto Found; |
| /* should not fall out of while without free tag */ |
| fprint(2, "libfs: nwait botch\n"); |
| abort(); |
| } |
| |
| Found: |
| mux->nwait++; |
| mux->wait[i] = r; |
| r->tag = i+mux->mintag; |
| return r->tag; |
| } |
| |
| static void |
| puttag(Mux *mux, Muxrpc *r) |
| { |
| int i; |
| |
| i = r->tag - mux->mintag; |
| assert(mux->wait[i] == r); |
| mux->wait[i] = nil; |
| mux->nwait--; |
| mux->freetag = i; |
| rwakeup(&mux->tagrend); |
| free(r); |
| } |