/***********************************************************************/ /* Open Visualization Data Explorer */ /* (C) Copyright IBM Corp. 1989,1999 */ /* ALL RIGHTS RESERVED */ /* This code licensed under the */ /* "IBM PUBLIC LICENSE - Open Visualization Data Explorer" */ /***********************************************************************/ #include #include #include #ifdef DXD_HAS_WINSOCKETS #include #else #include #endif #ifdef DXD_WIN #include #include #else #include #endif #ifndef DXD_HAS_WINSOCKETS #include #endif #if DXD_SOCKET_UNIXDOMAIN_OK #include #endif #ifdef DXD_WIN #include #else #include #endif #ifndef DXD_HAS_WINSOCKETS #include #endif #if HAVE_SYS_FILIO_H #include #endif #include "graph.h" #include "exobject.h" #include "d.h" #include "sysvars.h" #include "cache.h" #include "distp.h" #include "parse.h" #include "config.h" #include "obmodule.h" #include "context.h" #include "_variable.h" #if DXD_NEEDS_SYS_SELECT_H #include #endif typedef struct dmargs { DistMsg type; Pointer data; int to; } dmargs; extern int _dxd_exCacheOn; extern int *_dxd_exKillGraph; extern int *_dxd_exBadFunc; extern LIST(dpgraphstat) _dxd_dpgraphstat; extern LIST(SlavePeers) _dxd_slavepeers; extern LIST(PGassign) _dxd_pgassign; extern EXDictionary _dxd_exMacroDict; extern EXDictionary _dxd_exGlobalDict; extern EXDictionary _dxd_exGraphCache; extern Object _dxfExportBin_FP(Object o, int fd); extern Object _dxfImportBin_FP(int fd); extern void _dxf_ExQMessage(int type, int graphId, int len, char *buf); extern void _dxf_ExSendQEnqueue (DPSendQ *sendq, DPSendPkg *pkg); extern DPSendPkg *_dxf_ExSendQDequeue (DPSendQ *sendq); extern int _dxf_ExSendQEmpty(DPSendQ *sendq); static void ExSendGDictPkg(GDictSend *pkg, int fd); static void ExSendUIPkg(UIMsgPackage *pkg, int fd); static void ExRecvUIPkg(int fd, int swap); static void ExDPSendGvarPkg(DPSendPkg *pkg, int fd); static void ExRecvDPSendPkg(int fd, int swap); static Error ExCheckAcknowledge(SlavePeers *sp); static void ExOKToSend(SlavePeers *sp); static void ExCheckSendReq(SlavePeers *sp); static void DistributeMsgAP(dmargs *argp); static void DistributeMsg(DistMsg type, Pointer data, int to); static LIST(uint32) SavedCacheTags; static int dontSendCacheTags = FALSE; static int nslaves_done; #define RCV_BUF(fd, ptr, n, t, s) \ if(_dxf_ExReceiveBuffer(fd, ptr, n, t, s) < 0) { \ DXUIMessage("ERROR", "Received bad distributed packet"); \ return; \ } int _dxf_ExReceiveBuffer(int fd, Pointer buffer, int ndata, Type t, int swap) { int sts; int already = 0; int size; size = ndata * DXTypeSize(t); do { sts = read(fd, ((char*)buffer) + already, size); already += sts; size -= sts; } while (sts > 0 && size > 0); if(swap) _dxfByteSwap(buffer, buffer, ndata, t); return sts; } int _dxf_ExWriteSock(int fd, Pointer buffer, int size) { int sts; sts = write(fd, (char *)buffer, size); if(sts != size) DXUIMessage("ERROR", "error writing to socket"); return(sts); } static void ReceivePVRequired(int fd, int swap) { int pkt[5]; RCV_BUF(fd, pkt, 5, TYPE_INT, swap); _dxf_ExMarkVarRequired(pkt[0], pkt[1], pkt[2], pkt[3], pkt[4]); } Error _dxf_ExReceivePeerPacket(SlavePeers *sp) { DistMsg type; int b, i, limit, graphId, cause_exec, len; char id[80]; UIPackage pkg; if(_dxf_ExReceiveBuffer(sp->sfd, &type, 1, TYPE_INT, sp->SwapMsg) < 0) { #if 0 DXUIMessage("ERROR", "bad distributed packet type %d", type); return(ERROR); #endif _dxf_ExDie("bad distributed packet type %d", type); } if(type == DPMSG_SIGNATURE || type == DPMSG_SIGNATUREI) { if(_dxd_exDebug) printf("signature %x\n", type); /* if this is peer 0 - master peer - than SwapMsg will already be set */ if(!sp->SwapMsg && type == DPMSG_SIGNATUREI) sp->SwapMsg = TRUE; return(OK); } if(_dxd_exDebug) printf("packet type %d from peer %s\n", type, sp->peername); else ExDebug("9", "packet type %d from peer %s", type, sp->peername); switch(type) { case DM_DPSENDREQ: return(ExCheckAcknowledge(sp)); break; case DM_DPSENDACK: ExOKToSend(sp); break; case DM_DPSEND: ExRecvDPSendPkg(sp->sfd, sp->SwapMsg); /* do we need to send out another request? */ ExCheckSendReq(sp); break; case DM_UIMSG: ExRecvUIPkg(sp->sfd, sp->SwapMsg); break; case DM_UIPCK: if(_dxf_ExReceiveBuffer(sp->sfd, &(pkg.len), 1, TYPE_INT, sp->SwapMsg) < 0) { DXUIMessage("ERROR", "bad packet data"); return(ERROR); } if(_dxf_ExReceiveBuffer(sp->sfd, pkg.data, pkg.len, TYPE_UBYTE, sp->SwapMsg) < 0) { DXUIMessage("ERROR", "bad packet data"); return(ERROR); } if(_dxd_exRemote) _dxf_ExCheckPacket(pkg.data, pkg.len); break; case DM_INSERTGDICT: _dxf_ExRecvGDictPkg(sp->sfd, sp->SwapMsg, 0); break; case DM_INSERTGDICTNB: _dxf_ExRecvGDictPkg(sp->sfd, sp->SwapMsg, 1); break; case DM_INSERTCACHE: _dxf_ExCacheInsertRemoteTag(sp->sfd, sp->SwapMsg); break; case DM_EVICTCACHE: _dxf_ExCacheDeleteRemoteTag(sp->sfd, sp->SwapMsg); break; case DM_EVICTCACHELIST: _dxf_ExCacheListDeleteRemoteTag(sp->sfd, sp->SwapMsg); break; case DM_PVREQUIRED: ReceivePVRequired(sp->sfd, sp->SwapMsg); break; case DM_BADFUNC: *_dxd_exKillGraph = TRUE; *_dxd_exBadFunc = TRUE; break; case DM_KILLEXECGRAPH: if(_dxf_ExReceiveBuffer(sp->sfd, &graphId, 1, TYPE_INT, sp->SwapMsg) < 0) { DXUIMessage("ERROR", "bad packet data"); return(ERROR); } /* don't actually do anything with graphId. Could be * different from current graphId if coming from interrupt * packet from UI. */ *_dxd_exKillGraph = TRUE; break; case DM_VCRREDO: _dxf_ExVCRRedo(); break; default: _dxf_ExDie("bad distributed packet type %d", type); break; } return(OK); } /*---------------------------------------------------*/ /* Common routine to funnel all msgs passed between */ /* the master and slave execs. The size argument is */ /* the size of the data. It can be 0 if all data is */ /* in shared memory and accessable to all processors.*/ /*---------------------------------------------------*/ void _dxf_ExDistributeMsg(DistMsg type, Pointer data, int size, int to) { int procId = 1; if(!_dxd_exRemoteSlave && *_dxd_exNSlaves <= 0) return; if(exJID == procId) DistributeMsg(type, data, to); else { dmargs *argp; argp = (dmargs *) DXAllocate(sizeof(dmargs)); if(argp == NULL) _dxf_ExDie("Could not allocate memory for distributed message"); argp->type = type; if(size > 0) { argp->data = (Pointer)DXAllocate (size); if(argp->data == NULL) _dxf_ExDie("could not allocate memory for distributed msg"); memcpy (argp->data, data, size); } else argp->data = data; argp->to = to; _dxf_ExRunOn (1, DistributeMsgAP, argp, 0); if(size > 0) DXFree(argp->data); DXFree(argp); } } static void DistributeMsgAP(dmargs *argp) { DistributeMsg(argp->type, argp->data, argp->to); } static void DistributeMsg(DistMsg type, Pointer data, int to) { int i, limit; dpgraphstat *index; SlavePeers *spindex; switch(to) { case TOSLAVES: /* entry 0 is for a dummy for the master so skip it */ for (i = 1, limit = SIZE_LIST(_dxd_dpgraphstat); i < limit; ++i) { index = FETCH_LIST(_dxd_dpgraphstat, i); if (index->procfd < 0) continue; _dxf_ExDistMsgfd(type, data, index->procfd); } break; case TOPEERS: if (type == DM_EVICTCACHELIST && dontSendCacheTags) { CacheTagList *ctpkg; ctpkg = (CacheTagList *)data; for (i = 0; i < ctpkg->numtags; ++i) APPEND_LIST(uint32, SavedCacheTags, ctpkg->ct[i]); return; } if(dontSendCacheTags) DXWarning("Message type %d sent in middle of Export", type); for (i = 0, limit = SIZE_LIST(_dxd_slavepeers); i < limit; ++i) { spindex = FETCH_LIST(_dxd_slavepeers, i); if (spindex->sfd < 0 || spindex->peername == NULL) continue; _dxf_ExDistMsgfd(type, data, spindex->sfd); } break; case TOPEER0: spindex = FETCH_LIST(_dxd_slavepeers, 0); _dxf_ExDistMsgfd(type, data, spindex->sfd); break; default: /* TOMASTER */ _dxf_ExDistMsgfd(type, data, _dxd_exMasterfd); } } /* given a process group, return the socket file descriptor to the current * host to which that group is assigned. */ int _dxf_ExGetSocketId(char *procname) { int i, limit; char *hostname; dpgraphstat *index; hostname = _dxf_ExGVariableGetStr(procname); /* entry 0 is for a dummy entry for the master */ for (i = 1, limit = SIZE_LIST(_dxd_dpgraphstat); i < limit; ++i) { index = FETCH_LIST(_dxd_dpgraphstat, i); if (!strcmp(index->prochostname, hostname)) return index->procfd; } return (-1); } void _dxf_ExDistMsgfd(DistMsg type, Pointer data, int tofd) { UIPackage *uipkg; CacheTagList *ctpkg; DelRemote *drpkg; DictType whichdict = 0; int sts = 0; dpversion *dpv; dpslave_id *dpslaveid; char *name; int len; if(tofd < 0) return; if(_dxd_exDebug) printf("sending pkg %d\n", type); else ExDebug("7", "sending pkg %d", type); if(type == DM_VERSION) { /* need to write signature first */ dpv = (dpversion *)data; _dxf_ExWriteSock(tofd, &dpv->signature, sizeof(int)); } _dxf_ExWriteSock(tofd, &type, sizeof(DistMsg)); switch(type) { case DM_PARSETREE: case DM_INSERTMDICT: _dxf_ExWriteTree((struct node *)data, tofd); break; case DM_EXECGRAPH: case DM_KILLEXECGRAPH: _dxf_ExWriteSock(tofd, &data, sizeof(int)); break; case DM_EXIT: _dxf_ExWriteSock(tofd, (int *)data, sizeof(int)); break; case DM_BADFUNC: case DM_GRAPHDONE: break; case DM_INSERTGDICT: case DM_INSERTGDICTNB: ExSendGDictPkg((GDictSend *)data, tofd); break; case DM_UIMSG: ExSendUIPkg((UIMsgPackage *)data, tofd); break; case DM_UIPCK: uipkg = (UIPackage *)data; _dxf_ExWriteSock(tofd, &(uipkg->len), sizeof(int)); _dxf_ExWriteSock(tofd, uipkg->data, sizeof(char)*uipkg->len); break; case DM_DPSENDREQ: _dxf_ExWriteSock(tofd, (int *)data, sizeof(int)); break; case DM_DPSENDACK: break; case DM_DPSEND: ExDPSendGvarPkg((DPSendPkg *)data, tofd); break; case DM_INSERTCACHE: case DM_EVICTCACHE: _dxf_ExWriteSock(tofd, (uint32 *)data, sizeof(uint32)); break; case DM_EVICTCACHELIST: ctpkg = (CacheTagList *)data; _dxf_ExWriteSock(tofd, &(ctpkg->numtags), sizeof(int)); _dxf_ExWriteSock(tofd, ctpkg->ct, ctpkg->numtags * sizeof(uint32)); break; case DM_SLAVEID: dpslaveid = (dpslave_id *)data; _dxf_ExWriteSock(tofd, (int *)&dpslaveid->id, sizeof(int)); _dxf_ExWriteSock(tofd, (int *)&dpslaveid->namelen, sizeof(int)); _dxf_ExWriteSock(tofd, dpslaveid->name, dpslaveid->namelen); break; case DM_VERSION: _dxf_ExWriteSock(tofd, (int *)&dpv->version, sizeof(int)); break; case DM_FLUSHGLOBAL: case DM_FLUSHMACRO: break; case DM_FLUSHCACHE: _dxf_ExWriteSock(tofd, (int *)data, sizeof(int)); break; case DM_FLUSHDICT: if((EXDictionary)data == _dxd_exGlobalDict) whichdict = DICT_GLOBAL; else if((EXDictionary)data == _dxd_exMacroDict) whichdict == DICT_MACRO; else if((EXDictionary)data == _dxd_exGraphCache) whichdict == DICT_GRAPH; else DXUIMessage("ERROR", "flush dictionary: bad dictionary"); _dxf_ExWriteSock(tofd, &whichdict, sizeof(int)); break; case DM_GRAPHDELETE: case DM_GRAPHDELETECONF: case DM_VCRREDO: break; case DM_PERSISTDELETE: drpkg = (DelRemote *)data; _dxf_ExWriteSock(tofd, &drpkg->del_namelen, sizeof(int)); _dxf_ExWriteSock(tofd, drpkg->del_name, drpkg->del_namelen); _dxf_ExWriteSock(tofd, &drpkg->del_instance, sizeof(int)); break; case DM_LOADMDF: _dxf_ExSendMdfPkg(data, tofd); break; case DM_PVREQUIRED: _dxf_ExWriteSock(tofd, (int *)data, 5 * sizeof(int)); break; case DM_DELETEPEER: name = (char *)data; len = strlen(name) + 1; _dxf_ExWriteSock(tofd, &len, sizeof(int)); _dxf_ExWriteSock(tofd, name, len); break; default: DXMessage("sending bad package type %d", type); break; } } Error _dxf_ExCreateGDictPkg(GDictSend *pkg, char *name, EXObj ex_obj) { gvar *gv; /* are we running distributed? */ if(!_dxd_exRemoteSlave && *_dxd_exNSlaves <= 0) return(OK); pkg->varname = name; pkg->class = ex_obj->class; if(pkg->class == EXO_CLASS_GVAR) { gv = (gvar *)ex_obj; pkg->gvp.type = gv->type; pkg->gvp.reccrc = gv->reccrc; pkg->gvp.cost = gv->cost; pkg->gvp.skip = gv->skip; pkg->gvp.disable_cache = gv->disable_cache; pkg->gvp.procId = gv->procId; if(gv->obj) pkg->gvp.hasobj = 1; else pkg->gvp.hasobj = 0; pkg->obj = gv->obj; return(OK); } else return(ERROR); } static void SendSavedCacheTags() { CacheTagList ctpkg; int i; int numsaved = SIZE_LIST(SavedCacheTags); uint32 *sct; while(numsaved > 0) { sct = (uint32 *)SavedCacheTags.vals; ctpkg.numtags = (numsaved > N_CACHETAGLIST_ITEMS) ? N_CACHETAGLIST_ITEMS : numsaved; for(i = 0; i < ctpkg.numtags; i++) ctpkg.ct[i] = sct[i]; _dxf_ExDistributeMsg(DM_EVICTCACHELIST, (Pointer)&ctpkg, sizeof(CacheTagList), TOPEERS); numsaved -= ctpkg.numtags; sct += ctpkg.numtags; } INIT_LIST(SavedCacheTags); } static void ExSendGDictPkg(GDictSend *pkg, int fd) { int len, ret; len = strlen(pkg->varname) + 1; ret = _dxf_ExWriteSock(fd, &len, sizeof(int)); if(ret != sizeof(int)) printf("_dxf_ExWriteSock returned %d instead of 1\n", ret); ret = _dxf_ExWriteSock(fd, pkg->varname, sizeof(char)*len); if(ret != len*sizeof(char)) printf("_dxf_ExWriteSock returned %d instead of %d\n", ret, len); ret = _dxf_ExWriteSock(fd, &(pkg->class), sizeof(exo_class)); if(ret != sizeof(exo_class)) printf("_dxf_ExWriteSock returned %d instead of 1\n", ret); ret = _dxf_ExWriteSock(fd, &(pkg->gvp), sizeof(gvarpkg)); if(ret != sizeof(gvarpkg)) printf("_dxf_ExWriteSock returned %d instead of 1\n", ret); if(pkg->obj) { dontSendCacheTags = TRUE; _dxfExportBin_FP (pkg->obj, fd); dontSendCacheTags = FALSE; SendSavedCacheTags(); } } gvar * ConverttoGvar(gvarpkg *gp, Object obj) { gvar *gv; gv = _dxf_ExCreateGvar (gp->type); if(obj) _dxf_ExDefineGvar (gv, obj); gv->reccrc = gp->reccrc; gv->cost = gp->cost; gv->skip = gp->skip; gv->disable_cache = gp->disable_cache; gv->procId = gp->procId; return(gv); } void _dxf_ExRecvGDictPkg(int fd, int swap, int nobkgrnd) { GDictSend pkg; gvar *gv; int len, ret; RCV_BUF(fd, &len, 1, TYPE_INT, swap); pkg.varname = DXAllocate(len); if(!pkg.varname) _dxf_ExDie("Could not allocate memory for variable name"); RCV_BUF(fd, pkg.varname, len, TYPE_UBYTE, swap); if(_dxd_exDebug) printf("var %s\n", pkg.varname); else ExDebug("7", "var %s", pkg.varname); RCV_BUF(fd, &(pkg.class), 1, TYPE_INT, swap); RCV_BUF(fd, &(pkg.gvp.type), 1, TYPE_INT, swap); RCV_BUF(fd, &(pkg.gvp.reccrc), 1, TYPE_INT, swap); RCV_BUF(fd, &(pkg.gvp.cost), 1, TYPE_DOUBLE, swap); RCV_BUF(fd, &(pkg.gvp.skip), 4, TYPE_INT, swap); pkg.obj = NULL; if(pkg.gvp.hasobj) pkg.obj = _dxfImportBin_FP (fd); if(pkg.class == EXO_CLASS_GVAR) { gv = ConverttoGvar(&(pkg.gvp), pkg.obj); if(pkg.gvp.hasobj && pkg.obj == NULL) { gv->skip = GV_SKIPERROR; DXPrintError(NULL); return; } if(nobkgrnd) _dxf_ExVariableInsertNoBackground(pkg.varname, _dxd_exGlobalDict, (EXObj)gv); else _dxf_ExVariableInsert(pkg.varname, _dxd_exGlobalDict, (EXObj)gv); } DXFree(pkg.varname); } static void ExDPSendGvarPkg(DPSendPkg *pkg, int fd) { int ret; ret = _dxf_ExWriteSock(fd, &(pkg->index), sizeof(int)); if(ret != sizeof(int)) printf("_dxf_ExWriteSock returned %d instead of %d\n",ret,sizeof(int)); ret = _dxf_ExWriteSock(fd, &(pkg->excache), sizeof(int)); if(ret != sizeof(int)) printf("_dxf_ExWriteSock returned %d instead of %d\n",ret,sizeof(int)); ret = _dxf_ExWriteSock(fd, &(pkg->gvp), sizeof(gvarpkg)); if(ret != sizeof(gvarpkg)) printf("_dxf_ExWriteSock returned %d instead of %d\n", ret, sizeof(gvarpkg)); if(pkg->obj) { dontSendCacheTags = TRUE; _dxfExportBin_FP (pkg->obj, fd); dontSendCacheTags = FALSE; SendSavedCacheTags(); } } static void ExRecvDPSendPkg(int fd, int swap) { DPSendPkg pkg; int ret; gvar *gv; ProgramVariable *pv; RCV_BUF(fd, &(pkg.index), 1, TYPE_INT, swap); RCV_BUF(fd, &(pkg.excache), 1, TYPE_INT, swap); RCV_BUF(fd, &(pkg.gvp.type), 1, TYPE_INT, swap); RCV_BUF(fd, &(pkg.gvp.reccrc), 1, TYPE_INT, swap); RCV_BUF(fd, &(pkg.gvp.cost), 1, TYPE_DOUBLE, swap); RCV_BUF(fd, &(pkg.gvp.skip), 4, TYPE_INT, swap); pkg.obj = NULL; if(pkg.gvp.hasobj) pkg.obj = _dxfImportBin_FP (fd); pv = FETCH_LIST(_dxd_exContext->program->vars, pkg.index); if(pv == NULL) { *_dxd_exKillGraph = TRUE; DXUIMessage("ERROR", "Missing variable in program"); return; } if(pv->gv == NULL) pv->gv = ConverttoGvar(&(pkg.gvp), pkg.obj); DXDebug("1", "received index %d 0x%08x 0x%08x\n", pkg.index, pv->gv->reccrc, pkg.gvp.reccrc); if ( pv->gv->reccrc != pkg.gvp.reccrc) { DXUIMessage("ERROR", "%s: bad packet data -- recipe", _dxd_exHostName); printf("%x, %x\n", pv->gv->reccrc, pkg.gvp.reccrc); pv->gv->reccrc = pkg.gvp.reccrc; } pv->gv->cost = pkg.gvp.cost; pv->gv->skip = pkg.gvp.skip; pv->gv->disable_cache = pkg.gvp.disable_cache; pv->gv->procId = pkg.gvp.procId; if(pkg.gvp.hasobj && pkg.obj == NULL) { pv->gv->skip = GV_SKIPERROR; DXPrintError(NULL); return; } _dxf_ExDefineGvar(pv->gv, pkg.obj); if(_dxd_exCacheOn && pkg.excache && !pkg.gvp.disable_cache && !pkg.gvp.skip) _dxf_ExCacheInsert (pv->gv); } void _dxf_ExSendDict(int fd, EXDictionary dict) { char *key; _dxf_ExDictionaryBeginIterate (dict); if(dict == _dxd_exMacroDict) { node *n; while((n = (node *) _dxf_ExDictionaryIterate (_dxd_exMacroDict, &key))) { /* don't send over normal modules, only macros * or modules which were loaded at runtime. */ if(n->type == NT_MACRO) _dxf_ExDistMsgfd(DM_INSERTMDICT, (Pointer)n, fd); /* this the change for sending over outboard modules. nsc 21nov93 */ else if (n->type == NT_MODULE && (n->v.module.flags & RUNTIMELOAD)) _dxf_ExDistMsgfd(DM_INSERTMDICT, (Pointer)n, fd); } } else if(dict == _dxd_exGlobalDict) { EXObj obj; GDictSend pkg; while((obj = _dxf_ExDictionaryIterate (_dxd_exGlobalDict, &key))) { _dxf_ExCreateGDictPkg(&pkg, key, obj); _dxf_ExDistMsgfd(DM_INSERTGDICT, (Pointer)&pkg, fd); } } _dxf_ExDictionaryEndIterate (dict); } static void ExSendUIPkg(UIMsgPackage *pkg, int fd) { _dxf_ExWriteSock(fd, &(pkg->ptype), sizeof(int)); _dxf_ExWriteSock(fd, &(pkg->graphId), sizeof(int)); _dxf_ExWriteSock(fd, &(pkg->len), sizeof(int)); _dxf_ExWriteSock(fd, pkg->data, sizeof(char)*pkg->len); } static void ExRecvUIPkg(int fd, int swap) { UIMsgPackage pkg; RCV_BUF(fd, &(pkg.ptype), 1, TYPE_INT, swap); RCV_BUF(fd, &(pkg.graphId), 1, TYPE_INT, swap); RCV_BUF(fd, &(pkg.len), 1, TYPE_INT, swap); if(pkg.len > MSG_BUFLEN) { DXUIMessage("ERROR", "UI packet too big"); return; } RCV_BUF(fd, pkg.data, pkg.len, TYPE_UBYTE, swap); _dxf_ExQMessage(pkg.ptype, pkg.graphId, pkg.len, pkg.data); } void _dxf_ResetSlavesDone() { nslaves_done = 0; } void _dxf_SlaveDone() { nslaves_done++; } void _dxf_ExWaitOnSlaves() { fd_set fdset; int nslaves, limit, k, nselect; dpgraphstat *index; DistMsg pcktype; int maxfd; int b, ret; SlavePeers *sp; int repeat_loop; ExDebug("1", "Waiting on slaves"); /* make sure all outstanding send requests are handled */ for(k = 0; k < SIZE_LIST(_dxd_slavepeers); k++) { sp = FETCH_LIST(_dxd_slavepeers, k); while(sp->sendq.head != NULL) _dxf_ExCheckRIH(); } repeat_loop = FALSE; wait: nslaves = 0; maxfd = 0; FD_ZERO (&fdset); limit = SIZE_LIST(_dxd_dpgraphstat); /* entry 0 is a dummy for the master */ for (k = 1; k < limit; ++k) { index = FETCH_LIST(_dxd_dpgraphstat, k); if(index->procfd > 0 && (index->numpgrps > 0)) { nslaves++; FD_SET(index->procfd, &fdset); if(index->procfd > maxfd) maxfd = index->procfd; } } if(nslaves > 0) ExDebug("7", "waiting on %d slaves", nslaves); while(nslaves_done < nslaves) { _dxf_ExCheckRIH (); nselect = select (maxfd + 1, (SelectPtr) &fdset, NULL, NULL, NULL); if(nselect > 0) { /* find out who is sending messages */ for (k = 0; k < limit; ++k) { index = FETCH_LIST(_dxd_dpgraphstat, k); if(index->procfd > 0 && (index->numpgrps > 0)) { if (FD_ISSET(index->procfd, &fdset)) { if((IOCTL(index->procfd, FIONREAD, (char *)&b) < 0) || (b <= 0)) { ExDebug("7", "EOF from slave socket"); _dxf_SlaveDone(); close(index->procfd); /* If we have to, wait for the child. */ if (!_dxd_exDebugConnect) wait(0); index->procfd = -1; /* send error message? * FIX - remove from table and send a message? */ continue; } if(_dxf_ExReceiveBuffer(index->procfd, &pcktype, 1, TYPE_INT, index->SwapMsg) < 0) DXUIMessage("ERROR", "bad distributed packet type"); if(_dxd_exDebug) printf("packet type %d from %s fd %d\n", pcktype, index->prochostname, index->procfd); else ExDebug("7", "packet type %d from %s fd %d", pcktype, index->prochostname, index->procfd); switch(pcktype) { case DM_GRAPHDONE: _dxf_SlaveDone(); break; case DM_GRAPHDELETECONF: _dxf_SlaveDone(); break; default: _dxf_ExDie("bad message type %d", pcktype); /* Maybe this is too severe? */ /* FIX - better way to recover */ } /* switch */ } /* ISSET */ } /* valid fd */ } /* for k */ } /* nselect > 0 */ FD_ZERO (&fdset); for (k = 0; k < limit; ++k) { index = FETCH_LIST(_dxd_dpgraphstat, k); if(index->procfd >= 0 && (index->numpgrps > 0)) FD_SET(index->procfd, &fdset); } } /* while not done */ if(repeat_loop == TRUE) return; while((ret = _dxf_ExCheckRIH())); if(nslaves > 0) ExDebug("7", "all slaves are done with graph"); if(_dxf_NewDPTableEntry()) { ExDebug("*7", "Run _dxf_ExUpdateDPTable on processor 1"); /* Since WaitOnSlaves is run on processor 0 we don't need to use * RunOn to call _dxf_ExUpdateDPTable * _dxf_ExRunOn (1, _dxf_ExUpdateDPTable, NULL, 0); */ _dxf_ExUpdateDPTable(); } _dxf_ExDistributeMsg(DM_GRAPHDELETE, NULL, 0, TOSLAVES); repeat_loop = TRUE; _dxf_ResetSlavesDone(); goto wait; } void _dxf_ExWaitForPeers() { fd_set fdset; int npeers, npeers_done, limit, k, nselect; SlavePeers *sp; DistMsg pcktype; int maxfd; int b, ret; npeers = npeers_done = 0; maxfd = 0; FD_ZERO (&fdset); limit = SIZE_LIST(_dxd_slavepeers); /* don't wait for peer0 which is the master */ for (k = 1; k < limit; ++k) { sp = FETCH_LIST(_dxd_slavepeers, k); if(sp->sfd > 0) { npeers++; FD_SET(sp->sfd, &fdset); if(sp->sfd > maxfd) maxfd = sp->sfd; } } if(npeers > 0) if(_dxd_exDebug) printf("waiting for %d peers to close\n", npeers); else ExDebug("7", "waiting for %d peers", npeers); while(npeers_done < npeers) { nselect = select (maxfd + 1, (SelectPtr) &fdset, NULL, NULL, NULL); if(nselect > 0) { /* find out who is sending messages */ for (k = 1; k < limit; ++k) { sp = FETCH_LIST(_dxd_slavepeers, k); if(sp->sfd > 0) { if (FD_ISSET(sp->sfd, &fdset)) { if((IOCTL(sp->sfd, FIONREAD, (char *)&b) < 0) || (b <= 0)) { ExDebug("7", "EOF from peer socket"); npeers_done++; close(sp->sfd); sp->sfd = -99; continue; } if(_dxf_ExReceiveBuffer(sp->sfd, &pcktype, 1, TYPE_INT, sp->SwapMsg) < 0) DXUIMessage("ERROR", "bad peer packet type"); if(_dxd_exDebug) printf("packet type %d from peer %s fd %d\n", pcktype, sp->peername, sp->sfd); else ExDebug("7", "packet type %d from peer %s fd %d", pcktype, sp->peername, sp->sfd); switch(pcktype) { default: _dxf_ExDie("bad message type %d", pcktype); /* Maybe this is too severe? */ /* FIX - better way to recover */ } /* switch */ } /* ISSET */ } /* valid fd */ } /* for k */ } /* nselect > 0 */ FD_ZERO (&fdset); for (k = 1; k < limit; ++k) { sp = FETCH_LIST(_dxd_slavepeers, k); if(sp->sfd >= 0) FD_SET(sp->sfd, &fdset); } } /* while not done */ } void _dxf_ExSendPVRequired(int gid, int sgid, int funcId, int varId, int requiredFlag) { int pkt[5]; pkt[0] = gid; pkt[1] = sgid; pkt[2] = funcId; pkt[3] = varId; pkt[4] = requiredFlag; _dxf_ExDistributeMsg(DM_PVREQUIRED, (Pointer)pkt, 5 * sizeof(int), TOPEERS); } int _dxf_ExValidGroupAttach(char *groupname) { PGassign *pgindex; int j, limit; if(groupname == NULL) return(TRUE); for (j = 0, limit = SIZE_LIST(_dxd_pgassign); j < limit; ++j) { pgindex = FETCH_LIST(_dxd_pgassign, j); if(!strcmp(pgindex->pgname, groupname)) { dpgraphstat *hostentry; hostentry = FETCH_LIST(_dxd_dpgraphstat, pgindex->hostindex); if(hostentry && hostentry->error) { DXUIMessage("ERROR", "Group %s can not execute on host with failed connection.", pgindex->pgname); return(FALSE); } else return(TRUE); } } /* if group isn't in table default to running on the master, * in which case this group has a valid connection. */ return(TRUE); } /* do not send single constant assignments to remote slaves */ void _dxf_ExSendParseTree(node *n) { node *n2 = n; switch(n2->type) { case NT_PACKET: switch(n2->v.packet.type) { case PK_INTERRUPT: case PK_LINQ: case PK_SINQ: case PK_VINQ: case PK_IMPORT: case PK_MACRO: case PK_BACKGROUND: _dxf_ExDistributeMsg(DM_PARSETREE, (Pointer)n, 0, TOSLAVES); return; break; case PK_SYSTEM: case PK_FOREGROUND: if(n2->v.packet.packet == NULL) return; n2 = n2->v.packet.packet; if(n2->type != NT_ASSIGNMENT) { _dxf_ExDistributeMsg(DM_PARSETREE, (Pointer)n, 0, TOSLAVES); return; } /* fall through and check for assignment */ } case NT_ASSIGNMENT: n2 = n2->v.assign.rval; if(n2->type == NT_CONSTANT || n2->type == NT_ID) return; /* no break, fall through to next case */ default: _dxf_ExDistributeMsg(DM_PARSETREE, (Pointer)n, 0, TOSLAVES); } } void _dxf_ExReqDPSend(ProgramVariable *pv, int varindex, SlavePeers *sp) { DPSendPkg *pkg; pkg = (DPSendPkg *)DXAllocate(sizeof(DPSendPkg)); if(pkg == NULL) _dxf_ExDie("Can not allocate memory for DPSend Packet"); pkg->index = varindex; pkg->excache = pv->excache; pkg->gvp.type = pv->gv->type; pkg->gvp.reccrc = pv->gv->reccrc; pkg->gvp.cost = pv->gv->cost; pkg->gvp.skip = pv->gv->skip; pkg->gvp.disable_cache = pv->gv->disable_cache; pkg->gvp.procId = pv->gv->procId; if(pv->gv->obj) pkg->gvp.hasobj = 1; else pkg->gvp.hasobj = 0; pkg->obj = pv->gv->obj; _dxf_ExSendQEnqueue(&(sp->sendq), pkg); if(!sp->wait_on_ack) { sp->wait_on_ack = TRUE; _dxf_ExDistMsgfd(DM_DPSENDREQ, &(_dxd_exSlaveId), sp->sfd); } } static Error ExCheckAcknowledge(SlavePeers *sp) { int rslave_id; if(_dxf_ExReceiveBuffer(sp->sfd, &rslave_id, 1, TYPE_INT, sp->SwapMsg) < 0) { DXUIMessage("ERROR", "dpsend request packet data"); return(ERROR); } /* if we have no outstanding requests or their id is less than ours * then we can send an acknowledge. If two peers have outstanding * requests the peer with the lowest slaveId wins. */ if(!sp->wait_on_ack || rslave_id < _dxd_exSlaveId) _dxf_ExDistMsgfd(DM_DPSENDACK, NULL, sp->sfd); else { /* remember we have an outstanding request that is not acked */ sp->pending_req = TRUE; if(_dxd_exDebug) printf("pending request from peer %s\n", sp->peername); else ExDebug("9", "pending request from peer %s", sp->peername); } return(OK); } static void ExOKToSend(SlavePeers *sp) { DPSendPkg *pkg; pkg = _dxf_ExSendQDequeue(&(sp->sendq)); if(pkg != NULL) { _dxf_ExDistMsgfd(DM_DPSEND, (Pointer)pkg, sp->sfd); DXFree(pkg); sp->wait_on_ack = FALSE; if(sp->pending_req) { sp->pending_req = FALSE; _dxf_ExDistMsgfd(DM_DPSENDACK, NULL, sp->sfd); } if(!_dxf_ExSendQEmpty(&(sp->sendq))) { _dxf_ExDistMsgfd(DM_DPSENDREQ, &(_dxd_exSlaveId), sp->sfd); sp->wait_on_ack = TRUE; } } else DXUIMessage("ERROR", "No DPSend Package"); } static void ExCheckSendReq(SlavePeers *sp) { if(!sp->wait_on_ack && !sp->pending_req && !_dxf_ExSendQEmpty(&(sp->sendq))) { _dxf_ExDistMsgfd(DM_DPSENDREQ, &(_dxd_exSlaveId), sp->sfd); sp->wait_on_ack = TRUE; } }