blob: 1d8477d5624b2c521ff3d432b091916fd284c493 [file] [log] [blame]
rsc056fe1b2003-11-23 18:19:58 +00001#include <u.h>
2#include <libc.h>
3#include <venti.h>
4#include "queue.h"
5
/*
 * Traffic counters for this file's send and receive paths.
 * _vtsend adds the framed size (payload plus the 2-byte length
 * prefix) to ventisendbytes; _vtrecv adds only the payload
 * length to ventirecvbytes.  Each packet counter is bumped by
 * one per packet.
 */
long ventisendbytes;
long ventisendpackets;
long ventirecvbytes;
long ventirecvpackets;
8
rsc056fe1b2003-11-23 18:19:58 +00009static int
10_vtsend(VtConn *z, Packet *p)
11{
12 IOchunk ioc;
rsc5ddc97f2005-02-14 19:33:42 +000013 int n, tot;
rsc056fe1b2003-11-23 18:19:58 +000014 uchar buf[2];
rsc8baa0cb2004-06-09 14:10:32 +000015
rsc056fe1b2003-11-23 18:19:58 +000016 if(z->state != VtStateConnected) {
17 werrstr("session not connected");
18 return -1;
19 }
20
21 /* add framing */
22 n = packetsize(p);
23 if(n >= (1<<16)) {
24 werrstr("packet too large");
25 packetfree(p);
26 return -1;
27 }
28 buf[0] = n>>8;
29 buf[1] = n;
30 packetprefix(p, buf, 2);
rsc8baa0cb2004-06-09 14:10:32 +000031 ventisendbytes += n+2;
32 ventisendpackets++;
rsc056fe1b2003-11-23 18:19:58 +000033
rsc5ddc97f2005-02-14 19:33:42 +000034 tot = 0;
rsc056fe1b2003-11-23 18:19:58 +000035 for(;;){
36 n = packetfragments(p, &ioc, 1, 0);
37 if(n == 0)
38 break;
39 if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){
rsc5ddc97f2005-02-14 19:33:42 +000040 vtlog(VtServerLog, "<font size=-1>%T %s:</font> sending packet %p: %r<br>\n", z->addr, p);
rsc056fe1b2003-11-23 18:19:58 +000041 packetfree(p);
42 return 0;
43 }
44 packetconsume(p, nil, ioc.len);
rsc5ddc97f2005-02-14 19:33:42 +000045 tot += ioc.len;
rsc056fe1b2003-11-23 18:19:58 +000046 }
rsc5ddc97f2005-02-14 19:33:42 +000047 vtlog(VtServerLog, "<font size=-1>%T %s:</font> sent packet %p (%d bytes)<br>\n", z->addr, p, tot);
rsc056fe1b2003-11-23 18:19:58 +000048 packetfree(p);
49 return 1;
50}
51
rsc0c148042004-06-16 16:43:22 +000052static int
53interrupted(void)
54{
55 char e[ERRMAX];
56
57 rerrstr(e, sizeof e);
58 return strstr(e, "interrupted") != nil;
59}
60
61
rsc056fe1b2003-11-23 18:19:58 +000062static Packet*
63_vtrecv(VtConn *z)
64{
65 uchar buf[10], *b;
66 int n;
67 Packet *p;
68 int size, len;
69
70 if(z->state != VtStateConnected) {
71 werrstr("session not connected");
72 return nil;
73 }
74
75 p = z->part;
76 /* get enough for head size */
77 size = packetsize(p);
78 while(size < 2) {
rsc361e2792005-01-18 20:15:18 +000079 b = packettrailer(p, 2);
rsc056fe1b2003-11-23 18:19:58 +000080 assert(b != nil);
rsca09e80f2004-05-23 00:59:17 +000081 if(0) fprint(2, "%d read hdr\n", getpid());
rsc361e2792005-01-18 20:15:18 +000082 n = read(z->infd, b, 2);
rsca09e80f2004-05-23 00:59:17 +000083 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
rsc0c148042004-06-16 16:43:22 +000084 if(n==0 || (n<0 && !interrupted()))
rsc056fe1b2003-11-23 18:19:58 +000085 goto Err;
86 size += n;
87 packettrim(p, 0, size);
88 }
89
90 if(packetconsume(p, buf, 2) < 0)
91 goto Err;
92 len = (buf[0] << 8) | buf[1];
93 size -= 2;
94
95 while(size < len) {
rsc361e2792005-01-18 20:15:18 +000096 n = len - size;
97 if(n > MaxFragSize)
rsc056fe1b2003-11-23 18:19:58 +000098 n = MaxFragSize;
99 b = packettrailer(p, n);
rsca09e80f2004-05-23 00:59:17 +0000100 if(0) fprint(2, "%d read body %d\n", getpid(), n);
101 n = read(z->infd, b, n);
102 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
103 if(n > 0)
104 size += n;
105 packettrim(p, 0, size);
rsc0c148042004-06-16 16:43:22 +0000106 if(n==0 || (n<0 && !interrupted()))
rsc056fe1b2003-11-23 18:19:58 +0000107 goto Err;
rsc056fe1b2003-11-23 18:19:58 +0000108 }
rsc8baa0cb2004-06-09 14:10:32 +0000109 ventirecvbytes += len;
110 ventirecvpackets++;
rsc056fe1b2003-11-23 18:19:58 +0000111 p = packetsplit(p, len);
rsc5ddc97f2005-02-14 19:33:42 +0000112 vtlog(VtServerLog, "<font size=-1>%T %s:</font> read packet %p len %d<br>\n", z->addr, p, len);
rsc056fe1b2003-11-23 18:19:58 +0000113 return p;
114Err:
rsc5ddc97f2005-02-14 19:33:42 +0000115 vtlog(VtServerLog, "<font size=-1>%T %s:</font> error reading packet: %r<br>\n", z->addr);
rsc056fe1b2003-11-23 18:19:58 +0000116 return nil;
117}
118
/*
 * If you fork off two procs running vtrecvproc and vtsendproc,
 * then vtrecv/vtsend (and thus vtrpc) will never block except on
 * rendezvous, which is nice when it's running in one thread of many.
 */
124void
125vtrecvproc(void *v)
126{
127 Packet *p;
128 VtConn *z;
129 Queue *q;
130
131 z = v;
132 q = _vtqalloc();
133
134 qlock(&z->lk);
135 z->readq = q;
136 qlock(&z->inlk);
137 rwakeup(&z->rpcfork);
138 qunlock(&z->lk);
139
140 while((p = _vtrecv(z)) != nil)
141 if(_vtqsend(q, p) < 0){
142 packetfree(p);
143 break;
144 }
145 qunlock(&z->inlk);
146 qlock(&z->lk);
147 _vtqhangup(q);
148 while((p = _vtnbqrecv(q)) != nil)
149 packetfree(p);
150 vtfree(q);
151 z->readq = nil;
152 rwakeup(&z->rpcfork);
153 qunlock(&z->lk);
154 vthangup(z);
155}
156
157void
158vtsendproc(void *v)
159{
160 Queue *q;
161 Packet *p;
162 VtConn *z;
163
164 z = v;
165 q = _vtqalloc();
166
167 qlock(&z->lk);
168 z->writeq = q;
169 qlock(&z->outlk);
170 rwakeup(&z->rpcfork);
171 qunlock(&z->lk);
172
173 while((p = _vtqrecv(q)) != nil)
174 if(_vtsend(z, p) < 0)
175 break;
176 qunlock(&z->outlk);
177 qlock(&z->lk);
178 _vtqhangup(q);
179 while((p = _vtnbqrecv(q)) != nil)
180 packetfree(p);
181 vtfree(q);
182 z->writeq = nil;
183 rwakeup(&z->rpcfork);
184 qunlock(&z->lk);
185 return;
186}
187
188Packet*
189vtrecv(VtConn *z)
190{
191 Packet *p;
192
193 qlock(&z->lk);
194 if(z->state != VtStateConnected){
195 werrstr("not connected");
196 qunlock(&z->lk);
197 return nil;
198 }
199 if(z->readq){
200 qunlock(&z->lk);
201 return _vtqrecv(z->readq);
202 }
203
204 qlock(&z->inlk);
205 qunlock(&z->lk);
206 p = _vtrecv(z);
207 qunlock(&z->inlk);
208 if(!p)
209 vthangup(z);
210 return p;
211}
212
213int
214vtsend(VtConn *z, Packet *p)
215{
216 qlock(&z->lk);
217 if(z->state != VtStateConnected){
218 packetfree(p);
219 werrstr("not connected");
220 qunlock(&z->lk);
221 return -1;
222 }
223 if(z->writeq){
224 qunlock(&z->lk);
225 if(_vtqsend(z->writeq, p) < 0){
226 packetfree(p);
227 return -1;
228 }
229 return 0;
230 }
231
232 qlock(&z->outlk);
233 qunlock(&z->lk);
234 if(_vtsend(z, p) < 0){
235 qunlock(&z->outlk);
236 vthangup(z);
237 return -1;
238 }
239 qunlock(&z->outlk);
240 return 0;
241}
242