blob: 4dbe115f30d7f172b103c715619d3f0ce99c3d67 [file] [log] [blame]
#include <u.h>
#include <libc.h>
#include <venti.h>
#include "queue.h"
long ventisendbytes, ventisendpackets;
long ventirecvbytes, ventirecvpackets;
static int
_vtsend(VtConn *z, Packet *p)
{
IOchunk ioc;
int n, tot;
uchar buf[4];
if(z->state != VtStateConnected) {
werrstr("session not connected");
return -1;
}
/* add framing */
n = packetsize(p);
if(z->version[1] == '2') {
if(n >= (1<<16)) {
werrstr("packet too large");
packetfree(p);
return -1;
}
buf[0] = n>>8;
buf[1] = n;
packetprefix(p, buf, 2);
ventisendbytes += n+2;
} else {
buf[0] = n>>24;
buf[1] = n>>16;
buf[2] = n>>8;
buf[3] = n;
packetprefix(p, buf, 4);
ventisendbytes += n+4;
}
ventisendpackets++;
tot = 0;
for(;;){
n = packetfragments(p, &ioc, 1, 0);
if(n == 0)
break;
if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){
vtlog(VtServerLog, "<font size=-1>%T %s:</font> sending packet %p: %r<br>\n", z->addr, p);
packetfree(p);
return -1;
}
packetconsume(p, nil, ioc.len);
tot += ioc.len;
}
vtlog(VtServerLog, "<font size=-1>%T %s:</font> sent packet %p (%d bytes)<br>\n", z->addr, p, tot);
packetfree(p);
return 1;
}
static int
interrupted(void)
{
char e[ERRMAX];
rerrstr(e, sizeof e);
return strstr(e, "interrupted") != nil;
}
static Packet*
_vtrecv(VtConn *z)
{
uchar buf[10], *b;
int n, need;
Packet *p;
int size, len;
if(z->state != VtStateConnected) {
werrstr("session not connected");
return nil;
}
p = z->part;
/* get enough for head size */
size = packetsize(p);
need = z->version[1] - '0'; // 2 or 4
while(size < need) {
b = packettrailer(p, need);
assert(b != nil);
if(0) fprint(2, "%d read hdr\n", getpid());
n = read(z->infd, b, need);
if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
if(n==0 || (n<0 && !interrupted()))
goto Err;
size += n;
packettrim(p, 0, size);
}
if(packetconsume(p, buf, need) < 0)
goto Err;
if(z->version[1] == '2') {
len = (buf[0] << 8) | buf[1];
size -= 2;
} else {
len = (buf[0]<<24) | (buf[1]<<16) | (buf[2]<<8) | buf[3];
size -= 4;
}
while(size < len) {
n = len - size;
if(n > MaxFragSize)
n = MaxFragSize;
b = packettrailer(p, n);
if(0) fprint(2, "%d read body %d\n", getpid(), n);
n = read(z->infd, b, n);
if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
if(n > 0)
size += n;
packettrim(p, 0, size);
if(n==0 || (n<0 && !interrupted()))
goto Err;
}
ventirecvbytes += len;
ventirecvpackets++;
p = packetsplit(p, len);
vtlog(VtServerLog, "<font size=-1>%T %s:</font> read packet %p len %d<br>\n", z->addr, p, len);
return p;
Err:
vtlog(VtServerLog, "<font size=-1>%T %s:</font> error reading packet: %r<br>\n", z->addr);
return nil;
}
/*
* If you fork off two procs running vtrecvproc and vtsendproc,
* then vtrecv/vtsend (and thus vtrpc) will never block except on
* rendevouses, which is nice when it's running in one thread of many.
*/
void
vtrecvproc(void *v)
{
Packet *p;
VtConn *z;
Queue *q;
z = v;
q = _vtqalloc();
qlock(&z->lk);
z->readq = q;
qlock(&z->inlk);
rwakeup(&z->rpcfork);
qunlock(&z->lk);
while((p = _vtrecv(z)) != nil)
if(_vtqsend(q, p) < 0){
packetfree(p);
break;
}
qunlock(&z->inlk);
qlock(&z->lk);
_vtqhangup(q);
while((p = _vtnbqrecv(q)) != nil)
packetfree(p);
_vtqdecref(q);
z->readq = nil;
rwakeup(&z->rpcfork);
qunlock(&z->lk);
vthangup(z);
}
void
vtsendproc(void *v)
{
Queue *q;
Packet *p;
VtConn *z;
z = v;
q = _vtqalloc();
qlock(&z->lk);
z->writeq = q;
qlock(&z->outlk);
rwakeup(&z->rpcfork);
qunlock(&z->lk);
while((p = _vtqrecv(q)) != nil)
if(_vtsend(z, p) < 0)
break;
qunlock(&z->outlk);
qlock(&z->lk);
_vtqhangup(q);
while((p = _vtnbqrecv(q)) != nil)
packetfree(p);
_vtqdecref(q);
z->writeq = nil;
rwakeup(&z->rpcfork);
qunlock(&z->lk);
return;
}
Packet*
vtrecv(VtConn *z)
{
Packet *p;
Queue *q;
qlock(&z->lk);
if(z->state != VtStateConnected){
werrstr("not connected");
qunlock(&z->lk);
return nil;
}
if(z->readq){
q = _vtqincref(z->readq);
qunlock(&z->lk);
p = _vtqrecv(q);
_vtqdecref(q);
return p;
}
qlock(&z->inlk);
qunlock(&z->lk);
p = _vtrecv(z);
qunlock(&z->inlk);
if(!p)
vthangup(z);
return p;
}
int
vtsend(VtConn *z, Packet *p)
{
Queue *q;
qlock(&z->lk);
if(z->state != VtStateConnected){
packetfree(p);
werrstr("not connected");
qunlock(&z->lk);
return -1;
}
if(z->writeq){
q = _vtqincref(z->writeq);
qunlock(&z->lk);
if(_vtqsend(q, p) < 0){
_vtqdecref(q);
packetfree(p);
return -1;
}
_vtqdecref(q);
return 0;
}
qlock(&z->outlk);
qunlock(&z->lk);
if(_vtsend(z, p) < 0){
qunlock(&z->outlk);
vthangup(z);
return -1;
}
qunlock(&z->outlk);
return 0;
}