blob: 9184644624c756f7bdff92611a59443b07912ec3 [file] [log] [blame]
rsc056fe1b2003-11-23 18:19:58 +00001#include <u.h>
2#include <libc.h>
3#include <venti.h>
4#include <thread.h>
5#include "queue.h"
6
7enum
8{
9 STACK = 8192,
10};
11
12typedef struct VtSconn VtSconn;
13struct VtSconn
14{
15 int ctl;
rscbe36ff62004-04-29 17:13:24 +000016 int ref;
17 QLock lk;
rsc056fe1b2003-11-23 18:19:58 +000018 char dir[NETPATHLEN];
19 VtSrv *srv;
20 VtConn *c;
21};
22
23struct VtSrv
24{
25 int afd;
26 int dead;
27 char adir[NETPATHLEN];
28 Queue *q; /* Queue(VtReq*) */
29};
30
31static void listenproc(void*);
32static void connproc(void*);
33
rscbe36ff62004-04-29 17:13:24 +000034static void
35scincref(VtSconn *sc)
36{
37 qlock(&sc->lk);
38 sc->ref++;
39 qunlock(&sc->lk);
40}
41
42static void
43scdecref(VtSconn *sc)
44{
45 qlock(&sc->lk);
46 if(--sc->ref > 0){
47 qunlock(&sc->lk);
48 return;
49 }
50 if(sc->c)
51 vtfreeconn(sc->c);
52 vtfree(sc);
53}
54
rsc056fe1b2003-11-23 18:19:58 +000055VtSrv*
56vtlisten(char *addr)
57{
58 VtSrv *s;
59
60 s = vtmallocz(sizeof(VtSrv));
61 s->afd = announce(addr, s->adir);
62 if(s->afd < 0){
63 free(s);
64 return nil;
65 }
66 s->q = _vtqalloc();
67 proccreate(listenproc, s, STACK);
68 return s;
69}
70
71static void
72listenproc(void *v)
73{
74 int ctl;
75 char dir[NETPATHLEN];
76 VtSrv *srv;
77 VtSconn *sc;
78
79 srv = v;
80 for(;;){
81 ctl = listen(srv->adir, dir);
82 if(ctl < 0){
83 srv->dead = 1;
84 break;
85 }
rsc056fe1b2003-11-23 18:19:58 +000086 sc = vtmallocz(sizeof(VtSconn));
rscbe36ff62004-04-29 17:13:24 +000087 sc->ref = 1;
rsc056fe1b2003-11-23 18:19:58 +000088 sc->ctl = ctl;
89 sc->srv = srv;
90 strcpy(sc->dir, dir);
91 proccreate(connproc, sc, STACK);
92 }
93
94 // hangup
95}
96
97static void
98connproc(void *v)
99{
100 VtSconn *sc;
101 VtConn *c;
102 Packet *p;
103 VtReq *r;
104 int fd;
rsca09e80f2004-05-23 00:59:17 +0000105static int first=1;
rsc056fe1b2003-11-23 18:19:58 +0000106
rsc8baa0cb2004-06-09 14:10:32 +0000107if(first && chattyventi){
rsca09e80f2004-05-23 00:59:17 +0000108 first=0;
109 fmtinstall('F', vtfcallfmt);
110}
rsc056fe1b2003-11-23 18:19:58 +0000111 r = nil;
rsc056fe1b2003-11-23 18:19:58 +0000112 sc = v;
rscbe36ff62004-04-29 17:13:24 +0000113 sc->c = nil;
rsc8baa0cb2004-06-09 14:10:32 +0000114 if(0) fprint(2, "new call %s on %d\n", sc->dir, sc->ctl);
rsc056fe1b2003-11-23 18:19:58 +0000115 fd = accept(sc->ctl, sc->dir);
116 close(sc->ctl);
117 if(fd < 0){
118 fprint(2, "accept %s: %r\n", sc->dir);
119 goto out;
120 }
121
122 c = vtconn(fd, fd);
123 sc->c = c;
124 if(vtversion(c) < 0){
125 fprint(2, "vtversion %s: %r\n", sc->dir);
126 goto out;
127 }
128 if(vtsrvhello(c) < 0){
129 fprint(2, "vtsrvhello %s: %r\n", sc->dir);
130 goto out;
131 }
132
rsc8baa0cb2004-06-09 14:10:32 +0000133 if(0) fprint(2, "new proc %s\n", sc->dir);
rsc056fe1b2003-11-23 18:19:58 +0000134 proccreate(vtsendproc, c, STACK);
135 qlock(&c->lk);
136 while(!c->writeq)
137 rsleep(&c->rpcfork);
138 qunlock(&c->lk);
139
140 while((p = vtrecv(c)) != nil){
141 r = vtmallocz(sizeof(VtReq));
142 if(vtfcallunpack(&r->tx, p) < 0){
143 packetfree(p);
144 fprint(2, "bad packet on %s: %r\n", sc->dir);
145 continue;
146 }
rsca09e80f2004-05-23 00:59:17 +0000147 if(chattyventi)
148 fprint(2, "%s <- %F\n", argv0, &r->tx);
rsc056fe1b2003-11-23 18:19:58 +0000149 packetfree(p);
150 if(r->tx.type == VtTgoodbye)
151 break;
152 r->rx.tag = r->tx.tag;
153 r->sc = sc;
rscbe36ff62004-04-29 17:13:24 +0000154 scincref(sc);
rsc056fe1b2003-11-23 18:19:58 +0000155 if(_vtqsend(sc->srv->q, r) < 0){
rscbe36ff62004-04-29 17:13:24 +0000156 scdecref(sc);
rsc056fe1b2003-11-23 18:19:58 +0000157 fprint(2, "hungup queue\n");
158 break;
159 }
160 r = nil;
161 }
162
rsc8baa0cb2004-06-09 14:10:32 +0000163 if(0) fprint(2, "eof on %s\n", sc->dir);
rsc056fe1b2003-11-23 18:19:58 +0000164
165out:
166 if(r){
167 vtfcallclear(&r->tx);
168 vtfree(r);
169 }
rsc8baa0cb2004-06-09 14:10:32 +0000170 if(0) fprint(2, "freed %s\n", sc->dir);
rscbe36ff62004-04-29 17:13:24 +0000171 scdecref(sc);
rsc056fe1b2003-11-23 18:19:58 +0000172 return;
173}
174
175VtReq*
176vtgetreq(VtSrv *srv)
177{
178 return _vtqrecv(srv->q);
179}
180
181void
182vtrespond(VtReq *r)
183{
184 Packet *p;
185 VtSconn *sc;
186
187 sc = r->sc;
188 if(r->rx.tag != r->tx.tag)
189 abort();
190 if(r->rx.type != r->tx.type+1 && r->rx.type != VtRerror)
191 abort();
rsca09e80f2004-05-23 00:59:17 +0000192 if(chattyventi)
193 fprint(2, "%s -> %F\n", argv0, &r->rx);
rsc056fe1b2003-11-23 18:19:58 +0000194 if((p = vtfcallpack(&r->rx)) == nil){
195 fprint(2, "fcallpack on %s: %r\n", sc->dir);
196 packetfree(p);
197 vtfcallclear(&r->rx);
198 return;
199 }
200 vtsend(sc->c, p);
rscbe36ff62004-04-29 17:13:24 +0000201 scdecref(sc);
rsc056fe1b2003-11-23 18:19:58 +0000202 vtfcallclear(&r->tx);
203 vtfcallclear(&r->rx);
204 vtfree(r);
205}
206