venti: reduce locking contention in buildindex
diff --git a/src/cmd/venti/srv/bloom.c b/src/cmd/venti/srv/bloom.c
index 1db36bd..ce146f6 100644
--- a/src/cmd/venti/srv/bloom.c
+++ b/src/cmd/venti/srv/bloom.c
@@ -229,6 +229,22 @@
 	runlock(&b->lk);
 }
 
+void
+markbloomfiltern(Bloom *b, u8int score[][20], int n)
+{
+	int i;
+
+	if(b == nil || b->data == nil)
+		return;
+	
+	rlock(&b->lk);
+	qlock(&b->mod);
+	for(i=0; i<n; i++)
+		_markbloomfilter(b, score[i]);
+	qunlock(&b->mod);
+	runlock(&b->lk);
+}
+
 static void
 bloomwriteproc(void *v)
 {
diff --git a/src/cmd/venti/srv/buildindex.c b/src/cmd/venti/srv/buildindex.c
index 4b7b9bd..ce5103a 100644
--- a/src/cmd/venti/srv/buildindex.c
+++ b/src/cmd/venti/srv/buildindex.c
@@ -11,6 +11,20 @@
 	MaxBufSize = 4*1024*1024,
 };
 
+typedef struct IEntryBuf IEntryBuf;
+struct IEntryBuf 
+{
+	IEntry ie[100];
+	int nie;
+};
+
+typedef struct ScoreBuf ScoreBuf;
+struct ScoreBuf
+{
+	uchar score[100][VtScoreSize];
+	int nscore;
+};
+
 int		dumb;
 int		errors;
 char		**isect;
@@ -117,10 +131,10 @@
 
 	/* start index procs */
 	fprint(2, "%T read index\n");
-	isectdonechan = chancreate(sizeof(void*), 0);
+	isectdonechan = chancreate(sizeof(void*), 1);
 	for(i=0; i<ix->nsects; i++){
 		if(shouldprocess(ix->sects[i])){
-			ix->sects[i]->writechan = chancreate(sizeof(IEntry), 0);
+			ix->sects[i]->writechan = chancreate(sizeof(IEntryBuf), 1);
 			vtproc(isectproc, ix->sects[i]);
 		}
 	}
@@ -208,12 +222,17 @@
 	ClumpInfo *ci, *cis;
 	IEntry ie;
 	Part *p;
+	IEntryBuf *buf, *b;
+	uchar *score;
+	ScoreBuf sb;
 	
 	p = v;
 	threadsetname("arenaproc %s", p->name);
+	buf = MKNZ(IEntryBuf, ix->nsects);
 
 	nskip = 0;
 	tot = 0;
+	sb.nscore = 0;
 	cis = MKN(ClumpInfo, ClumpChunks);
 	for(i=0; i<ix->narenas; i++){
 		a = ix->arenas[i];
@@ -252,10 +271,23 @@
 					tot++;
 					x = indexsect(ix, ie.score);
 					assert(0 <= x && x < ix->nsects);
-					if(ix->sects[x]->writechan)
-						send(ix->sects[x]->writechan, &ie);
-					if(ix->bloom)
-						markbloomfilter(ix->bloom, ie.score);
+					if(ix->sects[x]->writechan) {
+						b = &buf[x];
+						b->ie[b->nie] = ie;
+						b->nie++;
+						if(b->nie == nelem(b->ie)) {
+							send(ix->sects[x]->writechan, b);
+							b->nie = 0;
+						}
+					}
+					if(ix->bloom) {
+						score = sb.score[sb.nscore++];
+						scorecp(score, ie.score);
+						if(sb.nscore == nelem(sb.score)) {
+							markbloomfiltern(ix->bloom, sb.score, sb.nscore);
+							sb.nscore = 0;
+						}
+					}
 				}
 			}
 		}
@@ -264,6 +296,14 @@
 	}
 	add(&arenaentries, tot);
 	add(&skipentries, nskip);
+	
+	for(i=0; i<ix->nsects; i++)
+		if(ix->sects[i]->writechan && buf[i].nie > 0)
+			send(ix->sects[i]->writechan, &buf[i]);
+	free(buf);
+	free(cis);
+	if(ix->bloom && sb.nscore > 0)
+		markbloomfiltern(ix->bloom, sb.score, sb.nscore);
 	sendp(arenadonechan, p);
 }
 
@@ -743,6 +783,7 @@
 	uchar *data, *p;
 	Buf *buf;
 	IEntry ie;
+	IEntryBuf ieb;
 	IPool *ipool;
 	ISect *is;
 	Minibuf *mbuf, *mb;
@@ -837,14 +878,17 @@
 	assert(p == data+nbuf*bufsize);
 
 	n = 0;
-	while(recv(is->writechan, &ie) == 1){
-		if(ie.ia.addr == 0)
+	while(recv(is->writechan, &ieb) == 1){
+		if(ieb.nie == 0)
 			break;
-		buck = score2bucket(is, ie.score);
-		i = buck/bufbuckets;
-		assert(i < nbuf);
-		bwrite(&buf[i], &ie);
-		n++;
+		for(j=0; j<ieb.nie; j++){
+			ie = ieb.ie[j];
+			buck = score2bucket(is, ie.score);
+			i = buck/bufbuckets;
+			assert(i < nbuf);
+			bwrite(&buf[i], &ie);
+			n++;
+		}
 	}
 	add(&indexentries, n);
 	
diff --git a/src/cmd/venti/srv/fns.h b/src/cmd/venti/srv/fns.h
index 398562c..d1f950f 100644
--- a/src/cmd/venti/srv/fns.h
+++ b/src/cmd/venti/srv/fns.h
@@ -105,6 +105,7 @@
 int		lookupscore(u8int *score, int type, IAddr *ia);
 int		maparenas(AMap *am, Arena **arenas, int n, char *what);
 void		markbloomfilter(Bloom*, u8int*);
+void		markbloomfiltern(Bloom*, u8int[][20], int);
 uint		msec(void);
 int		namecmp(char *s, char *t);
 void		namecp(char *dst, char *src);