| #include "stdinc.h" |
| #include "dat.h" |
| #include "fns.h" |
| |
| typedef struct LumpQueue LumpQueue; |
| typedef struct WLump WLump; |
| |
| enum |
| { |
| MaxLumpQ = 1 << 3 /* max. lumps on a single write queue, must be pow 2 */ |
| }; |
| |
| struct WLump |
| { |
| Lump *u; |
| Packet *p; |
| int creator; |
| int gen; |
| uint ms; |
| }; |
| |
| struct LumpQueue |
| { |
| QLock lock; |
| Rendez flush; |
| Rendez full; |
| Rendez empty; |
| WLump q[MaxLumpQ]; |
| int w; |
| int r; |
| }; |
| |
| static LumpQueue *lumpqs; |
| static int nqs; |
| |
| static QLock glk; |
| static int gen; |
| |
| static void queueproc(void *vq); |
| |
| int |
| initlumpqueues(int nq) |
| { |
| LumpQueue *q; |
| |
| int i; |
| nqs = nq; |
| |
| lumpqs = MKNZ(LumpQueue, nq); |
| |
| for(i = 0; i < nq; i++){ |
| q = &lumpqs[i]; |
| q->full.l = &q->lock; |
| q->empty.l = &q->lock; |
| q->flush.l = &q->lock; |
| |
| if(vtproc(queueproc, q) < 0){ |
| seterr(EOk, "can't start write queue slave: %r"); |
| return -1; |
| } |
| } |
| |
| return 0; |
| } |
| |
| /* |
| * queue a lump & it's packet data for writing |
| */ |
| int |
| queuewrite(Lump *u, Packet *p, int creator, uint ms) |
| { |
| LumpQueue *q; |
| int i; |
| |
| trace(TraceProc, "queuewrite"); |
| i = indexsect(mainindex, u->score); |
| if(i < 0 || i >= nqs){ |
| seterr(EBug, "internal error: illegal index section in queuewrite"); |
| return -1; |
| } |
| |
| q = &lumpqs[i]; |
| |
| qlock(&q->lock); |
| while(q->r == ((q->w + 1) & (MaxLumpQ - 1))){ |
| trace(TraceProc, "queuewrite sleep"); |
| rsleep(&q->full); |
| } |
| |
| q->q[q->w].u = u; |
| q->q[q->w].p = p; |
| q->q[q->w].creator = creator; |
| q->q[q->w].ms = ms; |
| q->q[q->w].gen = gen; |
| q->w = (q->w + 1) & (MaxLumpQ - 1); |
| |
| trace(TraceProc, "queuewrite wakeup"); |
| rwakeup(&q->empty); |
| |
| qunlock(&q->lock); |
| |
| return 0; |
| } |
| |
| void |
| flushqueue(void) |
| { |
| int i; |
| LumpQueue *q; |
| |
| if(!lumpqs) |
| return; |
| |
| trace(TraceProc, "flushqueue"); |
| |
| qlock(&glk); |
| gen++; |
| qunlock(&glk); |
| |
| for(i=0; i<mainindex->nsects; i++){ |
| q = &lumpqs[i]; |
| qlock(&q->lock); |
| while(q->w != q->r && gen - q->q[q->r].gen > 0){ |
| trace(TraceProc, "flushqueue sleep q%d", i); |
| rsleep(&q->flush); |
| } |
| qunlock(&q->lock); |
| } |
| } |
| |
| static void |
| queueproc(void *vq) |
| { |
| LumpQueue *q; |
| Lump *u; |
| Packet *p; |
| int creator; |
| uint ms; |
| |
| threadsetname("queueproc"); |
| |
| q = vq; |
| for(;;){ |
| qlock(&q->lock); |
| while(q->w == q->r){ |
| trace(TraceProc, "queueproc sleep empty"); |
| rsleep(&q->empty); |
| } |
| |
| u = q->q[q->r].u; |
| p = q->q[q->r].p; |
| creator = q->q[q->r].creator; |
| ms = q->q[q->r].ms; |
| |
| q->r = (q->r + 1) & (MaxLumpQ - 1); |
| trace(TraceProc, "queueproc wakeup flush"); |
| rwakeupall(&q->flush); |
| |
| trace(TraceProc, "queueproc wakeup full"); |
| rwakeup(&q->full); |
| |
| qunlock(&q->lock); |
| |
| trace(TraceProc, "queueproc writelump %V", u->score); |
| if(writeqlump(u, p, creator, ms) < 0) |
| fprint(2, "failed to write lump for %V: %r", u->score); |
| trace(TraceProc, "queueproc wrotelump %V", u->score); |
| |
| putlump(u); |
| } |
| } |