/* A partial MPI implementation for QCDOC */
/* Michael Creutz, creutz@bnl.gov */
/* see the accompanying mpitest.C and the Makefile for using this */
/* since the qmemcpy routines are in C++, compile things as if they were C++ */
/* MPI_Wait does a sync, so it is crucial that every processor calls it
   an equal number of times */
/* implemented functions:
   int MPI_Init (int * argc, char *** argv);
   int MPI_Finalize ();
   int MPI_Comm_rank (MPI_Comm comm, int * rank);
   int MPI_Comm_size (MPI_Comm comm, int * size);
   int MPI_Get_processor_name (char * name, int * resultlen);
   int MPI_Barrier (MPI_Comm comm);
   int MPI_Abort (MPI_Comm comm, int errorcode);
   int MPI_Bcast (void * buffer, int count, MPI_Datatype datatype, int root,
                  MPI_Comm comm);
   double MPI_Wtime ();
   int MPI_Reduce (void * sendbuf, void * recvbuf, int count, MPI_Datatype datatype,
                   MPI_Op op, int root, MPI_Comm comm);
   int MPI_Allreduce (void * sendbuf, void * recvbuf, int count, MPI_Datatype datatype,
                      MPI_Op op, MPI_Comm comm);
   int MPI_Recv (void * buf, int count, MPI_Datatype datatype, int source, int tag,
                 MPI_Comm comm, MPI_Status * status);
   int MPI_Send (void * buf, int count, MPI_Datatype datatype, int dest, int tag,
                 MPI_Comm comm);
   int MPI_Wait (MPI_Request * request, MPI_Status * status);
   int MPI_Waitall (int count, MPI_Request * request, MPI_Status * status);
   int MPI_Isend (void * buf, int count, MPI_Datatype datatype, int dest, int tag,
                  MPI_Comm comm, MPI_Request * request);
   int MPI_Irecv (void * buf, int count, MPI_Datatype datatype, int source, int tag,
                  MPI_Comm comm, MPI_Request * request);
   int MPI_Issend (void * buf, int count, MPI_Datatype datatype, int dest, int tag,
                   MPI_Comm comm, MPI_Request * request);
   int MPI_Attr_get (MPI_Comm comm, int keyval, void * attr_value, int * flag);
*/
/* For Send/Recv: the send buffer is immediately cleared, so many sends can be
   stacked up.  In contrast, there is currently only one receive buffer, so
   don't send more than one message to a given processor without reading the
   earlier ones. */
/* To receive messages EVERY processor must call MPI_Wait() or the machine
   will lock up.  This is because there are syncs in there.  The receive must
   be separated from the matching send by an MPI_Wait(). */
/* For Isend/Irecv: the MPI_Wait follows the pair, and up to MAXPENDING
   messages on any processor can be done together. */
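/* A minimal usage sketch of the blocking Send/Recv path.  This is not taken
   from mpitest.C (see that file and the Makefile for the real test); it only
   assumes the semantics documented above -- MPI_Send deposits the message in
   the destination's single receive buffer, and every processor must call
   MPI_Wait between the send and the matching MPI_Recv -- plus the usual
   MPI_COMM_WORLD and MPI_INT constants from mpi.h.  The rank^1 pairing is
   purely illustrative and assumes an even number of processors.

   #include "mpi.h"
   int main (int argc, char ** argv) {
     int rank, out, in;
     MPI_Status status;
     MPI_Request request;   // ignored by this MPI_Wait, but needed for the call
     MPI_Init (&argc, &argv);
     MPI_Comm_rank (MPI_COMM_WORLD, &rank);
     out = rank;
     // each even/odd pair swaps one int: exactly one message per receive buffer
     MPI_Send (&out, 1, MPI_INT, rank^1, 99, MPI_COMM_WORLD);
     MPI_Wait (&request, &status);    // EVERY processor calls this exactly once
     MPI_Recv (&in, 1, MPI_INT, rank^1, 99, MPI_COMM_WORLD, &status);
     MPI_Finalize ();
     return 0;
   }
*/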
/* internal parameters for qmemcpy.C */
#define DEBUG 0
#define QLEN 256
#define BLOCKSIZE 24
#include "include/qmemcpyoc.C" /* the low level data transfer routines */
#include <stdio.h>   /* sprintf, printf */
#include <string.h>  /* strlen, memcpy */
#include <time.h>    /* clock, CLOCKS_PER_SEC */
#include <limits.h>  /* INT_MAX */
#include "mpi.h"

/* the Send/Recv buffers */
#define MYBUFFERLENGTH 1024
char myinbuffer[MYBUFFERLENGTH];
char myoutbuffer[MYBUFFERLENGTH];

/* for Isend/Irecv: tables of pending transfers */
typedef struct {
  int tag;
  int destproc;
  void * dest;
  int srcproc;
  void * src;
  int size;
} pendingtrans;

/* the maximum number of pending receives allowed for */
#ifndef MAXPENDING
#define MAXPENDING 4
#endif
int nextsend,nextrecv;
pendingtrans pendingsends[MAXPENDING],pendingrecvs[MAXPENDING],matchme;

extern "C" {

int mikes_MPI_SIZE (MPI_Datatype datatype) {
  /* sizeof doesn't work for MPI_Datatype, thus this function */
  /* I probably should do this with a table, but then error checking is harder */
  /* see man MPI_COMM_WORLD */
  switch (datatype){
  case MPI_CHAR:
  case MPI_BYTE:
  case MPI_UNSIGNED_CHAR: return sizeof(char);
  case MPI_SHORT:
  case MPI_UNSIGNED_SHORT: return sizeof(short);
  case MPI_INT:
  case MPI_UNSIGNED: return sizeof(int);
  case MPI_LONG:
  case MPI_UNSIGNED_LONG: return sizeof(long);
  case MPI_FLOAT: return sizeof(float);
  case MPI_DOUBLE: return sizeof(double);
  case MPI_FLOAT_INT: return sizeof(float)+sizeof(int);
  case MPI_LONG_INT: return sizeof(long)+sizeof(int);
  case MPI_DOUBLE_INT: return sizeof(double)+sizeof(int);
  case MPI_SHORT_INT: return sizeof(short)+sizeof(int);
  case MPI_2INT: return 2*sizeof(int);
  default: die("need to insert size for new datatype in mikes_MPI_SIZE()");
  }
  return -1;
}

int MPI_Init (int * argc, char *** argv) {
  matchme.srcproc=-1;
  mycom.start();
  nextsend=nextrecv=0;
  return MPI_SUCCESS;
}

int MPI_Finalize () {
  mycom.stop();
  return MPI_SUCCESS;
}

int MPI_Comm_size (MPI_Comm comm, int * size){
  *size=NumNodes();
  return MPI_SUCCESS;
}

int MPI_Comm_rank (MPI_Comm comm, int * rank) {
  *rank=processor;
  return MPI_SUCCESS;
}

int MPI_Get_processor_name (char * name, int * resultlen){
  sprintf(name,"QCDOC processor=%d",processor);
  *resultlen=strlen(name);
  return MPI_SUCCESS;
}

double MPI_Wtime (){
  double time = (1.0 * clock()) / CLOCKS_PER_SEC;
  return time;
}

int MPI_Bcast (void * buffer, int count, MPI_Datatype datatype, int root, MPI_Comm comm){
  /* this assumes the buffer is at the same address on all processors */
  /* use a binary splitting based on processor id */
  int bit,destproc,i,bitcount,topbit;
  /* how many bits differ between root and processor
     and at what bit do they start being the same */
  bitcount=0;
  topbit=1;
  i=processor^root;
  while(i){
    topbit=i<<1;
    i&=(i-1);
    bitcount++;
  }
  /* wait to get the broadcast */
  for (bit=1;bit<(1<

int MPI_Reduce (void * sendbuf, void * recvbuf, int count, MPI_Datatype datatype,
                MPI_Op op, int root, MPI_Comm comm) {

    case MPI_MAXLOC:
      *locaccum2 = (*daccum2>=*daccum1) ? *locaccum2 : *locaccum1;
      /* fall through to MPI_MAX */
    case MPI_MAX:
      *daccum2 = (*daccum2>=*daccum1) ? *daccum2 : *daccum1;
      break;
    case MPI_MINLOC:
      *locaccum2 = (*daccum2<*daccum1) ? *locaccum2 : *locaccum1;
    case MPI_MIN:
      *daccum2 = (*daccum2<*daccum1) ? *daccum2 : *daccum1;
      break;
    case MPI_PROD:
      *daccum2 *= *daccum1;
      break;
    default:
      die("unknown or unimplemented reduction operator");
      break;
    }
   }
  }
  /* negative root flags for global reduction */
  if ( (root<0) || (processor==root))
    memcpy(recvbuf,daccum2,mikes_MPI_SIZE(datatype));
  recvbuf=(char *) recvbuf + mikes_MPI_SIZE(datatype);
  sendbuf=(char *) sendbuf + mikes_MPI_SIZE(datatype);
  }
  return MPI_SUCCESS;
 }

 /* integer reductions */
 if ((datatype == MPI_INT)
     ||(datatype == MPI_UNSIGNED)
     ||(datatype == MPI_2INT)
     ||(datatype == MPI_UNSIGNED_SHORT)) {
  for (i=0;i

    case MPI_MAXLOC:
      *locaccum2 = (*iaccum2>=*iaccum1) ?
                   *locaccum2 : *locaccum1;
    case MPI_MAX:
      *iaccum2 = (*iaccum2>=*iaccum1) ? *iaccum2 : *iaccum1;
      break;
    case MPI_MINLOC:
      *locaccum2 = (*iaccum2<*iaccum1) ? *locaccum2 : *locaccum1;
    case MPI_MIN:
      *iaccum2 = (*iaccum2<*iaccum1) ? *iaccum2 : *iaccum1;
      break;
    case MPI_PROD:
      *iaccum2 *= *iaccum1;
      break;
    case MPI_BXOR:
      *iaccum2 ^= *iaccum1;
      break;
    case MPI_BAND:
      *iaccum2 &= *iaccum1;
      break;
    case MPI_BOR:
      *iaccum2 |= *iaccum1;
      break;
    case MPI_LXOR: /* force things to be boolean */
      *iaccum2 = (*iaccum2 && 1) ^ (*iaccum1 && 1);
      break;
    case MPI_LAND:
      *iaccum2 = (*iaccum2) && (*iaccum1);
      break;
    case MPI_LOR:
      *iaccum2 = (*iaccum2) || (*iaccum1);
      break;
    default:
      die("unknown or unimplemented reduction operator");
      break;
    }
   }
  }
  if ( (root<0) || (processor==root))
    memcpy(recvbuf,iaccum2,mikes_MPI_SIZE(datatype));
  recvbuf=(char *) recvbuf + mikes_MPI_SIZE(datatype);
  sendbuf=(char *) sendbuf + mikes_MPI_SIZE(datatype);
  }
  return MPI_SUCCESS;
 }

 die("requested reduction not implemented yet");
 return MPI_SUCCESS;
}

int MPI_Attr_get (MPI_Comm comm, int keyval, void * attr_value, int * flag) {
  /* MPI_Attr_get assumes attr_value is a pointer to a pointer to the attribute. */
  *flag=1;
  switch (keyval){
  case MPI_HOST:
  case MPI_IO:
    ** (int **) attr_value=0;
    break;
  case MPI_TAG_UB:
    ** (int **) attr_value=INT_MAX;
    break;
  case MPI_WTIME_IS_GLOBAL:
    ** (int **) attr_value=0;
    break;
  default:
    printf("unknown keyval in MPI_Attr_get\n");
    *flag=0;
  }
  return MPI_SUCCESS;
}

int MPI_Allreduce (void * sendbuf, void * recvbuf, int count, MPI_Datatype datatype,
                   MPI_Op op, MPI_Comm comm) {
  /* use root=-1 to flag for global reduction */
  return MPI_Reduce(sendbuf,recvbuf,count,datatype,op,-1,comm);
}

int MPI_Recv (void * buf, int count, MPI_Datatype datatype, int source, int tag,
              MPI_Comm comm, MPI_Status * status) {
  /* Removes a message from the receive buffer.  The message should have been
     sent by MPI_Send, and MPI_Wait must be called on all processors to be
     sure it has arrived.  A direct call to mycom.finishcopy() would also work
     and be somewhat faster than MPI_Wait. */
  char * ptr=myinbuffer;
  status->MPI_SOURCE=(* (int *) ptr);
  ptr+=sizeof(int);
  status->MPI_TAG=(* (int *) ptr);
  ptr+=sizeof(int);
  if ((status->MPI_SOURCE == source)||(source==MPI_ANY_SOURCE)){
    if ((status->MPI_TAG == tag)||(tag==MPI_ANY_TAG)){
      status->count=count;
      memcpy(buf,ptr,count*mikes_MPI_SIZE(datatype));
      return MPI_SUCCESS;
    }
  }
  die("Wrong message found.");
  return MPI_ERR_TAG;
}

int MPI_Send (void * buf, int count, MPI_Datatype datatype, int dest, int tag,
              MPI_Comm comm) {
  /* sends the message to the receive buffer on the destination node.
     This is non-blocking; the transfer is completed by a call to
     MPI_Wait on every node.
  */
  char * ptr=myoutbuffer;
  int size;
  size=count*mikes_MPI_SIZE(datatype);
  if (MYBUFFERLENGTH

int MPI_Isend (void * buf, int count, MPI_Datatype datatype, int dest, int tag,
               MPI_Comm comm, MPI_Request * request) {
  /* set up a pending send for MPI_Wait to complete */
  pendingsends[nextsend].tag=tag;
  pendingsends[nextsend].destproc=dest;
  pendingsends[nextsend].dest=NULL;
  pendingsends[nextsend].srcproc=processor;
  pendingsends[nextsend].src=buf;
  pendingsends[nextsend].size=count*mikes_MPI_SIZE(datatype);
  nextsend++;
  if (nextsend>MAXPENDING) die("Pending send table overflow, increase MAXPENDING");
  return MPI_SUCCESS;
}

int MPI_Issend (void * buf, int count, MPI_Datatype datatype, int dest, int tag,
                MPI_Comm comm, MPI_Request * request){
  /* only one kind of Isend */
  return MPI_Isend(buf,count,datatype,dest,tag,comm,request);
}

int MPI_Irecv (void * buf, int count, MPI_Datatype datatype, int source, int tag,
               MPI_Comm comm, MPI_Request * request) {
  /* set up a pending receive for MPI_Wait to complete */
  pendingrecvs[nextrecv].tag=tag;
  pendingrecvs[nextrecv].destproc=processor;
  pendingrecvs[nextrecv].dest=buf;
  pendingrecvs[nextrecv].srcproc=source;
  pendingrecvs[nextrecv].src=NULL;
  pendingrecvs[nextrecv].size=count*mikes_MPI_SIZE(datatype);
  nextrecv++;
  if (nextrecv>MAXPENDING) die("Pending receive table overflow, increase MAXPENDING");
  return MPI_SUCCESS;
}

int MPI_Wait (MPI_Request * request, MPI_Status * status){
  /* ignore handle and wait for all pending requests to finish */
  /* let the receiving end do the final transfer */
  int i,j;
  status->MPI_ERROR = MPI_SUCCESS;
  /* get the send info to the recv side */
  for (i=0;i
=0){
    for (j=0;j
= 0) die("matching receive/send pair not found");
#endif
  nextrecv=nextsend=0;
  /* complete the transfer */
  mycom.finishcopy();
  return MPI_SUCCESS;
}

int MPI_Waitall (int count, MPI_Request * request, MPI_Status * status){
  return MPI_Wait(request,status);
}

int MPI_Barrier (MPI_Comm comm){
  mycom.finishcopy();
  return MPI_SUCCESS;
}

int MPI_Abort (MPI_Comm comm, int errorcode){
  mycom.stop();
  return errorcode;
}

} /* extern "C" */
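/* A second usage sketch, for the non-blocking Isend/Irecv path.  Again this
   is only an illustration, not the accompanying mpitest.C: it assumes the
   behavior documented above, namely that MPI_Isend/MPI_Irecv just queue
   entries in the pending tables (at most MAXPENDING each) and that one
   collective MPI_Waitall (or MPI_Wait) call on every processor completes all
   of them.  The ring-neighbor arithmetic and variable names are made up for
   the example.

   #include "mpi.h"
   int main (int argc, char ** argv) {
     int rank, size, right, left;
     double out, in;
     MPI_Request requests[2];
     MPI_Status statuses[2];
     MPI_Init (&argc, &argv);
     MPI_Comm_rank (MPI_COMM_WORLD, &rank);
     MPI_Comm_size (MPI_COMM_WORLD, &size);
     right = (rank+1) % size;
     left  = (rank+size-1) % size;
     out = 1.0*rank;
     // queue one receive and one send, then complete both with one wait
     MPI_Irecv (&in,  1, MPI_DOUBLE, left,  5, MPI_COMM_WORLD, &requests[0]);
     MPI_Isend (&out, 1, MPI_DOUBLE, right, 5, MPI_COMM_WORLD, &requests[1]);
     MPI_Waitall (2, requests, statuses);   // every processor must reach this
     // a global maximum could then be formed with
     // MPI_Allreduce (&out, &in, 1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD);
     MPI_Finalize ();
     return 0;
   }
*/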