/***********************************************************************/ /* Open Visualization Data Explorer */ /* (C) Copyright IBM Corp. 1989,1999 */ /* ALL RIGHTS RESERVED */ /* This code licensed under the */ /* "IBM PUBLIC LICENSE - Open Visualization Data Explorer" */ /***********************************************************************/ #include #include #include #ifdef DXD_HAS_WINSOCKETS #include #else #include #endif #ifdef DXD_WIN #include #include #else #include #include #endif #if DXD_SOCKET_UNIXDOMAIN_OK #include #endif #ifdef DXD_WIN #include #else #include #endif #ifndef DXD_HAS_WINSOCKETS #include #endif #include "sysvars.h" #include "distp.h" #include "config.h" #include "context.h" #include "obmodule.h" #if DXD_NEEDS_SYS_SELECT_H #include #endif #if defined(HAVE_SYS_FILIO_H) #include #endif static slave_id = 1; /* We start at 1 because master will be slave 0 */ static Error ConnectPStoS(dpgraphstat *index, dpgraphstat *index2); static Error ConnectPMtoS(dpgraphstat *index); extern LIST(dpgraphstat) _dxd_dpgraphstat; extern LIST(SlavePeers) _dxd_slavepeers; extern EXDictionary _dxd_exMacroDict; extern EXDictionary _dxd_exGlobalDict; extern int _dxfExRemoteExec(int connectd, char *host, char *ruser, int r_argc, char **r_argv, int outboard); extern void _dxf_ExDeleteHost(char *host, int err, int closepeer); extern void _dxf_ExDeletePeer(SlavePeers *sp, int closepeer); extern void _dxf_ExAddPeer(SlavePeers *sp); static int ExNextSlaveId() { int next; next = slave_id; slave_id++; return(next); } static Error ExSendModuleInputHandler(int fd, Pointer arg) { SlavePeers *sp = NULL; int b, i, limit; for (i = 0, limit = SIZE_LIST(_dxd_slavepeers); i < limit; ++i) { sp = FETCH_LIST(_dxd_slavepeers, i); if (sp->sfd == fd) break; } if(sp == NULL) { DXUIMessage("ERROR", "Input Handler can find peer entry in table.\n"); return(ERROR); } if ((IOCTL(fd, FIONREAD, (char *)&b) < 0) || (b <= 0)) { if(sp->peername == NULL) { /* this was marked for delete */ _dxf_ExDeletePeer(sp, 1); return(ERROR); } _dxf_ExExecCommandStr ("kill"); if(_dxd_exContext->program) _dxd_exContext->program->runable = 0; if(_dxd_exDebug) printf("shut down socket for peer %s\n", sp->peername); /* if I'm the master, close the master/slave connection */ if(!_dxd_exRemoteSlave) _dxf_ExDeleteHost(sp->peername, 1, 1); else _dxf_ExDeletePeer(sp, 1); return(ERROR); } return(_dxf_ExReceivePeerPacket(sp)); } void _dxf_ExUpdateDPTable() { int i, j,k,limit; dpgraphstat *index, *index2; char *opt; char ** t; char **av = NULL; int ac = 1; DistMsg type; int addedhost = FALSE; int init_av = TRUE; dpversion dpv = {DPMSG_SIGNATURE, DPMSG_VERSION}; dpslave_id dpslaveid; ExDebug("*1", "Checking distributed host table"); /*----------------------------------------------------------------------*/ /* start up slaves if they have not already been started */ /* start at 1 since the zeroth entry is a dummy for the master. */ /*----------------------------------------------------------------------*/ for (k = 1; k < SIZE_LIST(_dxd_dpgraphstat); ++k) { index = FETCH_LIST(_dxd_dpgraphstat, k); if(index->procfd < 0 && (index->numpgrps > 0) && (!index->error)) { if(init_av) { /* Make an argc/argv pair for dx command and options */ av = (char **)DXAllocate(2 * sizeof (char*)); if (av == NULL) { goto error_exit; } av[0] = (char *)DXAllocate(3 * sizeof (char)); if (av[0] == NULL) goto error_exit; strcpy(av[0], "dx"); init_av = FALSE; } opt = index->options; if(opt == NULL) ac = 1; else { for (ac = 1; *opt; ++ac) { t = (char**)DXReAllocate((char*)av,(ac + 1)*sizeof (char*)); if (t == NULL) { goto error_exit; } else av = t; opt += strspn(opt, " \t"); for (i = 0; opt[i] != ' ' && opt[i] != '\0'; ++i) ; av[ac] = (char *)DXAllocate((i + 1)*sizeof (char)); if (av[ac] == NULL) goto error_exit; for (i = 0; *opt != ' ' && *opt != '\0'; ++i, ++opt) av[ac][i] = *opt; av[ac][i] = '\0'; } } index->procfd = _dxfExRemoteExec (_dxd_exDebugConnect, index->prochostname, NULL, ac, av, 0); if(index->procfd < 0) { DXUIMessage("ERROR", "Connection to peer %s failed.\n", index->prochostname); index->error = TRUE; index->numpgrps = 0; continue; } (*_dxd_exNSlaves)++; addedhost = TRUE; _dxf_ExDistMsgfd(DM_VERSION, (Pointer)&dpv, index->procfd); dpslaveid.id = index->SlaveId = ExNextSlaveId(); dpslaveid.name = index->prochostname; dpslaveid.namelen = strlen(dpslaveid.name) + 1; /* Tell new slave it's slaveid */ _dxf_ExDistMsgfd(DM_SLAVEID, (Pointer)&(dpslaveid), index->procfd); /* send state of global dictionary */ _dxf_ExSendDict(index->procfd, _dxd_exGlobalDict); /* send state of macro definitions */ _dxf_ExSendDict(index->procfd, _dxd_exMacroDict); /* add master as first peer */ ConnectPMtoS(index); for(j = 1; j < SIZE_LIST(_dxd_dpgraphstat); j++) { if(j == k) continue; index2 = FETCH_LIST(_dxd_dpgraphstat, j); if(index2->procfd >= 0) ConnectPStoS(index,index2); } DXMessage("Connected to %s\n", index->prochostname); } } /* flush non-permanent cache entries */ if(addedhost) _dxf_ExCacheFlush(FALSE); if (av) { for (i = 0; i < ac; ++i) if (av[i] != NULL) { ExDebug("*1", "freeing %d %x", i, av[i]); DXFree (av[i]); } ExDebug("*1", "freeing av %x", av); DXFree (av); } return; error_exit: _dxf_ExDie("Could not allocate memory for argument list"); } /* Connect peers master to slave */ static Error ConnectPMtoS(dpgraphstat *index) { DistMsg type; int dxport, nselect; int dxsock; #if DXD_SOCKET_UNIXDOMAIN_OK int dxusock; #endif int dxfd = -1; int len, slaveid, k, limit; struct sockaddr_in dxserver; #if DXD_SOCKET_UNIXDOMAIN_OK struct sockaddr_un dxuserver; #endif SlavePeers spentry; fd_set fdset; int fd; if(_dxd_exDebug) printf("Connecting %s to %s\n", _dxd_exHostName, index->prochostname); #if DXD_SOCKET_UNIXDOMAIN_OK dxport = _dxfSetupServer(OBPORT, &dxsock, &dxserver, &dxusock, &dxuserver); #else dxport = _dxfSetupServer(OBPORT, &dxsock, &dxserver); #endif if (dxport < 0) { DXUIMessage("ERROR", "Failed connecting %s to %s. Bad port number", _dxd_exHostName, index->prochostname); return(ERROR); } /* tell slave to connect */ type = DM_SCONNECT; write(index->procfd, &type, sizeof(DistMsg)); len = strlen(_dxd_exHostName) + 1; write(index->procfd, &len, sizeof(int)); write(index->procfd, _dxd_exHostName, sizeof(char)*len); write(index->procfd, &dxport, sizeof(int)); slaveid = 0; /* this slave is the master */ write(index->procfd, &slaveid, sizeof(int)); /* select becomes os2_select in arch.h for os2 */ FD_ZERO (&fdset); FD_SET(index->procfd, &fdset); nselect = select (index->procfd + 1, (SelectPtr) &fdset, NULL,NULL,NULL); if(nselect > 0) { if(_dxf_ExReceiveBuffer(index->procfd, &type, 1, TYPE_INT, 0) < 0) { DXUIMessage("ERROR", "bad distributed packet type"); return(ERROR); } if(type == DPMSG_SIGNATURE || type == DPMSG_SIGNATUREI) { if(type == DPMSG_SIGNATUREI) index->SwapMsg = TRUE; else index->SwapMsg = FALSE; if(_dxf_ExReceiveBuffer(index->procfd, &type, 1, TYPE_INT, index->SwapMsg) < 0) { DXUIMessage("ERROR", "bad distributed packet type"); return(ERROR); } } if(type == DM_SACCEPT) { #if DXD_SOCKET_UNIXDOMAIN_OK /* call _dxfCompleteServer and set a timeout on connect */ dxfd = _dxfCompleteServer(dxsock, dxserver, dxusock, dxuserver, 1); #else /* call _dxfCompleteServer and set a timeout on connect */ dxfd = _dxfCompleteServer(dxsock, dxserver, 1); #endif if (dxfd < 0) { DXUIMessage("ERROR", "Failed connecting %s to %s\n", _dxd_exHostName, index->prochostname); return(ERROR); } len = strlen(index->prochostname) + 1; spentry.peername = (char *)DXAllocateLocal(len); if(!spentry.peername) _dxf_ExDie("Cannot allocate memory for peername"); strcpy(spentry.peername, index->prochostname); spentry.SlaveId = index->SlaveId; spentry.sfd = dxfd; spentry.SwapMsg = index->SwapMsg; _dxf_ExSendQInit(&(spentry.sendq)); spentry.wait_on_ack = FALSE; spentry.pending_req = FALSE; _dxf_ExAddPeer(&spentry); DXRegisterInputHandler(ExSendModuleInputHandler, dxfd, NULL); if(_dxd_exDebug) printf("Done Connect Master to Slave peer at port %d\n", dxport); return(OK); } } DXUIMessage("ERROR", "Failed connecting %s to %s", _dxd_exHostName, spentry.peername); return(ERROR); } /* Connect peers slave to slave */ static Error ConnectPStoS(dpgraphstat *index, dpgraphstat *index2) { int len, port, nselect; DistMsg type; fd_set fdset; if(_dxd_exDebug) printf("Connecting %s to %s\n", index->prochostname, index2->prochostname); type = DM_SLISTEN; /* give first slave hostname of slave to connect to */ write(index->procfd, &type, sizeof(DistMsg)); len = strlen(index2->prochostname) + 1; write(index->procfd, &len, sizeof(int)); write(index->procfd, index2->prochostname, sizeof(char)*len); write(index->procfd, &(index2->SlaveId), sizeof(int)); /* wait for a port to accept from */ FD_ZERO (&fdset); FD_SET(index->procfd, &fdset); nselect = select (index->procfd + 1, (SelectPtr) &fdset, NULL,NULL,NULL); if(nselect > 0) { if(_dxf_ExReceiveBuffer(index->procfd, &type, 1, TYPE_INT, index->SwapMsg) < 0) { DXUIMessage("ERROR", "bad distributed packet type"); return(ERROR); } if(type == DM_SCONNECTPORT) { if(_dxf_ExReceiveBuffer(index->procfd, &port, 1, TYPE_INT, index->SwapMsg) < 0) { DXUIMessage("ERROR", "bad port number"); return(ERROR); } /* tell other slave to connect */ type = DM_SCONNECT; write(index2->procfd, &type, sizeof(DistMsg)); len = strlen(index->prochostname) + 1; write(index2->procfd, &len, sizeof(int)); write(index2->procfd, index->prochostname, sizeof(char)*len); write(index2->procfd, &port, sizeof(int)); write(index2->procfd, &(index->SlaveId), sizeof(int)); FD_ZERO (&fdset); FD_SET(index2->procfd, &fdset); nselect = select (index2->procfd + 1, (SelectPtr) &fdset, NULL,NULL,NULL); if(nselect > 0) { /* when second slave has connected tell first slave to accept */ if(_dxf_ExReceiveBuffer(index2->procfd, &type, 1, TYPE_INT, index2->SwapMsg) < 0) { DXUIMessage("ERROR", "bad distributed packet type"); return(ERROR); } if(type == DM_GRAPHDONE) { _dxf_SlaveDone(); FD_ZERO (&fdset); FD_SET(index2->procfd, &fdset); nselect = select (index2->procfd + 1, (SelectPtr) &fdset, NULL,NULL,NULL); if(nselect <= 0) goto error_return; if(_dxf_ExReceiveBuffer(index2->procfd, &type, 1, TYPE_INT, index2->SwapMsg) < 0) { DXUIMessage("ERROR", "bad distributed packet type"); return(ERROR); } } if(type == DM_SACCEPT) { write(index->procfd, &type, sizeof(DistMsg)); if(_dxd_exDebug) { printf("connected %s to %s at port %d\n", index->prochostname, index2->prochostname, port); printf("Done Connect\n"); } return(OK); } else { DXUIMessage("ERROR", "Failed connecting %s to %s", index->prochostname, index2->prochostname); DXMessage("Peer did not accept connection, packet type returned %d\n", type); return(ERROR); } } /* select */ } else { error_return: DXUIMessage("ERROR", "Failed connecting %s to %s", index->prochostname, index2->prochostname); return(ERROR); } } /* select */ if(_dxd_exDebug) { printf("connected %s to %s at port %d\n", index->prochostname, index2->prochostname, port); printf("Done Connect\n"); } } Error _dxf_ExSlaveListen() { DistMsg msgtype; int dxport; int dxsock; #if DXD_SOCKET_UNIXDOMAIN_OK int dxusock; #endif int dxfd = -1; int len; struct sockaddr_in dxserver; #if DXD_SOCKET_UNIXDOMAIN_OK struct sockaddr_un dxuserver; #endif SlavePeers spentry; int k, limit; if(_dxf_ExReceiveBuffer(_dxd_exMasterfd, &len, 1, TYPE_INT, _dxd_exSwapMsg) < 0) _dxf_ExDie("%s: bad packet data", _dxd_exHostName); spentry.peername = (char *)DXAllocateLocal(len); if(!spentry.peername) _dxf_ExDie("%s: could not allocate memory", _dxd_exHostName); spentry.SwapMsg = FALSE; if(_dxf_ExReceiveBuffer(_dxd_exMasterfd, spentry.peername, len, TYPE_UBYTE, _dxd_exSwapMsg) < 0) _dxf_ExDie("%s: bad packet data", _dxd_exHostName); if(_dxf_ExReceiveBuffer(_dxd_exMasterfd, &(spentry.SlaveId), 1, TYPE_INT, _dxd_exSwapMsg) < 0) _dxf_ExDie("%s: bad packet data", _dxd_exHostName); #if DXD_SOCKET_UNIXDOMAIN_OK dxport = _dxfSetupServer(OBPORT, &dxsock, &dxserver, &dxusock, &dxuserver); #else dxport = _dxfSetupServer(OBPORT, &dxsock, &dxserver); #endif if (dxport < 0) { DXFree(spentry.peername); msgtype = DM_CONNECTERROR; write(_dxd_exMasterfd, &msgtype, sizeof(DistMsg)); return(ERROR); } msgtype = DM_SCONNECTPORT; write(_dxd_exMasterfd, &msgtype, sizeof(DistMsg)); write(_dxd_exMasterfd, &dxport, sizeof(int)); /* wait for accept */ if(_dxf_ExReceiveBuffer(_dxd_exMasterfd, &msgtype, 1, TYPE_INT, _dxd_exSwapMsg) < 0) { DXUIMessage("ERROR", "%s: bad distribute packet type", _dxd_exHostName); return(ERROR); } if(msgtype == DM_SACCEPT) { #if DXD_SOCKET_UNIXDOMAIN_OK /* call _dxfCompleteServer and set a timeout on connect */ dxfd = _dxfCompleteServer(dxsock, dxserver, dxusock, dxuserver, 1); #else /* call _dxfCompleteServer and set a timeout on connect */ dxfd = _dxfCompleteServer(dxsock, dxserver, 1); #endif if (dxfd < 0) { DXUIMessage("ERROR","Connection to %s failed.\n", spentry.peername); DXFree(spentry.peername); return(ERROR); } spentry.sfd = dxfd; _dxf_ExSendQInit(&(spentry.sendq)); spentry.wait_on_ack = FALSE; spentry.pending_req = FALSE; _dxf_ExAddPeer(&spentry); DXRegisterInputHandler(ExSendModuleInputHandler, dxfd, NULL); /* send signature to peer, this must be the first msg that */ /* the peer gets to determine if it needs to swap bytes */ msgtype = DPMSG_SIGNATURE; write(dxfd, &msgtype, sizeof(int)); } else { DXUIMessage("ERROR", "Connect to %s failed.", spentry.peername); DXFree(spentry.peername); return(ERROR); } return(OK); } Error _dxf_ExSlaveConnect() { int len, port, fd; SlavePeers spentry; int k, limit; DistMsg msgtype = DM_SACCEPT; spentry.peername = NULL; spentry.SwapMsg = FALSE; if(_dxf_ExReceiveBuffer(_dxd_exMasterfd, &len, 1, TYPE_INT, _dxd_exSwapMsg) < 0) _dxf_ExDie("%s: bad packet data", _dxd_exHostName); spentry.peername = (char *)DXAllocateLocal(len); if(!spentry.peername) _dxf_ExDie("%s: couldn't allocate memory", _dxd_exHostName); if(_dxf_ExReceiveBuffer(_dxd_exMasterfd, spentry.peername, len, TYPE_UBYTE, _dxd_exSwapMsg) < 0) _dxf_ExDie("%s: bad packet data", _dxd_exHostName); if(_dxf_ExReceiveBuffer(_dxd_exMasterfd, &port, 1, TYPE_INT, _dxd_exSwapMsg) < 0) _dxf_ExDie("%s: bad packet data", _dxd_exHostName); if(_dxf_ExReceiveBuffer(_dxd_exMasterfd, &(spentry.SlaveId),1, TYPE_INT, _dxd_exSwapMsg) < 0) _dxf_ExDie("%s: bad packet data", _dxd_exHostName); /* connecting as peer to master */ if(spentry.SlaveId == 0) { msgtype = DPMSG_SIGNATURE; write(_dxd_exMasterfd, &msgtype, sizeof(int)); spentry.SwapMsg = _dxd_exSwapMsg; } fd = DXConnectToServer(spentry.peername, port); if(fd > 0) { spentry.sfd = fd; _dxf_ExSendQInit(&(spentry.sendq)); spentry.wait_on_ack = FALSE; spentry.pending_req = FALSE; _dxf_ExAddPeer(&spentry); DXRegisterInputHandler(ExSendModuleInputHandler, fd, NULL); msgtype = DM_SACCEPT; write(_dxd_exMasterfd, &msgtype, sizeof(DistMsg)); /* send signature to peer, this must be the first msg that */ /* the peer gets to determine if it needs to swap bytes */ msgtype = DPMSG_SIGNATURE; write(fd, &msgtype, sizeof(int)); /* send MP license failed message now that it's safe to use DXMessage */ #ifdef DXD_LICENSED_VERSION if(spentry.SlaveId == 0) { ExLicenseFinish(); } #endif return(OK); } DXFree(spentry.peername); msgtype = DM_CONNECTERROR; write(_dxd_exMasterfd, &msgtype, sizeof(DistMsg)); return(ERROR); } extern int _dxf_SuspendPeers() { int i, ilimit; SlavePeers *sp; for (i = 0, ilimit = SIZE_LIST(_dxd_slavepeers); i < ilimit; ++i) { sp = FETCH_LIST(_dxd_slavepeers, i); if(sp->sfd == -99) continue; DXRegisterInputHandler(NULL, sp->sfd, NULL); } return OK; } extern int _dxf_ResumePeers() { int i, ilimit; SlavePeers *sp; for (i = 0, ilimit = SIZE_LIST(_dxd_slavepeers); i < ilimit; ++i) { sp = FETCH_LIST(_dxd_slavepeers, i); if(sp->sfd == -99) continue; DXRegisterInputHandler(ExSendModuleInputHandler, sp->sfd, NULL); } return OK; }