/***********************************************************************/
/* Open Visualization Data Explorer                                    */
/* (C) Copyright IBM Corp. 1989,1999                                   */
/* ALL RIGHTS RESERVED                                                 */
/* This code licensed under the                                        */
/*    "IBM PUBLIC LICENSE - Open Visualization Data Explorer"          */
/***********************************************************************/

/*
 * NOTE(review): every #include directive in this file (except the one for
 * "qsort.c") has no header name in the copy under review; the names look
 * like they were lost when the text was extracted.  They must be restored
 * from the original source before this file can compile.
 */
#include

/* this file is NOT used with the normal dxexec - the one from
 * the executive directory is used instead.  these stubs are only
 * used for libDXlite and libDXcallm, which have primitive tasking support.
 */

#include
#include
#include
#include

#if DXD_HAS_UNIX_SYS_INCLUDES
#include
#include
#include
#if ibm6000
#include
#else
#include
#endif
#else
#include
#endif

#if ibmpvs
/* XXX */
#define HZ 100
#endif

/* Pointer to a function returning Error; the argument list is unspecified. */
typedef Error (*PFE)();

/*
 * Task tracing
 *
 * DXTraceTask stores its argument in the file-local variable `trace`.
 * Nothing in the visible part of this file reads `trace`.
 */

static int trace = 0;

void DXTraceTask(int t)
{
    trace = t;
}


/*
 * Number of processors
 *
 * nprocs starts at 8 when `sgi` is defined and at -1 ("not yet
 * determined") otherwise.  pids[] holds one process id per processor
 * slot; slot 0 is written by DXExecuteTaskGroup (pids[0] = master()).
 */

#define MAXPROC 100
#if sgi
static int nprocs = 8;
#else
static int nprocs = -1;
#endif

int pids[MAXPROC];

/*
 * Query and optionally set the processor count.
 *
 *   n > 0   the count becomes n
 *   n <= 0  the count is left alone (apart from the defaulting below)
 *
 * If the count has never been determined (nprocs < 0) it is first set
 * to 1.  Returns the count in effect before n was applied.
 *
 * NOTE(review): n is not checked against MAXPROC, although pids[] and
 * taskinfo.ready[] have only MAXPROC entries.
 */
int DXProcessors(int n)
{
    int old;
    if (nprocs<0) {
        /* XXX - automatic */
        nprocs = 1;
    }
    old = nprocs;
    if (n>0)
        nprocs = n;
    return old;
}

/*
 * Copy the first nprocs entries of pids[] into childp.  The caller must
 * supply room for at least nprocs ints.
 */
void _dxfGetProcessIds(int *childp)
{
    int i;
    for (i = 0; i < nprocs; i++)
        childp[i] = pids[i];
}


/*
 * Processor id of the current process.  It is 0 by default and is
 * changed only inside DXExecuteTaskGroup.
 */

static int id = 0;

int DXProcessorId(void)
{
    return id;
}


/*
 * The following info is modified by each processor,
 * so must be in a shared area in global memory,
 * which we allocate on the first DXCreateTaskGroup.
 */

static struct taskinfo {

    struct task {
        Error(*proc)(Pointer);  /* function to do the task */
        Pointer arg;            /* pointer to argument block */
        int size;               /* size of argument block */
        float work;             /* estimate of amount of work */
        struct task *next;      /* for tasks added by other tasks only */
    } *tasks;                   /* dynamically allocated */

    struct task *first, *last;  /* tasks added by other tasks only */
    volatile int ntasks;        /* count of tasks not yet started */
    int undone;                 /* count of tasks not yet finished */
    Error rc;                   /* return code for this task group */
    /* XXX - return message */

    lock_type extra_lock;       /* DXlock for first, last pointers */
    lock_type task_lock;        /* unlocked when there is a task to do */
    lock_type undone_lock;      /* DXlock for undone count */
    lock_type done_flag;        /* unlocked when all tasks are done */

    int ready[MAXPROC];         /* ready flags for each proc */

} *ti;

/*
 * Staging area, private to the calling process, in which DXAddTask
 * collects tasks before DXExecuteTaskGroup sorts them and copies them
 * into ti->tasks.
 */
static struct ltaskinfo {
    struct task *tasks;         /* pointer to local task info */
    int alloc;                  /* number allocated */
    int ntasks;                 /* count of tasks */
} lti;


/*
 * define a sorting routine
 *
 * "qsort.c" is included as a template, parameterized by the macros
 * below.  DXExecuteTaskGroup calls the result as
 * tasksort(lti.tasks, lti.ntasks); presumably it orders tasks by their
 * `work` estimate - confirm the direction against qsort.c.
 */

#define TYPE struct task
#define LT(a,b) ((a)->work < (b)->work)
#define GT(a,b) ((a)->work > (b)->work)
#define QUICKSORT tasksort
#include "qsort.c"


/*
 * Start out with no tasks
 *
 * On the first call this allocates the shared taskinfo block, creates
 * task_lock, undone_lock and done_flag, takes done_flag and task_lock
 * so that they start out locked, and allocates room for 1000 staged
 * tasks.  On every call it resets ti->tasks, ti->ntasks, ti->undone and
 * ti->rc.  Returns OK on success.
 *
 * NOTE(review):
 *  - the failure paths return NULL although the return type is Error;
 *    the rest of the file uses ERROR for failure.
 *  - extra_lock is never passed to DXcreate_lock, yet one_task locks it.
 *  - the result of DXAllocateLocal is not checked; DXAddTask would then
 *    write through a null lti.tasks.
 *  - been_here is set before the allocations, so if the first call
 *    fails, later calls skip initialization and dereference ti anyway.
 *  - ti->first, ti->last and lti.ntasks are not reset here.
 */
Error DXCreateTaskGroup()
{
    static int been_here = 0;

    if (!been_here) {

        been_here = 1;

        /* allocate shared task info */
        ti = (struct taskinfo *) DXAllocateZero(sizeof(struct taskinfo));
        if (!ti)
            return NULL;

        /* create the locks */
        if (!DXcreate_lock(&(ti->task_lock), "task DXlock"))
            return NULL;
        if (!DXcreate_lock(&(ti->undone_lock), "undone DXlock"))
            return NULL;
        if (!DXcreate_lock(&(ti->done_flag), "done flag"))
            return NULL;

        /* initialize them */
        DXlock(&(ti->done_flag), 0);    /* DXunlock to signal all tasks done */
        DXlock(&(ti->task_lock), 0);    /* DXunlock to start task execution */

        /* allocate initial local task structure */
        lti.alloc = 1000;
        lti.tasks = (struct task *)
            DXAllocateLocal(lti.alloc * sizeof(struct task));
    }

    /* initialize */
    ti->tasks = NULL;
    ti->ntasks = 0;
    ti->undone = 0;
    ti->rc = OK;

    return OK;
}


/*
 * Record each task in the task array
 *
 *   proc  function that performs the task
 *   arg   argument passed to proc
 *   size  if non-zero, arg is copied into a freshly allocated block of
 *         this many bytes, which one_task frees after the task has run;
 *         if zero, the arg pointer itself is stored
 *   work  estimate of the amount of work, used as the sort key
 *
 * While no group is executing (ti->tasks is null) the task goes into the
 * local staging array, which grows by a factor of 3/2 when full.
 * While a group is executing (ti->tasks is non-null, i.e. the call comes
 * from a running task) a task block is allocated individually, appended
 * to the first/last queue, and ti->undone is incremented.
 *
 * Returns OK, or ERROR if a task block could not be allocated.
 *
 * NOTE(review):
 *  - the result of DXAllocate(size) is handed straight to memcpy; an
 *    allocation failure is not detected.
 *  - the queue is modified under task_lock here, whereas one_task
 *    guards its dequeue with extra_lock.
 *  - undone_lock is taken with 0 here but with `id` in one_task.
 */
Error DXAddTask(Error(*proc)(Pointer), Pointer arg, int size, double work)
{
    struct task *t;

    /* get a task block t */
    if (ti->tasks) {

        /* add to list of tasks generated by other tasks */
        t = (struct task *) DXAllocate(sizeof(struct task));
        if (!t)
            return ERROR;

    } else {

        /* initial tasks generated by master */
        if (lti.ntasks >= lti.alloc) {
            int n = lti.alloc*3/2;
            Pointer p = DXReAllocate((Pointer)(lti.tasks),
                                     n*sizeof(struct task));
            if (!p)
                return ERROR;
            lti.alloc = n;
            lti.tasks = (struct task *) p;
        }
        t = lti.tasks + lti.ntasks;
        lti.ntasks++;
    }

    /* fill it in */
    t->proc = proc;
    t->arg = size==0? arg : (char *) memcpy(DXAllocate(size), arg, size);
    t->size = size;
    t->work = work;

    /* add to queue iff this is an extra task */
    if (ti->tasks) {
        if (ti->ntasks || ti->first)    /* NB: if this is the only task, we */
            DXlock(&ti->task_lock, 0);  /* already have DXlock (see one_task) */
        if (ti->last) {
            t->next = ti->last->next;
            ti->last->next = t;
        } else
            t->next = NULL;
        if (!ti->first)
            ti->first = t;
        ti->last = t;
        DXunlock(&ti->task_lock, 0);
        DXlock(&ti->undone_lock, 0);
        ti->undone += 1;
        DXunlock(&ti->undone_lock, 0);
    } else
        t->next = NULL;

    return OK;
}

/*
 * Stub: forwards to DXAddTask exactly once.
 *
 * NOTE(review): `repeat` is ignored, so only a single task is added
 * regardless of its value.
 */
Error DXAddLikeTasks(PFE func, Pointer arg, int size, double work, int repeat)
{
    return(DXAddTask(func, arg, size, work));
}


/*
 * Do one task.  Argument indicates whether to block
 * if we can't get the task DXlock or just return.
 *
 *   block != 0  wait for task_lock
 *   block == 0  try task_lock once and return OK if it is not available
 *
 * With the lock held, the task is taken from the end of the sorted
 * array if any remain there, otherwise from the head of the extra-task
 * queue.  task_lock is released only if more tasks remain.  The task is
 * then run; a failing task sets ti->rc to ERROR.  Finally ti->undone is
 * decremented and done_flag is unlocked when it reaches zero.
 *
 * Always returns OK; task failure is reported through ti->rc.
 *
 * NOTE(review):
 *  - `msg` is unused.
 *  - `delete` is set for extra tasks but never read, and is left
 *    uninitialized otherwise; the block that DXAddTask allocated for an
 *    extra task is not freed anywhere in the visible code.
 *  - `t` is used uninitialized if neither ti->ntasks nor ti->first is
 *    set when the lock is obtained.
 *  - ti->last is not cleared when the queue becomes empty.
 */
static Error one_task(int block)
{
    struct task *t;
    char msg[16];
    int delete;

    if (block)                              /* get DXlock w/ blocking, */
        DXlock(&(ti->task_lock), 0);
    else                                    /* or get DXlock w/o blocking */
        if (!DXtry_lock(&(ti->task_lock), 0))
            return OK;
    DXsyncmem();                            /* make sure we're in sync */

    if (ti->ntasks) {                       /* are there more sorted tasks? */
        ti->ntasks -= 1;                    /* yes, remove it */
        t = &(ti->tasks[ti->ntasks]);       /* get the next task */
    } else if (ti->first) {                 /* are there more extra tasks */
        DXlock(&ti->extra_lock, 0);         /* DXlock the queue */
        t = ti->first;                      /* get the next one */
        ti->first = t->next;                /* remove from queue */
        DXunlock(&ti->extra_lock, 0);       /* DXunlock the queue */
        delete = 1;                         /* remember to delete it */
    }

    if (ti->ntasks || ti->first)            /* if there are more tasks, */
        DXunlock(&(ti->task_lock), 0);      /* give someone else a turn */

    DXMarkTimeLocal("start task");          /* record the time */
    if (!t->proc(t->arg))                   /* do the task */
        ti->rc = ERROR;                     /* return error if anyone fails */
    DXMarkTimeLocal("end task");            /* record the time */
    if (t->size)                            /* if we allocated the arg block, */
        DXFree(t->arg);                     /* free it */
    DXsyncmem();                            /* make sure we're in sync */

    DXlock(&(ti->undone_lock), id);         /* DXlock the undone count */
    if (--(ti->undone)==0)                  /* decrement it */
        DXunlock(&(ti->done_flag), 0);      /* signal if all done */
    DXunlock(&(ti->undone_lock), id);       /* DXunlock the undone count */

    return OK;
}


/*
 * XXX - clean this up
 *
 * slave() and master() create the worker processes and the master
 * process respectively.  PIN additionally forces the master fork in
 * DXExecuteTaskGroup; it is non-zero only for an OPTIMIZED ibmpvs build
 * in which the NOPIN environment variable is not set.
 */

#if ibmpvs
#define slave() fork()
#define master() fork()
#if OPTIMIZED
#define PIN (!nopin)
#else
#define PIN 0
#endif
#else
/* this isn't going to work correctly on an mp-sgi.  the new parm
 * to memfork is supposed to be the child number.
 */
#define slave() DXmemfork(1) /* slaves */
#define master() fork() /* master */
#define PIN 0
#endif


/*
 * Run every task staged by DXAddTask and wait for all of them.
 *
 * First call only: reads NOPIN from the environment, makes nprocs valid,
 * starts the worker processes (each worker loops forever in
 * one_task(1)) and, unless ibmpvs is defined, forks the master when
 * nprocs > 1 or PIN is set; the parent of that fork waits for its
 * children and exits.
 *
 * Every call: returns OK at once if nothing is staged.  Otherwise the
 * staged tasks are sorted, copied into a newly allocated ti->tasks, the
 * counters are set and task_lock is released to start execution.  The
 * caller helps out with non-blocking one_task(0) calls until no
 * unstarted task remains, then waits on done_flag, frees ti->tasks and
 * returns ti->rc.  Returns ERROR if ti->tasks cannot be allocated.
 *
 * NOTE(review): two stretches of the first-call code are damaged in the
 * copy under review (text between a '<' and a later '>' is missing).
 * They are reproduced unchanged and marked below; restore them from the
 * original source.
 */
Error DXExecuteTaskGroup()
{
    static int been_here = 0, nopin = 0;
    int i;

    if (!been_here) {                       /* create workers */
        been_here = 1;                      /* once only */
        nopin = getenv("NOPIN")? 1 : 0;     /* don't fork parent: for gdb */
        DXProcessors(0);                    /* make nprocs valid */

        /*
         * NOTE(review): damaged text - the loop condition, the call that
         * creates the worker and the test for "in child" are missing
         * from the next line.
         */
        for (id=1; idready[id] = 1;         /* we're ready */
                for (;;)                    /* loop forever, */
                    one_task(1);            /* do a task, blocking for each */
            }
        }                                   /* this loop set id for each proc, */
        id = 0;                             /* but we're still processor 0 */

#if !ibmpvs
        if (nprocs>1 || PIN) {              /* in multi-processor case, */
            pids[0] = master();             /* fork the master */
            if (pids[0]>0) {                /* in parent */
                /*
                 * NOTE(review): damaged text - everything between the
                 * start of the loop below and the end of the wait
                 * condition is missing, including the #else / #if lines
                 * that the two #endif lines belong to.
                 */
#if DXD_HAS_OS2_CP
                for (i=0; i=0)/* wait for all children to die */
                    continue;
#endif /* DXD_WIN */
#endif /* DXD_HAS_OS2_CP */
                exit(0);                    /* and exit */
            }
        }
#endif
    }

    if (lti.ntasks==0)
        return OK;

    DXMarkTime("start parallel");

    /* sort the tasks and copy them to global memory */
    tasksort(lti.tasks, lti.ntasks);
    ti->tasks = (struct task *) DXAllocate(lti.ntasks * sizeof(struct task));
    if (!ti->tasks)
        return ERROR;
    memcpy(ti->tasks, lti.tasks, lti.ntasks * sizeof(struct task));

    ti->ntasks = lti.ntasks;                /* number of tasks */
    ti->undone = lti.ntasks;                /* number of tasks not yet done */
    lti.ntasks = 0;                         /* no more tasks here */
    DXunlock(&(ti->task_lock), 0);          /* start task execution */

    while (ti->ntasks > 0) {                /* while there are tasks to be done, */
        one_task(0);                        /* do a task, but don't block */
        DXqflush();                         /* flush its messages, if any */
    }

    DXlock(&(ti->done_flag), 0);            /* wait for all tasks done */
    DXFree((Pointer)ti->tasks);             /* free allocated task storage */
    ti->tasks = NULL;                       /* just in case */
    DXMarkTime("end parallel");             /* timing */
    return ti->rc;                          /* return error if anyone failed */
}


/*
 * Discard the tasks staged so far by resetting the local task count.
 * Argument blocks that DXAddTask copied for those tasks are not freed
 * here.  Always returns OK.
 */
Error DXAbortTaskGroup(void)
{
    /* XXX */
    lti.ntasks = 0;
    return OK;
}


/* these don't let you control which processor they run on, like the
 * exec code does.  this means you can call the routines, but they
 * don't do what you expect.
 */

/*
 * Run func(arg) as a task group containing a single task and return the
 * result of DXExecuteTaskGroup.
 *
 * NOTE(review): `pid` is ignored, and the return values of
 * DXCreateTaskGroup and DXAddTask are not checked.
 */
Error _dxf_ExRunOn(int pid, Error (*func)(), Pointer arg, int size)
{
    DXCreateTaskGroup();
    DXAddTask(func, arg, size, 1.0);
    return DXExecuteTaskGroup();
}

/*
 * NOTE(review): the text of this function breaks off in the middle of
 * the loop header in the copy under review; it is reproduced unchanged.
 */
Error _dxf_ExRunOnAll(Error (*func)(), Pointer arg, int size)
{
    int i;

    DXCreateTaskGroup();
    for (i=0; i