/***********************************************************************/ /* Open Visualization Data Explorer */ /* (C) Copyright IBM Corp. 1989,1999 */ /* ALL RIGHTS RESERVED */ /* This code licensed under the */ /* "IBM PUBLIC LICENSE - Open Visualization Data Explorer" */ /***********************************************************************/ #include #include "rq.h" #include "instrument.h" #include extern int *_dxd_exTerminating; extern int _dxfExReclaimingMemory(); #define MARK_TIME(s) /* DXMarkTimeLocal(s) */ void _dxf_child_RQ_message(int *jobid); typedef struct _EXRQJob *EXRQJob; typedef struct _EXRQJob { EXRQJob next; /* list pointers */ EXRQJob prev; int highpri; /* run this NOW */ int gid; /* group id */ int JID; /* job id */ PFI func; /* function to call */ Pointer arg; /* argument to pass */ int repeat; /* number of repetitions for job*/ } _EXRQJob; typedef struct { volatile int count; volatile EXRQJob free; volatile EXRQJob head; volatile EXRQJob tail; } _EXRQ, *EXRQ; static EXRQ runQueue = NULL; static lock_type *RQlock; static send_RQ_message = 1; Error _dxf_ExRQInit (void) { Error tmp; if (runQueue != NULL) return (ERROR); runQueue = (EXRQ) DXAllocate (sizeof (_EXRQ)); if (runQueue == NULL) return (ERROR); RQlock = (lock_type*) DXAllocate (sizeof (lock_type)); if (!RQlock) { DXFree((Pointer)runQueue); return ERROR; } tmp = DXcreate_lock ((lock_type *) RQlock, "Runqueue"); if (tmp != OK) { DXFree((Pointer)runQueue); DXFree((Pointer)RQlock); return (ERROR); } runQueue->count = 0; runQueue->free = NULL; runQueue->head = NULL; runQueue->tail = NULL; return (OK); } void _dxf_ExRQEnqueue (PFI func, Pointer arg, int repeat, int gid, int JID, int highpri) { volatile EXRQ rq; lock_type *l; EXRQJob job = NULL; _EXRQJob localJob; EXRQJob tail; EXRQJob head; MARK_TIME ("RQE Enter"); DXsyncmem(); localJob.next = NULL; localJob.prev = NULL; localJob.highpri = highpri; localJob.gid = gid; localJob.JID = JID; localJob.func = func; localJob.arg = arg; localJob.repeat = repeat; rq = runQueue; l = RQlock; /* * Prepare a job block. If we can then get one from the runqueue's * own free list. If someone else sneaks in and gets the last one * or there aren't any then we'll just allocate a new one. Once * we've gotten the block then fill it in. */ MARK_TIME ("RQE Alloc"); if (rq->free) { DXlock (l, exJID); job = rq->free; if (job) rq->free = job->next; else { DXunlock (l, exJID); job = (EXRQJob) DXAllocate (sizeof (_EXRQJob)); DXlock (l, exJID); } } else { job = (EXRQJob) DXAllocate (sizeof (_EXRQJob)); DXlock (l, exJID); } if (! job) _dxf_ExDie ("_dxf_ExRQEnqueue: can't DXAllocate job"); MARK_TIME ("RQE DoneAlloc"); *job = localJob; /* * Now that the job block is set up insert it into the general list. * If the list is currently empty then this job becomes both the head * and the tail of the list and we can quit. * * NOTE: high priority jobs are placed at the head of the list, all * others at the tail. */ if (highpri) { head = rq->head; if (! head) rq->head = rq->tail = job; else { job->next = head; head->prev = job; rq->head = job; } } else { tail = rq->tail; if (! tail) rq->head = rq->tail = job; else { job->prev = tail; tail->next = job; rq->tail = job; } } MARK_TIME ("RQE Incr"); rq->count += repeat; MARK_TIME ("RQE Queued"); DXunlock (l, exJID); MARK_TIME ("RQE Exit"); if(!_dxf_ExReclaimingMemory()) { /* child 1 adding something to child 0's queue */ if(_dxd_exMyPID == 1 && JID == 1) _dxf_parent_RQ_message(); else { if(send_RQ_message) { send_RQ_message = 0; _dxf_ExRunOn (1, _dxf_child_RQ_message, &JID, sizeof(int)); send_RQ_message = 1; } } } } /* * Enqueue several jobs. All allocations and queue building happens up front * into a queuePart. * the queue is then locked, and the jobs are enqueued all at once. */ void _dxf_ExRQEnqueueMany (int n, PFI func[], Pointer arg[], int repeat[], int gid, int JID, int highpri) { volatile EXRQ rq; lock_type *l; EXRQJob job = NULL; _EXRQJob localJob; EXRQJob tail; EXRQJob head; int i; EXRQJob queuePartHead; EXRQJob queuePartTail; EXRQJob newBlock; EXRQJob prevBlock; int totalRepeat; MARK_TIME ("RQEM Enter"); if (n == 0) return; DXsyncmem(); rq = runQueue; l = RQlock; queuePartHead = NULL; queuePartTail = NULL; MARK_TIME ("RQEM Alloc"); /* DXAllocate n blocks in a list */ i = 0; if (rq->free) { DXlock (l, exJID); for (; i < n && rq->free; ++i) { newBlock = rq->free; if (i == 0) queuePartTail = newBlock; rq->free = newBlock->next; newBlock->next = queuePartHead; queuePartHead = newBlock; } DXunlock (l, exJID); } for (; i < n; ++i) { newBlock = (EXRQJob) DXAllocate (sizeof (_EXRQJob)); if (!newBlock) _dxf_ExDie ("_dxf_ExRQEnqueueMany: can't allocate newBlock"); if (i == 0) queuePartTail = newBlock; newBlock->next = queuePartHead; queuePartHead = newBlock; } MARK_TIME ("RQE DoneAlloc"); newBlock = queuePartHead; prevBlock = NULL; totalRepeat = 0; localJob.highpri = highpri; localJob.gid = gid; localJob.JID = JID; for (i = 0; i < n; ++i) { localJob.next = newBlock->next; localJob.prev = prevBlock; localJob.func = func[i]; localJob.arg = arg[i]; localJob.repeat = repeat[i]; totalRepeat += localJob.repeat; *newBlock = localJob; prevBlock = newBlock; newBlock = newBlock->next; } MARK_TIME ("RQEM DoneFill"); /* * Now that the job block is set up insert it into the general list. * If the list is currently empty then this job becomes both the head * and the tail of the list and we can quit. * * NOTE: high priority jobs are placed at the head of the list, all * others at the tail. */ DXlock (l, exJID); if (highpri) { head = rq->head; if (! head) { rq->head = queuePartHead; rq->tail = queuePartTail; } else { queuePartTail->next = head; head->prev = queuePartTail; rq->head = queuePartHead; } } else { tail = rq->tail; if (! tail) { rq->head = queuePartHead; rq->tail = queuePartTail; } else { queuePartHead->prev = tail; tail->next = queuePartHead; rq->tail = queuePartTail; } } MARK_TIME ("RQEM Incr"); rq->count += totalRepeat; MARK_TIME ("RQEM Queued"); DXunlock (l, exJID); MARK_TIME ("RQEM Exit"); send_RQ_message = 0; _dxf_ExRunOn (1, _dxf_child_RQ_message, &JID, sizeof(int)); send_RQ_message = 1; } int _dxf_ExRQDequeue (int gid) { volatile EXRQ rq; _EXRQ localRq; lock_type *l; EXRQJob job; _EXRQJob localJob; MARK_TIME ("RQD Enter"); rq = runQueue; l = RQlock; /* * If we got a worker specific job then go ahead and do it. */ DXsyncmem (); if (rq->count <= 0) return (FALSE); DXlock (l, exJID); MARK_TIME ("RQD PostLock"); localRq = *rq; if (localRq.count <= 0) { DXunlock (l, exJID); MARK_TIME ("RQD FastDone"); return (FALSE); } rq->count = --localRq.count; /* * Get a job, removing it from the queue. * There are 3 special cases: * 1) the DeQueuer wants a specific gid (group ID), * 2) the job is destined for a specific JID (Processor ID). * 3) a high priority task exists for a specific JID * If either of these cases is required, skip down the queue until * you find a job. */ MARK_TIME ("RQD GetJob"); job = localRq.head; if (!job) { rq->count++; DXunlock (l, exJID); DXSetError (ERROR_INTERNAL, "#8320"); return (FALSE); } localJob = *job; MARK_TIME ("RQD GotJob"); if (gid != 0 || localJob.JID != 0) { MARK_TIME ("RQD Find MYJob"); for ( ; job; job = localJob.next) { localJob = *job; if ((!gid || localJob.gid == gid || localJob.highpri) && (!localJob.JID || localJob.JID == exJID)) break; } if (!job) { rq->count++; DXunlock (l, exJID); return (FALSE); } } MARK_TIME ("RQD GotMYJob"); /* * If the job we got is a repeatable one, then just write the local copy * back out to the global copy, otherwize, remove it from the queue and add * it to the free list. */ if (--localJob.repeat == 0) { if (localJob.prev) localJob.prev->next = localJob.next; else localRq.head = localJob.next; if (localJob.next) localJob.next->prev = localJob.prev; else localRq.tail = localJob.prev; job->next = localRq.free; localRq.free = job; } else { *job = localJob; } *rq = localRq; MARK_TIME ("RQD DXFree Job"); DXunlock (l, exJID); /* * We have a valid job to execute. Pick up its function and argument, * return the job block to the free list, and really execute the * function. */ IFINSTRUMENT (++exInstrument[_dxd_exMyPID].tasks); (* localJob.func) (localJob.arg, localJob.repeat); MARK_TIME ("RQD Done"); return (TRUE); } int _dxf_ExRQPending(void) { return (runQueue->count > 0); }