/***********************************************************************/
/* Open Visualization Data Explorer                                    */
/* (C) Copyright IBM Corp. 1989,1999                                   */
/* ALL RIGHTS RESERVED                                                 */
/* This code licensed under the                                        */
/*    "IBM PUBLIC LICENSE - Open Visualization Data Explorer"          */
/***********************************************************************/

#include <dxconfig.h>



/* Task.c contains the routines that implement tasks.
 * A task-group block contains various fields, including an array of
 * tasks.  Task groups may be nested.  The usual sequence of calls is
 * DXCreateTaskGroup, DXAddTask (one or more times), and
 * DXExecuteTaskGroup.  All calls for a particular task group must be
 * executed on the same processor.  Task groups are stacked per processor,
 * and a free list is maintained per processor.  Because task groups may
 * be executed in a dump-and-run style, the last processor to finish a
 * group must schedule the group's cleanup on the processor that created it.
 *
 * Note that for performance reasons, the work estimates of the tasks are
 * copied into a local list (together with pointers to the tasks), and that
 * list is sorted, not the tasks themselves.
 */

#include <dx/dx.h>

#include "rq.h"
#include "utils.h"
#include "status.h"
#include "config.h"
#include "context.h"

extern gfunc   *_dxd_exCurrentFunc;

/* Uncomment to get DXMarkTimeLocal timing marks in ExProcessTaskGroup. */
/* #define TASK_TIME */

#define	EX_TASK_DATA	128	/* argument bytes stored inline in a task   */
#define	EX_TASK_BLOCKS	256	/* initial number of task slots per group   */
#define FREE_THRESHOLD	2	/* max group blocks kept on the free list   */
#define MAX_SAVED_TASKS (EX_TASK_BLOCKS) /* larger arrays are dropped on recycle */

/*
 * Operations on this processor's stack of open task groups.
 * POP/PUSH are wrapped in do/while(0) so they behave as single statements
 * (safe inside an unbraced if/else).
 */
#define	EMPTY		(_tasks == NULL)
#define	POP(_tg)	do { (_tg) = _tasks; _tasks = (_tg)->link; } while (0)
#define	PUSH(_tg)	do { (_tg)->link = _tasks; _tasks = (_tg); } while (0)


/* Handles to a single task and to a group of tasks. */
typedef struct _EXTask		*EXTask;
typedef struct _EXTaskGroup	*EXTaskGroup;


/*
 * One task: the function to run, its argument and bookkeeping.  Tasks added
 * while a group is open live in that group's `tasks' array; tasks added from
 * inside a running task are allocated individually (delete == 1).
 */
typedef struct _EXTask
{
    EXTaskGroup		tg;			/* owning task group	*/
    float		work;			/* work estimate (sort key) */
    PFE			func;			/* function to call	*/
    int			repeat;			/* number of requested reps */
    int			nocopy;			/* 1 -> arg is the caller's own pointer */
    int			delete;			/* 1->not in `tasks' array */
    Pointer		arg;			/* caller's pointer, heap copy, or NULL
						 * when the copy lives in data[] */
    unsigned char	data[EX_TASK_DATA];	/* local data storage	*/
    Context             taskContext;            /* copy of global context */
} _EXTask;


/*
 * A task group.  `lock' guards ntodo, error and emsg while tasks run, and
 * serializes additions made from inside running tasks.
 */
typedef struct _EXTaskGroup
{
    lock_type		lock;
    EXTaskGroup		link;			/* stack / free-list linkage */
    int			procId;			/* exJID of the creating processor */
    int			nalloc;			/* # of task slots allocated */
    int			nused;			/* # of task slots used	*/
    int			ntodo;			/* # of task executions outstanding */
    EXTask		tasks;			/* the task blocks	*/
    float		minwork;		/* smallest work est.	*/
    float		maxwork;		/* largest  work est.	*/
    int			sync;			/* synchronous flag	*/
    ErrorCode		error;			/* first error recorded	*/
    char		*emsg;			/* its message (owned)	*/
} _EXTaskGroup;

/* Sort key (work estimate) paired with its task; see ExProcessTaskGroup. */
typedef struct 
{
    float	work;
    EXTask	task;
} WorkIndex;


/* Forward declarations of the file-local task-group machinery. */
static Error	ExDestroyTaskGroup	(EXTaskGroup tg);
static Error	ExProcessTask		(EXTask t, int iteration);
static Error	ExProcessTaskGroup	(int sync);

/* Debug printers (DXTraceTask uses the group printer). */
void		_dxf_ExPrintTask		(EXTask t);
void		_dxf_ExPrintTaskGroup	(EXTaskGroup tg);


/* Note that these structures (the task stack and free list)
 * are PER PROCESSOR.
 */
/* Top of the stack of open (not yet executed) task groups. */
static EXTaskGroup	_tasks = NULL;

/* Recycled task-group blocks, and how many there are
 * (ExDestroyTaskGroup keeps at most FREE_THRESHOLD). */
static EXTaskGroup	freeTasks = NULL;
static int		numFreeTasks = 0;

/* Processor count: set by _dxf_ExInitTask, or latched by the first
 * DXProcessors(n > 0) call; -1 until then. */
static int		taskNprocs = -1;

/* Group of the task currently executing here (NULL when none);
 * DXAddLikeTasks adds to it when no group is open. */
static EXTaskGroup	runningTG = NULL;


/********************* START OF m_fork EMULATION **********************/
/* Number of arguments m_fork forwards to the forked function. */
#define	MFORK_ARGS	6

/* State shared between m_fork and the _mfork_worker jobs it queues. */
typedef struct
{
    lock_type	mlock;			/* m_next lock	*/
    int		mnext;			/* counter advanced by m_next */
    int		myid;			/* worker index (meaningful in _mf_l) */
    int		procs;			/* number of worker jobs to run */
    lock_type	dlock;			/* # done lock	*/
    int		done;			/* number of workers finished */
    PFE		func;			/* function every worker calls */
    uint	args[MFORK_ARGS];	/* arguments passed to func */
} MFork;

/*
 * _mf_g: the shared block, allocated by _mfork_init.
 * _mf_l: this processor's private copy; each worker copies *_mf_g into it
 * before calling func.
 */
static MFork	*_mf_g	= NULL;
static MFork	_mf_l;

static Error
_mfork_init ()
{
    _mf_g = (MFork *) DXAllocate (sizeof (MFork));
    if (! _mf_g)
	return (ERROR);

    if (! DXcreate_lock (&(_mf_g->mlock), "m_next lock"))
	return (ERROR);
    if (! DXcreate_lock (&(_mf_g->dlock), "done lock"))
	return (ERROR);

    return (OK);
}

/*
 * Run-queue job queued by m_fork, once per worker (n is the worker index).
 * It takes a private snapshot of the shared block, records its index, and
 * calls the forked function with the saved arguments.  It then atomically
 * bumps the shared `done' counter that m_fork waits on.
 * The parameter p is unused.
 */
static int
_mfork_worker (Pointer p, int n)
{
    MFork	*shared = _mf_g;
    uint	*av;

    _mf_l = *shared;
    _mf_l.myid = n;

    av = _mf_l.args;
    (* _mf_l.func) (av[0], av[1], av[2], av[3], av[4], av[5]);

    _mf_l.mnext = DXfetch_and_add (&(shared->done), 1, &(shared->dlock),
				   _mf_l.myid);
    return (OK);
}

/*
 * Emulation of m_fork().  Runs func(a0..a5) on _mf_l.procs workers and
 * returns only after every worker has finished.  When no count has been
 * set (procs < 1), DXProcessors(0) workers are used.
 *
 * The caller's local settings are published into the shared block.  One
 * _mfork_worker job per worker is queued under the group tag (int) _mf_g.
 * The caller first helps run jobs from that group, then spins until the
 * shared `done' counter reaches procs.
 *
 * NOTE(review): the (int) casts of _mf_g truncate the pointer on LP64
 * platforms.  That is presumably harmless while it is used only as a group
 * tag; confirm against the run-queue implementation.
 */
void
m_fork (PFE func, uint a0, uint a1, uint a2, uint a3, uint a4, uint a5)
{
    volatile int	*count;		/* shared count of finished workers */

    _mf_l.mnext   = 0;
    _mf_l.myid    = 0;
    _mf_l.procs   = _mf_l.procs < 1 ? DXProcessors (0) : _mf_l.procs;
    _mf_l.done    = 0;
    _mf_l.func    = func;
    _mf_l.args[0] = a0;
    _mf_l.args[1] = a1;
    _mf_l.args[2] = a2;
    _mf_l.args[3] = a3;
    _mf_l.args[4] = a4;
    _mf_l.args[5] = a5;

    /* Publish the settings; workers copy *_mf_g when they start. */
    * _mf_g = _mf_l;

    _dxf_ExRQEnqueue (_mfork_worker, NULL, _mf_l.procs, (int) _mf_g, 0, FALSE);

    count = &_mf_g->done;

    /*
     * Keep trying to get more mfork tasks until we don't get another
     * at which point we just need to spin until all of the rest have
     * finished.
     */

    while (*count != _mf_l.procs)
    {
	if (! _dxf_ExRQDequeue ((int) _mf_g))
	    break;
    }

    while (*count != _mf_l.procs)
	continue;
}

/*
 * Atomically advances the shared m_next counter.  Returns the value it had
 * before the increment, which is also remembered in the local copy.
 */
int
m_next ()
{
    int		ticket;

    ticket = DXfetch_and_add (&(_mf_g->mnext), 1, &(_mf_g->mlock), _mf_l.myid);
    _mf_l.mnext = ticket;
    return (ticket);
}

/*
 * Sets the worker count for the next m_fork, capped at DXProcessors(0).
 * Returns the count actually stored.
 */
int
m_set_procs (int n)
{
    int		avail = DXProcessors (0);

    if (n > avail)
	n = avail;
    _mf_l.procs = n;
    return (_mf_l.procs);
}

/*
 * Returns the m_fork worker count.  If none has been set yet, it defaults
 * to (and stores) DXProcessors(0).
 */
int
m_get_numprocs ()
{
    int		procs = _mf_l.procs;

    if (procs < 1)
    {
	procs = DXProcessors (0);
	_mf_l.procs = procs;
    }
    return (procs);
}

/* Returns this processor's m_fork worker index (0 in the forking caller). */
int
m_get_myid ()
{
    int		id = _mf_l.myid;

    return (id);
}
/********************* END   OF m_fork EMULATION **********************/


/* Returns the executive's id for the calling processor (_dxd_exMyPID). */
int
DXProcessorId(void)
{
    int		id = _dxd_exMyPID;

    return (id);
}

/* Returns the executive's parent process id (_dxd_exPPID). */
static int
ParentProcessId(void)
{
    int		ppid = _dxd_exPPID;

    return (ppid);
}

/*
 * Returns the processor count.  If the count is still unset (-1) and n is
 * positive, n is latched first.  DXProcessors(0) is therefore a pure query.
 */
int
DXProcessors (int n)
{
    int		unset = (taskNprocs == -1);

    if (n > 0 && unset)
	taskNprocs = n;
    return (taskNprocs);
}


static int trace = 0;		/* last level passed to DXTraceTask */

/*
 * Records the trace level.  When it is positive, the current task group
 * is printed, or a note is printed if no group is open.
 */
void DXTraceTask(int t)
{
    trace = t;
    if (trace <= 0)
	return;

    if (! EMPTY)
	_dxf_ExPrintTaskGroup(_tasks);
    else
	DXMessage("no active task group");
}

Error _dxf_ExInitTask(int n) 
{
    taskNprocs = n;
    if (! _mfork_init ())
	return (ERROR);
    return (OK);
}

Error _dxf_ExInitTaskPerProc() 
{
    if (DXCreateTaskGroup() == ERROR)
	return (ERROR);
    if (DXAbortTaskGroup() == ERROR)
	return (ERROR);
    return (OK);
}

Error _dxf_ExCleanupTask() 
{
    return (OK);
}

/*
 * Opens a new task group and pushes it onto this processor's stack of
 * open groups.  Any group that is already open stays below it.
 *
 * A block is taken from the per-processor free list when one is available;
 * otherwise a new block, its lock and EX_TASK_BLOCKS task slots are
 * allocated.  Returns OK, or ERROR if an allocation or lock creation fails.
 */

Error DXCreateTaskGroup ()
{
    EXTaskGroup		tg	= NULL;
    EXTask		t	= NULL;
    int			size;
    Error		l	= ERROR;

    if (numFreeTasks == 0) 
    {
	size = sizeof (_EXTaskGroup);
	tg = (EXTaskGroup) DXAllocate (size);
	if (! tg)
	    goto error;
	ExZero (tg, size);
	
	size = sizeof (_EXTask) * EX_TASK_BLOCKS;
	t = (EXTask) DXAllocate (size);
	if (! t)
	    goto error;
	ExZero (t, size);

	l = DXcreate_lock (&tg->lock, "Tasks");
	if (l != OK)
	    goto error;

	tg->procId = exJID;
	tg->nalloc = EX_TASK_BLOCKS;
	tg->tasks  = t;
	tg->error  = ERROR_NONE;
    }
    else
    {
	tg = freeTasks;
	if (tg->nalloc == 0) 
	{
	    /*
	     * The task array was released when this block was recycled.
	     * Reallocate it BEFORE unlinking the block: on failure the block
	     * simply stays on the free list (its lock still valid).  The old
	     * code freed it here with its lock still live.
	     */
	    size = sizeof (_EXTask) * EX_TASK_BLOCKS;
	    t = (EXTask) DXAllocate (size);
	    if (! t)
		return (ERROR);
	    ExZero (t, size);
	    tg->tasks = t;
	    tg->nalloc = EX_TASK_BLOCKS;
	}
	freeTasks = tg->link;
	--numFreeTasks;

	tg->nused = 0;			/* # of tasks used	*/
	tg->ntodo = 0;
	tg->sync = 0;			/* synchronous flag	*/
	tg->error  = ERROR_NONE;
    }

    PUSH (tg);
    return (OK);

error:
    /* Only reached from the fresh-allocation path. */
    if (l == OK && tg != NULL)
	DXdestroy_lock (&tg->lock);
    DXFree ((Pointer) tg);
    DXFree ((Pointer) t);
    return (ERROR);
}


/*
 * Adds a task that is to be run `repeat' times.
 *
 * If a task group is open on this processor, the task is appended to it.
 * It runs when that group is executed, ordered by `work' (largest first).
 * If no group is open but this is called from inside a running task, the
 * task joins that task's group and is queued immediately; `work' is then
 * not used for ordering.
 *
 * Argument handling:
 *   size == 0            -> `arg' is passed through as-is;
 *   size <= EX_TASK_DATA -> the bytes are copied into the task block;
 *   otherwise            -> they are copied into an allocation owned by
 *                           the task.
 *
 * Returns OK, or ERROR if repeat <= 0, there is no group to add to, or an
 * allocation fails.  On failure nothing is added to the group.
 */

Error DXAddLikeTasks (PFE func, Pointer arg, int size, double work, int repeat)
{
    EXTaskGroup	tg	= NULL;
    EXTask	t	= NULL;
    Pointer	a;
    int 	locked	= FALSE;

    if (repeat <= 0)
    {
	DXSetError (ERROR_INTERNAL, "#8330");
	goto error;
    }

    if (EMPTY)
    {
	if (! runningTG)
	{
	    DXSetError (ERROR_INTERNAL, "#8340");
	    goto error;
	}
	tg = runningTG;
	locked = TRUE;
	DXlock (&tg->lock, exJID);

	t = (EXTask) DXAllocateZero (sizeof (_EXTask));
	if (! t)
	    goto error;
	t->delete = TRUE;
    }
    else
    {
	tg = _tasks;
	/*
	 * Extend the task group if necessary.  nalloc is only updated once
	 * the reallocation has succeeded, so it never exceeds the real size.
	 */
	if (tg->nalloc == tg->nused)
	{
	    int		newAlloc = tg->nalloc > 0 ? tg->nalloc << 1 : EX_TASK_BLOCKS;
	    EXTask	grown;

	    grown = (EXTask) DXReAllocate ((Pointer) tg->tasks,
					   newAlloc * sizeof (_EXTask));
	    if (! grown)
		goto error;
	    tg->tasks  = grown;
	    tg->nalloc = newAlloc;
	}
	t = tg->tasks + tg->nused++;
	t->delete = FALSE;
    }

    /*
     * Remember the relevant information about the task
     */
    t->tg     = tg;
    t->work   = work;
    t->func   = func;
    t->repeat = repeat;

    /*
     * A recycled slot may still hold a pointer from its previous use.
     * Clear it before anything below can fail.
     */
    t->arg    = NULL;
    t->nocopy = TRUE;

    if (size == 0)
    {
	t->arg = arg;
	t->nocopy = TRUE;
    }
    else if (size <= EX_TASK_DATA)
    {
	ExCopy (t->data, arg, size);
	t->nocopy = FALSE;
	t->arg = NULL;
    }
    else
    {
	a = DXAllocate (size);
	if (! a)
	    goto error;
	ExCopy (a, arg, size);
	t->arg = a;
	t->nocopy = FALSE;
    }

    if (locked)
    {
	tg->ntodo += repeat;
	DXunlock (&tg->lock, exJID);
	/* copy global context data to the task before it becomes visible */
	_dxfCopyContext(&(t->taskContext), _dxd_exContext);
	_dxf_ExRQEnqueue (ExProcessTask, (Pointer)t, repeat, (int) tg, 0, FALSE);
    }
    else
    {
	if (tg->nused == 1 || work < tg->minwork)
	    tg->minwork = work;
	if (tg->nused == 1 || work > tg->maxwork)
	    tg->maxwork = work;
    }

    return (OK);

error:
    /* Undo a partially added task so the group is left unchanged. */
    if (t != NULL)
    {
	if (t->delete)
	    DXFree ((Pointer) t);
	else
	    tg->nused--;
    }
    if (locked && tg != NULL)
	DXunlock (&tg->lock, exJID);
    return (ERROR);
}

/* Adds a single-execution task; see DXAddLikeTasks for the details. */
Error DXAddTask (PFE func, Pointer arg, int size, double work)
{
    const int	once = 1;

    return (DXAddLikeTasks (func, arg, size, work, once));
}


/*
 * Runs the current task group and waits for all of its tasks to finish.
 * If any task fails, the earliest recorded error is reported and ERROR is
 * returned.
 */

Error DXExecuteTaskGroup (void)
{
    return (ExProcessTaskGroup (TRUE));
}


/*
 * Queues all of the tasks in the current task group for execution and 
 * immediately returns.  Errors raised by the tasks themselves are not
 * reported.  A failure to set up the group (e.g. an allocation failure)
 * now yields ERROR instead of being silently discarded.
 */

Error DXExecuteTaskGroupNoWait (void)
{
    return (ExProcessTaskGroup (FALSE));
}


/*
 * Discards the current task group without running any of its tasks.
 * Doing nothing when no group is open is not an error.  Always returns OK.
 */

Error DXAbortTaskGroup (void)
{
    EXTaskGroup		top;

    if (! EMPTY)
    {
	POP (top);
	ExDestroyTaskGroup (top);
    }
    return (OK);
}


/*
 * Alternative entry point for DXAbortTaskGroup: discards the current task
 * group without running it.  Always returns OK.
 */

Error DXDeleteTaskGroup ()
{
    (void) DXAbortTaskGroup ();
    return (OK);
}


/*
 * Destroys a task group.
 */

static int
ExDestroyTaskGroup (EXTaskGroup tg)
{
    int		i;
    int		n;
    EXTask	t;

    for (t = tg->tasks, n = tg->nused, i = 0; i < n; t++, i++)
	if (t->arg && ! t->nocopy)
	    DXFree ((Pointer) t->arg);
    if (tg->emsg)
    {
	DXFree ((Pointer) tg->emsg);
	tg->emsg = NULL;
    }

    if (numFreeTasks >= FREE_THRESHOLD)
    {
	DXdestroy_lock (&tg->lock);
	DXFree ((Pointer) tg->tasks);
	DXFree ((Pointer) tg);
    }
    else
    {
	++numFreeTasks;
	tg->link = freeTasks;
	freeTasks = tg;
	if (tg->nalloc > MAX_SAVED_TASKS) 
	{
	    DXFree((Pointer)tg->tasks);
	    tg->nalloc = 0;
	}
    }
    return (OK);
}


/*
 * Run-queue entry point that executes one iteration of a task.
 *
 * The task's saved context is installed as the global context around the
 * call, and the caller's context is restored afterwards.  For synchronous
 * groups, errors raised by the task are captured; the first one recorded
 * is stored in the group.  The group's outstanding counter is then
 * decremented.  If this was the last execution of an asynchronous group,
 * its destruction is queued on the processor that created it.
 *
 * Tasks added from inside a running task (t->delete) are separate
 * allocations queued once per repetition.  Such a task, and any argument
 * copy it owns, is freed when its LAST repetition completes.
 *
 * Parameters: t - the task; iteration - repetition index passed to func.
 * Returns OK, or ERROR if an error was recorded for this execution.
 */

static Error ExProcessTask (EXTask t, int iteration)
{
    EXTaskGroup		tg;
    Pointer		arg;		/* task argument pointer	*/
    ErrorCode		ecode;		/* error code			*/
    int			ntodo;		/* number left in group		*/
    char		*emsg;
    Error		returnVal;
    int			status;
    EXTaskGroup		oldTG;
    Context             savedContext;
    int			sync;		/* tg->sync, captured up front	*/
    int			isDelete;	/* t->delete, captured up front	*/
    int			lastRep	= FALSE;

    oldTG = runningTG;
    ecode = ERROR_NONE;
    emsg  = NULL;

    runningTG = tg    = t->tg;

    /*
     * Once the group counter is decremented below, a waiting processor may
     * destroy the group and its task array (which may hold t).  So neither
     * tg nor an array-resident t may be read after that point.
     */
    sync     = tg->sync;
    isDelete = t->delete;

    arg = (t->nocopy || t->arg) ? t->arg : (Pointer) t->data;

    DXResetError ();
    _dxfCopyContext(&savedContext, _dxd_exContext);
    _dxfCopyContext(_dxd_exContext, &(t->taskContext));
    status = get_status ();
    set_status (PS_RUN);
    DXMarkTimeLocal ("start task");

    returnVal = (*t->func) (arg, iteration);
    _dxfCopyContext(_dxd_exContext, &savedContext);

    DXMarkTimeLocal ("end task");
    set_status (status);

    /*
     * Check for errors, if we are running without waiting, skip
     * error checking.  If the user didn't DXSetError and he didn't return
     * ERROR, no error checking.  If we have had an error in the past, 
     * don't bother getting the error stuff.
     */
    if (! sync)
	goto countdown;

    ecode = DXGetError ();
    if (ecode == ERROR_NONE && returnVal == OK)
	goto countdown;

    if (ecode != ERROR_NONE && returnVal == OK)
    {
	returnVal = ERROR;
	DXWarning ("#4720",t->taskContext.graphId,_dxd_exCurrentFunc->cpath);
    }
    if (ecode == ERROR_NONE && returnVal == ERROR)
    {
	ecode = ERROR_INTERNAL;
	emsg = "#8350";
	goto copymessage;
    }

    if (tg->error != ERROR_NONE)
	goto countdown;

    emsg = DXGetErrorMessage ();

#define	L_ERROR		2048
copymessage:
    /* Keep a private, length-limited copy of the message. */
    if (emsg)
    {
	char	lbuf[L_ERROR];
	size_t	len;

	len = strlen (emsg);
	if (len >= L_ERROR)
	    len = L_ERROR - 1;
	memcpy (lbuf, emsg, len);
	lbuf[len] = '\0';
	emsg = _dxf_ExCopyString (lbuf);
    }

countdown:
    DXlock (&tg->lock, exJID);
    ntodo = --(tg->ntodo);
    if (ecode != ERROR_NONE && tg->error == ERROR_NONE)
    {
	/* Take ownership; any earlier message (stored without an error
	 * code) is swapped out and freed below instead of being leaked. */
	char	*old = tg->emsg;

	tg->error = ecode;
	tg->emsg  = emsg;
	emsg      = old;
    }
    if (isDelete)
	lastRep = (--(t->repeat) <= 0);
    DXunlock (&tg->lock, exJID);

    DXFree ((Pointer) emsg);

    /* If this tg was run asynchronously and needs to be destroyed, schedule
     * the destruction on the creating processor.
     */
    if ((ntodo == 0) && (! sync))
	_dxf_ExRQEnqueue (ExDestroyTaskGroup, (Pointer) tg, 1, 0, tg->procId, FALSE);

    /* A separately allocated task is freed only after its last repetition. */
    if (isDelete && lastRep)
    {
	if (t->arg && ! t->nocopy)
	    DXFree ((Pointer) t->arg);
	DXFree ((Pointer) t);
    }
    runningTG = oldTG;
    return (ecode == ERROR_NONE ? OK : ERROR);
}


/*
 * Processes a task group.  If the sync flag is true then all tasks must
 * complete before this routine terminates.  If not then they are just
 * queued for execution.
 * Note that for sorting reasons, a list of WorkIndex pairs is created.
 * This list is sorted, and then all references to the list that require
 * sorting must be done using this intermediate form.
 */

/*
 * Instantiate the generic quicksort from qsort.c for WorkIndex entries,
 * under the name ExWorkIndexSort.  This presumably relies on qsort.c
 * reading the TYPE, LT, GT and QUICKSORT macros; confirm there.
 * LT/GT are deliberately inverted, so that (assuming qsort.c sorts into
 * ascending LT order) entries with the LARGEST work estimate come first.
 */
#define TYPE WorkIndex
#define LT(a,b)	((a)->work>(b)->work)
#define GT(a,b)	((a)->work<(b)->work)
#define QUICKSORT	ExWorkIndexSort
#include "qsort.c"

static Error ExProcessTaskGroup (int sync)
{
    Error               ret	= ERROR;
    EXTaskGroup		tg;
    EXTask		task;
    int			i, j;
    int			todo;
    volatile int	*count;		/* task group counter		*/
    int			totalTodo;
#define NUM_TASKS_ALLOCED 256
    Pointer		_args[NUM_TASKS_ALLOCED];
    PFI			_funcs[NUM_TASKS_ALLOCED];
    int			_repeats[NUM_TASKS_ALLOCED];
    Pointer		*args	= _args;
    PFI			*funcs	= _funcs;
    int			*repeats = _repeats;
    int			status;
    WorkIndex 		_ilist[NUM_TASKS_ALLOCED];
    WorkIndex 		*ilist	= _ilist;
    ErrorCode		ecode;
    char		*emsg;
    EXTask		myTask	= NULL;
    int			myIter	= 0;
    
    if (EMPTY)
	return (OK);
    
    POP (tg);
    if (tg->nused == 0)
    {
	ExDestroyTaskGroup (tg);
	return (OK);
    }

    DXMarkTime ("start parallel");
    /*
     * Remember whether or not this is a syncronous task group.
     */

    ecode = DXGetError ();
    emsg  = DXGetErrorMessage ();

    if (ecode != ERROR_NONE || *emsg != '\0')
    {
	if (ecode != ERROR_NONE)
	    DXWarning ("#4840");
	else
	    DXWarning ("#4850");
	
	tg->error = ecode;
	tg->emsg  = _dxf_ExCopyString (emsg);
    }

    tg->sync  = sync;
    todo = tg->nused;

    status = get_status ();

    /*
     * Only bother to sort if the tasks actually have different cost
     * estimates associated with them.
     */
    if (todo > NUM_TASKS_ALLOCED)
    {
	ilist = (WorkIndex *) DXAllocateLocal (todo * sizeof (WorkIndex));
	if (ilist == NULL)
	    goto error;
    }
    task = tg->tasks;
    for (i = 0; i < todo; ++i)
    {
	ilist[i].task = task + i;
	ilist[i].work = task[i].work;
        _dxfCopyContext(&(task[i].taskContext), _dxd_exContext);
    }

    if (tg->minwork != tg->maxwork)
	QUICKSORT (ilist, todo);
#ifdef TASK_TIME
    DXMarkTimeLocal ("finish sort");
#endif

    /*
     * Schedule/Execute the tasks appropriately.
     */
    if (todo > NUM_TASKS_ALLOCED) 
    {
	funcs   = (PFI     *) DXAllocateLocal (todo * sizeof (PFI    ));
	args    = (Pointer *) DXAllocateLocal (todo * sizeof (Pointer));
	repeats = (int     *) DXAllocateLocal (todo * sizeof (int    ));
	if (funcs == NULL || args == NULL || repeats == NULL)
	    goto error;
    }

    /* Save a task for the executer to execute */
    i = 0;
    if (sync)
    {
	myTask = ilist[i].task;
	myIter = ilist[i].task->repeat - 1;
	ilist[i].task->repeat--;
	if (ilist[i].task->repeat == 0) 
	{
	    i = 1;
	}
    }
	
    totalTodo = 1;
    for (j = 0; i < todo; j++, i++)
    {
	funcs[j] = ExProcessTask;
	args[j] = (Pointer) ilist[i].task;
	totalTodo += (repeats[j] = ilist[i].task->repeat);
    }
    tg->ntodo = totalTodo;
    if (ilist[0].task->repeat == 0)
    {
	--todo;
    }


#ifdef TASK_TIME
    DXMarkTimeLocal ("queue all tasks");
#endif
    _dxf_ExRQEnqueueMany (todo, funcs, args, repeats, (int) tg, 0, FALSE);
#ifdef TASK_TIME
    DXMarkTimeLocal ("queued all tasks");
#endif

    if (funcs != _funcs)
	DXFree ((Pointer)funcs);
    if (args != _args)
	DXFree ((Pointer)args);
    if (repeats != _repeats)
	DXFree ((Pointer)repeats);
    if (ilist != _ilist)
	DXFree ((Pointer)ilist);

    if (! sync)
    {
	ret = OK;
    }
    else 
    {
	/*
	 * This processor is now restricted to processing tasks in this
	 * task group.  Once it can no longer get a job in this task group
	 * from the run queue then just spin and wait for all of the outstanding
	 * tasks in the group to complete.
	 */

#ifdef TASK_TIME
	DXMarkTimeLocal ("tasks enqueued");
#endif
	/* Do the task that I saved above as myTask */
	if (myTask != NULL)
	    ExProcessTask (myTask, myIter);

	count = &tg->ntodo;
	while (*count > 0)
	{
	    if (! _dxf_ExRQDequeue ((int) tg))
		break;
	}

	DXMarkTimeLocal ("waiting");

	set_status (PS_JOINWAIT);

	/* Every 100 times of checking count, try to see if anyone added
	 * on to the queue.
	 */
	while (*count > 0)
	{
	    _dxf_ExRQDequeue ((int)tg);
	    for (i = 0; *count && i < 100; ++i)
		;
	}

	DXMarkTimeLocal ("joining");

	set_status (status);

	ret = (tg->error == ERROR_NONE) ? OK : ERROR;
	if (ret != OK)
	    DXSetError (tg->error, tg->emsg? tg->emsg: "#8360");

	ExDestroyTaskGroup (tg);
    }

    DXMarkTime ("end parallel");
    return (ret);

error:
    if (funcs != _funcs)
	DXFree ((Pointer) funcs);
    if (args != _args)
	DXFree ((Pointer) args);
    if (repeats != _repeats)
	DXFree ((Pointer) repeats);
    if (ilist != _ilist)
	DXFree ((Pointer) ilist);
    return (ret);
}


/*
 * Debug dump of one task.  Prints its address, group, function, argument
 * address, the argument's first int (in hex) and the work estimate.
 * Pointers use %p: passing them to %x is undefined behaviour and truncates
 * on LP64.  The leading int is read with memcpy to avoid an aliasing cast.
 */
void _dxf_ExPrintTask (EXTask t)
{
    Pointer	argp = t->arg ? t->arg : (Pointer) t->data;
    int		head;

    memcpy (&head, argp, sizeof (head));
    DXMessage ("%p:  [%p] (* %p) (%p) = [%08x ...] %g",
	     (void *) t, (void *) t->tg, (void *) t->func,
	     argp,
	     (unsigned int) head,
	     (double) t->work);
}


/*
 * Debug dump of a task group and each of its used task slots.
 * Pointers are printed with %p, and a missing error message is printed as
 * "(none)" rather than passing NULL to %s.
 */
void _dxf_ExPrintTaskGroup (EXTaskGroup tg)
{
    int		i;

    DXMessage ("%p:  %p %p %2d/%2d %g:%g %c %2d/%s",
	     (void *) tg, (void *) tg->link, (void *) tg->tasks,
	     tg->nused, tg->nalloc,
	     (double) tg->minwork, (double) tg->maxwork,
	     tg->sync ? 'S' : 'A',
	     (int) tg->error, tg->emsg ? tg->emsg : "(none)");

    for (i = 0; i < tg->nused; i++)
	_dxf_ExPrintTask (tg->tasks + i);
}


/*
 * Job block used by _dxf_ExRunOn to run a function on a given processor
 * and wait for the result.  The caller holds `done' while the job is
 * outstanding; ExRunOnWorker unlocks it when the function has returned.
 */
typedef struct
{
    lock_type	done;			/* unlocked when job is done	*/
    PFE		func;			/* function to call on processor*/
    Pointer	arg;			/* argument block for the func	*/
    int		size;			/* size of allocated argument	*/
    ErrorCode	code;			/* returned error code		*/
    char	*emsg;			/* returned error message (owned) */
} _EXROJob, *EXROJob;


/*
 * Run-queue side of _dxf_ExRunOn.  Calls job->func(job->arg) and stores
 * the resulting error code and a copy of the error message in the job.
 * It then unlocks job->done to wake the waiting caller.  The parameter n
 * is unused.
 *
 * If func returns ERROR without setting an error code, ERROR_INTERNAL is
 * recorded with "#8350" (as ExProcessTask does).  The remote path thus
 * fails just as the direct-call path of _dxf_ExRunOn would.
 */
static Error ExRunOnWorker (EXROJob job, int n)
{
    Error	rc;

    DXResetError ();
    rc = (* job->func) (job->arg);
    job->code = DXGetError ();
    if (rc == ERROR && job->code == ERROR_NONE)
    {
	job->code = ERROR_INTERNAL;
	job->emsg = _dxf_ExCopyString ("#8350");
    }
    else
	job->emsg = _dxf_ExCopyString (DXGetErrorMessage ());
    DXunlock(&job->done, exJID);
    return (OK);
}


/*
 * Runs func(arg) on processor JID and waits for it to finish.
 *
 * With a single processor, or when JID names this processor, func is called
 * directly and its result returned.  Otherwise a job is queued for JID
 * (clamped to 0..taskNprocs), and this processor blocks on the job's `done'
 * lock until ExRunOnWorker releases it.  The worker's error is re-raised
 * here.
 *
 * If size is 0 arg is passed through as-is; otherwise size bytes at arg are
 * copied into a private block for the duration of the call.
 *
 * Returns OK, or ERROR if func failed or the job could not be set up.
 */

Error _dxf_ExRunOn (int JID, PFE func, Pointer arg, int size)
{
    EXROJob		job	= NULL;
    ErrorCode		ecode;

    DXResetError ();

    /*
     * Clamp BEFORE comparing with our own id.  A clamped id that names this
     * processor must run directly: queueing a job to ourselves and then
     * blocking on it below would never complete.
     */
    if (JID < 0)
	JID = 0;
    if (JID > taskNprocs)
	JID = taskNprocs;

    if (taskNprocs == 1 || JID == exJID)
	return ((* func) (arg));
    
    job = (EXROJob) DXAllocate (sizeof (_EXROJob));
    if (job == NULL)
	goto nomem;
    job->func = func;
    job->arg  = size > 0 ? DXAllocate (size) : arg;
    job->size = size > 0 ? size : 0;
    job->code = ERROR_NONE;
    job->emsg = NULL;

    if (job->size > 0)
    {
	if (job->arg == NULL)
	    goto nomem;
	memcpy (job->arg, arg, size);
    }

    if (! DXcreate_lock (&job->done, "Job"))
    {
	DXSetError (ERROR_INTERNAL, "_dxf_ExRunOn:  can't create lock");
	goto cleanup;
    }
    DXlock (&job->done, exJID);

    _dxf_ExRQEnqueue (ExRunOnWorker, (Pointer) job, 1, 0, JID, TRUE);

    /* Blocks until ExRunOnWorker unlocks `done'. */
    DXlock (&job->done, exJID);
    DXunlock (&job->done, exJID);
    DXdestroy_lock (&job->done);

    ecode = job->code;
    if (ecode != ERROR_NONE)
	DXSetError (ecode, job->emsg);
    
    if (job->size > 0)
	DXFree ((Pointer) job->arg);
    DXFree ((Pointer) job->emsg);
    DXFree ((Pointer) job);

    return (ecode == ERROR_NONE ? OK : ERROR);

nomem:
    DXSetError (ERROR_INTERNAL, "_dxf_ExRunOn:  can't DXAllocate");
cleanup:
    if (job && job->size > 0)
	DXFree ((Pointer) job->arg);
    DXFree ((Pointer) job);
    return (ERROR);
}


/*
 * Runs func(arg) once on every processor 1..taskNprocs in turn, stopping at
 * the first failure.  Returns OK if all runs succeeded, including when
 * there are no processors.  Previously an uninitialised value was returned
 * in that case.
 */
Error
_dxf_ExRunOnAll (PFE func, Pointer arg, int size)
{
    int		i;
    Error	ret	= OK;

    for (i = 1; i <= taskNprocs && ret == OK; i++)
	ret = _dxf_ExRunOn (i, func, arg, size);

    return (ret);
}
