/**************************************************************************
 *
 * Program:      Ring
 *               ring.c
 *
 * Written at:   High-Performance Computing Laboratory
 *               Department of Computer Science
 *               Northern Illinois University
 *               DeKalb, IL
 *
 * Authors:      John Bresnahan
 *               Doug Sale
 *
 * Date Written: 2/97
 *
 * Description:
 *
 * This program requires a single command line argument, a character string.
 * It first configures itself and all the slave processes in a ring:
 *
 *
 *                         master
 *                        ^      \
 *                       /        v
 *                 slave n        slave 1
 *                    ^              \
 *                   /                v
 *              slave n-1            slave 2
 *                  ^                  \
 *                   \                  v
 *                    .                slave 3
 *                    .               /
 *                     ^             /
 *                      \           v
 *                       +------ slave 4
 *
 *
 * The master then places the command line arg character string into a
 * Nexus buffer, sends it to the first slave, and waits for it to be
 * passed around the ring to eventually end up where it started.
 *
 * In this program we chose to make the monitor variable a global
 * variable rather than placing its address in an endpoint.  This
 * choice was made somewhat arbitrarily.  See the RsrWithReply example
 * where we chose to place the address of the monitor in an endpoint
 * instead.
 *
 * Running this program requires at least one other process.
 *
 * To run this program:
 *
 *     ring "hello, world" -nx -n 2
 *
 * This will start the master and two slaves on your current machine.
 *
 * This program uses nexus_printf(), which sends everything to stdout,
 * so executing the program in this manner will likely display all
 * output (including slave output) on your screen.
 *
 * It will send the message "hello, world" around the ring.
 *
 * Output:
 *
 *     Master Sent Message: hello, world
 *     olympus#1:c0:t0:p16425: Slave 1 Received Message: hello, world
 *     olympus#2:c0:t0:p16422: Slave 2 Received Message: hello, world
 *     Master Received Message: hello, world
 *
 * (NOTE: your output might print lines in a different order)
 *
 **************************************************************************/

#include <stdio.h>
#include <string.h>
#include <nexus.h>

/***************************/
/*                         */
/* Nexus Handler Functions */
/*                         */
/***************************/

static void create_link_handler(nexus_endpoint_t *endpoint,
                                nexus_buffer_t *buffer,
                                nexus_bool_t is_non_threaded_handler);
static void reply_handler(nexus_endpoint_t *endpoint,
                          nexus_buffer_t *buffer,
                          nexus_bool_t is_non_threaded_handler);
static void message_handler(nexus_endpoint_t *endpoint,
                            nexus_buffer_t *buffer,
                            nexus_bool_t is_non_threaded_handler);

/*********************************/
/*                               */
/* Nexus Handler Functions Table */
/*                               */
/*********************************/

#define CREATE_LINK_HANDLER_ID 0
#define REPLY_HANDLER_ID       1
#define MESSAGE_HANDLER_ID     2

static nexus_handler_t handlers[] =
{
    {NEXUS_HANDLER_TYPE_NON_THREADED,
     (nexus_handler_func_t) create_link_handler},
    {NEXUS_HANDLER_TYPE_NON_THREADED,
     (nexus_handler_func_t) reply_handler},
    {NEXUS_HANDLER_TYPE_NON_THREADED,
     (nexus_handler_func_t) message_handler}
};

/*******************/
/*                 */
/* Data Structures */
/*                 */
/*******************/

/* Monitor                                                  */
/*                                                          */
/* we can simulate a monitor by defining a struct that has  */
/* at least a nexus_mutex_t to provide mutually exclusive   */
/* access to the monitor.                                   */
/* in this application we also need a monitor condition     */
/* variable and an integer count field.                     */
typedef struct _monitor_t
{
    /* for mutually exclusive access to monitor */
    nexus_mutex_t mutex;
    /* a monitor condition variable and count */
    nexus_cond_t  cond;
    int           count;
} monitor_t;

/********************/
/*                  */
/* Global Variables */
/*                  */
/********************/

static nexus_endpointattr_t EpAttr;
static nexus_endpoint_t     GlobalEndpoint;
static monitor_t            Monitor;
static nexus_startpoint_t   NextInRing;
static int                  MyId;

/*******************/
/*                 */
/* Local Functions */
/*                 */
/*******************/

static void create_ring_links(nexus_node_t *nodes, int n_nodes);
static void send_message(nexus_startpoint_t *first_slave, char *msg);

/****************************/
/*                          */
/* Required Nexus Functions */
/*                          */
/****************************/

/*
 * NexusExit() (required of all Nexus programs)
 */
void NexusExit(void)
{
} /* end NexusExit() */

/*
 * NexusAcquiredAsNode() (required of all Nexus programs)
 */
int NexusAcquiredAsNode(nexus_startpoint_t *startpoint)
{
    nexus_startpoint_bind(startpoint, &GlobalEndpoint);
    return 0;
} /* end NexusAcquiredAsNode() */

/*
 * NexusBoot() (required of all Nexus programs)
 */
int NexusBoot(nexus_startpoint_t *startpoint)
{
    /****************************************/
    /* NexusBoot() must bind its startpoint */
    /* arg to an endpoint in this context   */
    /****************************************/

    /*
    ** before you can bind a startpoint to an endpoint, that endpoint
    ** must be initialized.  endpoints are initialized only once,
    ** and once initialized, may be bound to several startpoints.
    **
    ** initializing an endpoint requires an endpointattr.  again, before you
    ** can use an endpointattr to initialize an endpoint, the endpointattr
    ** must also be initialized.  endpointattrs are also initialized only
    ** once, and once initialized, may be used to initialize many endpoints.
    **
    ** So, the sequence of events to bind a startpoint to an endpoint is:
    **
    **     1. initialize an endpointattr
    **        - done once per endpointattr
    **        - once done, this endpointattr can be used to
    **          initialize several endpoints
    **     2. initialize an endpoint using an initialized endpointattr
    **        - done once per endpoint
    **        - once done, this endpoint can be bound to several startpoints
    **     3. bind the startpoint to the initialized endpoint
    */

    /* initializing an endpointattr */
    nexus_endpointattr_init(&EpAttr);
    nexus_endpointattr_set_handler_table(&EpAttr,
                                         handlers,
                                         sizeof(handlers)/sizeof(nexus_handler_t));

    /* initializing endpoint ... no particular address */
    nexus_endpoint_init(&GlobalEndpoint, &EpAttr);

    /* binding startpoint to endpoint */
    nexus_startpoint_bind(startpoint, &GlobalEndpoint);

    return NEXUS_SUCCESS;
} /* end NexusBoot() */

/********/
/*      */
/* MAIN */
/*      */
/********/

int main(int argc, char **argv)
{
    nexus_node_t *nodes;
    int           n_nodes;
    int           i;

    /***************************************************/
    /* calls to nexus_init() and nexus_start() are     */
    /* required at the beginning of all Nexus programs */
    /***************************************************/

    nexus_init(&argc,
               &argv,
               "NEXUS_ARGS",   /* conf info env variable          */
               "nx",           /* package designator              */
               NULL,           /* package args init callback func */
               NULL,           /* usage message callback func     */
               NULL,           /* new process params func         */
               NULL,           /* module list                     */
               &nodes,
               &n_nodes);

    /* at this point :                                           */
    /*     nodes[0].startpoint is NULL                           */
    /*     0 < i < n_nodes, nodes[i].startpoint is non-NULL      */
    /*     and points to slave i                                 */

    nexus_start();

    /* only the master proceeds from here ... all slaves sit     */
    /* around waiting for their handler routines to get called.  */
    /* see if we have at least 2 nodes */
    if (n_nodes < 2)
    {
        /* must always nexus_stdio_lock/nexus_stdio_unlock when doing any I/O */
        nexus_stdio_lock();
        fprintf(stderr, "ERROR: must be run with at least 2 nodes\n");
        nexus_stdio_unlock();
        nexus_abort();
    } /* endif */

    /* checking for message in command line args */
    if (argc != 2)
    {
        /* must always nexus_stdio_lock/nexus_stdio_unlock when doing any I/O */
        nexus_stdio_lock();
        fprintf(stderr, "usage: %s <message string>\n", argv[0]);
        nexus_stdio_unlock();
        nexus_abort();
    } /* endif */

    /**************************/
    /* Initialize the monitor */
    /**************************/

    /* initializing the mutex that controls access to the monitor */
    nexus_mutex_init(&(Monitor.mutex), (nexus_mutexattr_t *) NULL);
    /* initializing the condition variable */
    nexus_cond_init(&(Monitor.cond), (nexus_condattr_t *) NULL);

    /***************/
    /* create ring */
    /***************/

    create_ring_links(nodes, n_nodes);

    /********************************/
    /* send message around the ring */
    /********************************/

    send_message(&(nodes[1].startpoint), argv[1]);

    /********************/
    /* initial clean-up */
    /********************/

    nexus_mutex_destroy(&(Monitor.mutex));
    nexus_cond_destroy(&(Monitor.cond));

    /*********************/
    /* Standard Clean-up */
    /*********************/

    nexus_endpoint_destroy(&GlobalEndpoint);
    nexus_endpointattr_destroy(&EpAttr);

    /* Free the node list */
    for (i = 0; i < n_nodes; i++)
    {
        nexus_free(nodes[i].name);
        /* OK to call startpoint destroy on nodes[0].startpoint    */
        /* even though nodes[0].startpoint is known to be NULL     */
        nexus_startpoint_destroy(&(nodes[i].startpoint));
    } /* endfor */
    nexus_free(nodes);

    /* Terminate master process */
    nexus_context_destroy(NEXUS_FALSE);

    nexus_printf("main(): ERROR: We should never get here.\n");

    return 0;
} /* end main() */

/*******************/
/*                 */
/* Local Functions */
/*                 */
/*******************/

/*
 * create_ring_links()
 */
static void create_ring_links(nexus_node_t *nodes, int n_nodes)
{
    nexus_startpoint_t master_sp;
    nexus_startpoint_t master_copy_sp;
    nexus_startpoint_t next_sp;
    nexus_buffer_t     buffer;
    int                i;

    MyId = 0;

    /* initializing monitor count field to wait for nslaves = n_nodes - 1 */
    /* to report that they have received the ring configuration info      */
    nexus_mutex_lock(&(Monitor.mutex));
    Monitor.count = n_nodes - 1;
    nexus_mutex_unlock(&(Monitor.mutex));

    /* creating a startpoint to this process, the master process.  */
    /* recall that nodes[0].startpoint is NULL, so we cannot use   */
    /* it as a startpoint to the master process.                   */
    /* we must explicitly create one here.                         */
    nexus_startpoint_bind(&master_sp, &GlobalEndpoint);

    /* send master's startpoint and ring successor's startpoint to slaves */
    for (i = 1; i < n_nodes - 1; i++)
    {
        /* placing startpoints into buffers destroys them, so we must make */
        /* a copy of the startpoints and place the copies into the buffer  */
        nexus_startpoint_copy(&master_copy_sp, &master_sp);
        nexus_startpoint_copy(&next_sp, &(nodes[i+1].startpoint));

        nexus_buffer_init(&buffer,
                          (nexus_sizeof_startpoint(&master_copy_sp, 1)
                           + nexus_sizeof_startpoint(&next_sp, 1)
                           + nexus_sizeof_int(1)),
                          0);

        /* master startpoint ... used to reply back */
        nexus_put_startpoint_transfer(&buffer, &master_copy_sp, 1);
        /* next in ring startpoint */
        nexus_put_startpoint_transfer(&buffer, &next_sp, 1);
        /* slave id */
        nexus_put_int(&buffer, &i, 1);

        /* sending buffer to slave i and invoking create_link_handler() there */
        nexus_send_rsr(&buffer,
                       &(nodes[i].startpoint),
                       CREATE_LINK_HANDLER_ID,
                       NEXUS_TRUE,
                       NEXUS_FALSE);
    } /* endfor */

    /* placing startpoints into buffers destroys them, so we must make */
    /* a copy of the startpoints and place the copies into the buffer  */
    nexus_startpoint_copy(&master_copy_sp, &master_sp);
    nexus_startpoint_copy(&next_sp, &master_sp);

    nexus_buffer_init(&buffer,
                      (nexus_sizeof_startpoint(&master_copy_sp, 1)
                       + nexus_sizeof_startpoint(&next_sp, 1)
                       + nexus_sizeof_int(1)),
                      0);

    /* master startpoint ... used to reply back */
    nexus_put_startpoint_transfer(&buffer, &master_copy_sp, 1);
    /* next in ring startpoint */
    nexus_put_startpoint_transfer(&buffer, &next_sp, 1);
    /* slave id */
    i = n_nodes - 1;
    nexus_put_int(&buffer, &i, 1);

    /* sending buffer to slave n_nodes-1 and  */
    /* invoking create_link_handler() there   */
    nexus_send_rsr(&buffer,
                   &(nodes[n_nodes-1].startpoint),
                   CREATE_LINK_HANDLER_ID,
                   NEXUS_TRUE,
                   NEXUS_FALSE);

    /* destroying the locally created master_sp */
    nexus_startpoint_destroy(&master_sp);

    /***********************************************/
    /* wait for ALL slaves to reply - ring created */
    /***********************************************/

    /* the implementation of nexus_cond_wait() and nexus_cond_signal() */
    /* makes it possible for nexus_cond_wait() to experience a 'false  */
    /* wakeup', i.e., return without having had a nexus_cond_signal()  */
    /* applied to it.                                                   */
    /*                                                                  */
    /* this is why we must wait on a condition variable in the manner  */
    /* we do below, with a loop and an int count, rather than simply:  */
    /*     nexus_mutex_lock(&(Monitor.mutex));                         */
    /*     nexus_cond_wait(&(Monitor.cond), &(Monitor.mutex));         */
    /*     nexus_mutex_unlock(&(Monitor.mutex));                       */
    nexus_mutex_lock(&(Monitor.mutex));
    while (Monitor.count)
    {
        nexus_cond_wait(&(Monitor.cond), &(Monitor.mutex));
    } /* endwhile */
    nexus_mutex_unlock(&(Monitor.mutex));

} /* end create_ring_links() */

/*
 * send_message()
 */
static void send_message(nexus_startpoint_t *first_slave, char *msg)
{
    nexus_buffer_t buffer;
    int            msg_size;

    /* initializing count field as monitor to act as flag */
    /* to signal when message comes back to master        */
    nexus_mutex_lock(&(Monitor.mutex));
    Monitor.count = 0;
    nexus_mutex_unlock(&(Monitor.mutex));

    /* initialize buffer */
    msg_size = strlen(msg) + 1;
    nexus_buffer_init(&buffer,
                      (nexus_sizeof_int(1) + nexus_sizeof_char(msg_size)),
                      0);
    nexus_put_int(&buffer, &msg_size, 1);
    nexus_put_char(&buffer, msg, msg_size);

    /* send message to first slave, invoking message_handler() there */
    nexus_send_rsr(&buffer,
                   first_slave,
                   MESSAGE_HANDLER_ID,
                   NEXUS_TRUE,
                   NEXUS_FALSE);

    /* must always nexus_stdio_lock/nexus_stdio_unlock when doing any I/O */
    nexus_stdio_lock();
    printf("Master Sent Message: %s\n", msg);
    nexus_stdio_unlock();

    /****************************************/
    /* wait for message to be passed around */
    /* the ring and received by master      */
    /****************************************/

    /* the implementation of nexus_cond_wait() and nexus_cond_signal() */
    /* makes it possible for nexus_cond_wait() to experience a 'false  */
    /* wakeup', i.e., return without having had a nexus_cond_signal()  */
    /* applied to it.                                                   */
    /*                                                                  */
    /* this is why we must wait on a condition variable in the manner  */
    /* we do below, with a loop and an int count, rather than simply:  */
    /*     nexus_mutex_lock(&(Monitor.mutex));                         */
    /*     nexus_cond_wait(&(Monitor.cond), &(Monitor.mutex));         */
    /*     nexus_mutex_unlock(&(Monitor.mutex));                       */
    nexus_mutex_lock(&(Monitor.mutex));
    while (!(Monitor.count))
    {
        nexus_cond_wait(&(Monitor.cond), &(Monitor.mutex));
    } /* endwhile */
    nexus_mutex_unlock(&(Monitor.mutex));

} /* end send_message() */

/***************************/
/*                         */
/* Nexus Handler Functions */
/*                         */
/***************************/

/*
 * create_link_handler()
 */
static void create_link_handler(nexus_endpoint_t *endpoint,
                                nexus_buffer_t *buffer,
                                nexus_bool_t is_non_threaded_handler)
{
    nexus_startpoint_t reply_sp;
    nexus_buffer_t     reply_buffer;

    /* get master's and ring successor's startpoints */
    nexus_get_startpoint(buffer, &reply_sp, 1);
    nexus_get_startpoint(buffer, &NextInRing, 1);
    nexus_get_int(buffer, &MyId, 1);

    /* init buffer as empty and reply to master */
    nexus_buffer_init(&reply_buffer, 0, 0);

    /* send empty buffer to master, invoking reply_handler() there */
    nexus_send_rsr(&reply_buffer,
                   &reply_sp,
                   REPLY_HANDLER_ID,
                   NEXUS_TRUE,
                   is_non_threaded_handler);

    /* clean up */
    nexus_buffer_destroy(buffer);
    nexus_startpoint_destroy(&reply_sp);

} /* end create_link_handler() */

/*
 * reply_handler()
 */
static void reply_handler(nexus_endpoint_t *endpoint,
                          nexus_buffer_t *buffer,
                          nexus_bool_t is_non_threaded_handler)
{
    /* clean-up */
    nexus_buffer_destroy(buffer);

    /******************************************************************/
    /* decrement the counting semaphore - receive replies from slaves */
    /******************************************************************/
    nexus_mutex_lock(&(Monitor.mutex));
    if (!(--Monitor.count))
    {
        /* signalling only when count reaches 0 */
        nexus_cond_signal(&(Monitor.cond));
    } /* endif */
    nexus_mutex_unlock(&(Monitor.mutex));

} /* end reply_handler() */

/*
 * message_handler()
 */
static void message_handler(nexus_endpoint_t *endpoint,
                            nexus_buffer_t *buffer,
                            nexus_bool_t is_non_threaded_handler)
{
    nexus_buffer_t forward_buffer;
    int            msg_size;
    char          *msg;

    /* get message out of buffer */
    nexus_get_int(buffer, &msg_size, 1);
    msg = (char *) nexus_malloc(msg_size * sizeof(char));
    nexus_get_char(buffer, msg, msg_size);

    if (MyId)
    {
        /* I am a slave */

        /* print message */
        nexus_printf("Slave %d Received Message: %s\n", MyId, msg);

        /* pass to ring successor */
        nexus_buffer_init(&forward_buffer,
                          (nexus_sizeof_int(1) + nexus_sizeof_char(msg_size)),
                          0);
        nexus_put_int(&forward_buffer, &msg_size, 1);
        nexus_put_char(&forward_buffer, msg, msg_size);

        /* passing message to next in ring and invoking     */
        /* message_handler() (this handler routine) there   */
        nexus_send_rsr(&forward_buffer,
                       &NextInRing,
                       MESSAGE_HANDLER_ID,
                       NEXUS_TRUE,
                       is_non_threaded_handler);

        /* clean up and destroy self */
        nexus_buffer_destroy(buffer);
        nexus_free(msg);
        nexus_startpoint_destroy(&NextInRing);
        nexus_context_destroy(is_non_threaded_handler);
    }
    else
    {
        /* I am the master */

        /* must always nexus_stdio_lock/nexus_stdio_unlock when doing any I/O */
        nexus_stdio_lock();
        printf("Master Received Message: %s\n", msg);
        nexus_stdio_unlock();

        /* clean-up */
        nexus_buffer_destroy(buffer);
        nexus_free(msg);

        /******************************************************************/
        /* signalling monitor that message has travelled through the ring */
        /******************************************************************/

        /* the implementation of nexus_cond_wait() and nexus_cond_signal() */
        /* makes it possible for nexus_cond_wait() to experience a 'false  */
        /* wakeup', i.e., return without having had a nexus_cond_signal()  */
        /* applied to it.                                                   */
        /*                                                                  */
        /* this is why we must signal the condition variable in the manner */
        /* we do below, with an int count, rather than simply:             */
        /*     nexus_mutex_lock(&(Monitor.mutex));                         */
        /*     nexus_cond_signal(&(Monitor.cond));                         */
        /*     nexus_mutex_unlock(&(Monitor.mutex));                       */
        nexus_mutex_lock(&(Monitor.mutex));
        Monitor.count = 1;
        nexus_cond_signal(&(Monitor.cond));
        nexus_mutex_unlock(&(Monitor.mutex));
    } /* endif */

} /* end message_handler() */