|  | /* | 
|  | * Multiplexed Venti client.  It would be nice if we | 
|  | * could turn this into a generic library routine rather | 
|  | * than keep it Venti specific.  A user-level 9P client | 
|  | * could use something like this too. | 
|  | * | 
|  | * (Actually it does - this should be replaced with libmux, | 
|  | * which should be renamed librpcmux.) | 
|  | * | 
|  | * This is a little more complicated than it might be | 
|  | * because we want it to work well within and without libthread. | 
|  | * | 
|  | * The mux code is inspired by tra's, which is inspired by the Plan 9 kernel. | 
|  | */ | 
|  |  | 
|  | #include <u.h> | 
|  | #include <libc.h> | 
|  | #include <venti.h> | 
|  |  | 
|  | typedef struct Rwait Rwait; | 
|  | struct Rwait | 
|  | { | 
|  | Rendez r; | 
|  | Packet *p; | 
|  | int done; | 
|  | int sleeping; | 
|  | }; | 
|  |  | 
|  | static int gettag(VtConn*, Rwait*); | 
|  | static void puttag(VtConn*, Rwait*, int); | 
|  | static void muxrpc(VtConn*, Packet*); | 
|  |  | 
|  | Packet* | 
|  | _vtrpc(VtConn *z, Packet *p, VtFcall *tx) | 
|  | { | 
|  | int i; | 
|  | uchar tag, buf[2], *top; | 
|  | Rwait *r, *rr; | 
|  |  | 
|  | /* must malloc because stack could be private */ | 
|  | r = vtmallocz(sizeof(Rwait)); | 
|  |  | 
|  | qlock(&z->lk); | 
|  | r->r.l = &z->lk; | 
|  | tag = gettag(z, r); | 
|  | if(tx){ | 
|  | /* vtfcallrpc can't print packet because it doesn't have tag */ | 
|  | tx->tag = tag; | 
|  | if(chattyventi) | 
|  | fprint(2, "%s -> %F\n", argv0, tx); | 
|  | } | 
|  |  | 
|  | /* slam tag into packet */ | 
|  | top = packetpeek(p, buf, 0, 2); | 
|  | if(top == nil){ | 
|  | packetfree(p); | 
|  | return nil; | 
|  | } | 
|  | if(top == buf){ | 
|  | werrstr("first two bytes must be in same packet fragment"); | 
|  | packetfree(p); | 
|  | vtfree(r); | 
|  | return nil; | 
|  | } | 
|  | top[1] = tag; | 
|  | qunlock(&z->lk); | 
|  | if(vtsend(z, p) < 0){ | 
|  | vtfree(r); | 
|  | return nil; | 
|  | } | 
|  |  | 
|  | qlock(&z->lk); | 
|  | /* wait for the muxer to give us our packet */ | 
|  | r->sleeping = 1; | 
|  | z->nsleep++; | 
|  | while(z->muxer && !r->done) | 
|  | rsleep(&r->r); | 
|  | z->nsleep--; | 
|  | r->sleeping = 0; | 
|  |  | 
|  | /* if not done, there's no muxer: start muxing */ | 
|  | if(!r->done){ | 
|  | if(z->muxer) | 
|  | abort(); | 
|  | z->muxer = 1; | 
|  | while(!r->done){ | 
|  | qunlock(&z->lk); | 
|  | if((p = vtrecv(z)) == nil){ | 
|  | werrstr("unexpected eof on venti connection"); | 
|  | z->muxer = 0; | 
|  | vtfree(r); | 
|  | return nil; | 
|  | } | 
|  | qlock(&z->lk); | 
|  | muxrpc(z, p); | 
|  | } | 
|  | z->muxer = 0; | 
|  | /* if there is anyone else sleeping, wake first unfinished to mux */ | 
|  | if(z->nsleep) | 
|  | for(i=0; i<256; i++){ | 
|  | rr = z->wait[i]; | 
|  | if(rr && rr->sleeping && !rr->done){ | 
|  | rwakeup(&rr->r); | 
|  | break; | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | p = r->p; | 
|  | puttag(z, r, tag); | 
|  | vtfree(r); | 
|  | qunlock(&z->lk); | 
|  | return p; | 
|  | } | 
|  |  | 
|  | Packet* | 
|  | vtrpc(VtConn *z, Packet *p) | 
|  | { | 
|  | return _vtrpc(z, p, nil); | 
|  | } | 
|  |  | 
|  | static int | 
|  | gettag(VtConn *z, Rwait *r) | 
|  | { | 
|  | int i; | 
|  |  | 
|  | Again: | 
|  | while(z->ntag == 256) | 
|  | rsleep(&z->tagrend); | 
|  | for(i=0; i<256; i++) | 
|  | if(z->wait[i] == 0){ | 
|  | z->ntag++; | 
|  | z->wait[i] = r; | 
|  | return i; | 
|  | } | 
|  | fprint(2, "libventi: ntag botch\n"); | 
|  | goto Again; | 
|  | } | 
|  |  | 
|  | static void | 
|  | puttag(VtConn *z, Rwait *r, int tag) | 
|  | { | 
|  | assert(z->wait[tag] == r); | 
|  | z->wait[tag] = nil; | 
|  | z->ntag--; | 
|  | rwakeup(&z->tagrend); | 
|  | } | 
|  |  | 
|  | static void | 
|  | muxrpc(VtConn *z, Packet *p) | 
|  | { | 
|  | uchar tag, buf[2], *top; | 
|  | Rwait *r; | 
|  |  | 
|  | if((top = packetpeek(p, buf, 0, 2)) == nil){ | 
|  | fprint(2, "libventi: short packet in vtrpc\n"); | 
|  | packetfree(p); | 
|  | return; | 
|  | } | 
|  |  | 
|  | tag = top[1]; | 
|  | if((r = z->wait[tag]) == nil){ | 
|  | fprint(2, "libventi: unexpected packet tag %d in vtrpc\n", tag); | 
|  | abort(); | 
|  | packetfree(p); | 
|  | return; | 
|  | } | 
|  |  | 
|  | r->p = p; | 
|  | r->done = 1; | 
|  | rwakeup(&r->r); | 
|  | } | 
|  |  |