/************************************************************************** * * Program: Single Remote Service Request With Reply * rsr_with_reply.c * * Written at: High-Performance Computing Laboratory * Department of Computer Science * Northern Illinois University * DeKalb, IL * * Author: Nicholas T. Karonis * * Date Written: 2/97 * * Description: * * This program contains the *minimum* amount of code required to * send a message from one process to another and wait for a reply. * * This program requires two command line arguments, two integers. * It first creates a endpoint/startpoint pair and places the startpoint * along with the two command line argument integers into a Nexus buffer * and sends it to an awaiting process. It then waits for a reply from * the other process. This waiting is implemented by creating a monitor * and placing a flag and condition variable inside. The sending * process enters the monitor, clears the flag, and waits on the * monitor's condition variable. * * Sending a message is called initiating a "remote service request" * (rsr). An rsr invokes a specified function (called a handler * routine, or simply handler) on the receiving process. * In this program, the rsr instructs the receiving process to * execute the function addem_handler(), which is a routine that is * found in this file. * * The receiving process extracts the startpoint (to be used as a reply * startpoint) and both integers from the buffer and calculates their sum. * It then places the sum into another Nexus buffer and initiates an * rsr which invokes the get_answer_handler() (also in this file) on * the original process. * * The get_answer_handler() extracts the sum from the buffer. It then * enters the monitor, places the sum in the monitor's data field, * sets the flag, and signals the condition variable. * * Once the condition variable is signaled and the flag has been set, * the sum may be extracted from the monitor's data field and the original * operands and sum are printed. * * In this program we chose to place the address of the monitor variable * in the endpoint rather than simply making the monitor variable a * global variable. This choice was made somewhat arbitrarily. The * main motivation was to illustrate how to set a user pointer in an * endpoint. See the Ring example where we chose to make the monitor * a global variable instead. * * To run this program: * * rsr_with_reply 1 2 -nx -n 1 * * This will start the master and slave on your current machine. * It will calculate 1+2. * * Output: * * 1 + 2 = 3 * **************************************************************************/ #include #include /***************************/ /* */ /* Nexus Handler Functions */ /* */ /***************************/ static void addem_handler(nexus_endpoint_t *endpoint, nexus_buffer_t *buffer, nexus_bool_t is_non_threaded_handler); static void get_answer_handler(nexus_endpoint_t *endpoint, nexus_buffer_t *buffer, nexus_bool_t is_non_threaded_handler); /*********************************/ /* */ /* Nexus Handler Functions Table */ /* */ /*********************************/ #define ADDEM_HANDLER_ID 0 #define GET_ANSWER_HANDLER_ID 1 static nexus_handler_t handlers[] = { \ {NEXUS_HANDLER_TYPE_NON_THREADED, (nexus_handler_func_t) addem_handler}, {NEXUS_HANDLER_TYPE_NON_THREADED, (nexus_handler_func_t) get_answer_handler} }; /*******************/ /* */ /* Data Structures */ /* */ /*******************/ /* Monitor */ /* */ /* we can simulate a monitor by defining a struct that has */ /* at least a nexus_mutex_t to provide mutually exclusive */ /* access to the monitor. */ /* in this application we also need a monitor condition */ /* variable and two data fields; a flag and an integer. */ typedef struct _monitor_t { /* for mutually exclusive access to monitor */ nexus_mutex_t mutex; /* a monitor condition variable and flag */ nexus_cond_t cond; nexus_bool_t done; /* data portion */ int sum; } monitor_t; /********************/ /* */ /* Global Variables */ /* */ /********************/ static nexus_endpointattr_t EpAttr; static nexus_endpoint_t GlobalEndpoint; /****************************/ /* */ /* Required Nexus Functions */ /* */ /****************************/ /* * NexusExit() (required of all Nexus programs) */ void NexusExit(void) { } /* end NexusExit() */ /* * NexusAcquiredAsNode() (required of all Nexus programs) */ int NexusAcquiredAsNode(nexus_startpoint_t *startpoint) { nexus_startpoint_bind(startpoint, &GlobalEndpoint); return 0; } /* end NexusAcquiredAsNode() */ /* * NexusBoot() (required of all Nexus programs) */ int NexusBoot(nexus_startpoint_t *startpoint) { /****************************************/ /* NexusBoot() must bind its startpoint */ /* arg to an endpoint in this context */ /****************************************/ /* ** before you can bind a startpoint to an endpoint, that endpoint ** must be initialized. endpoints are initialized only once, ** and once initialized, may be bound to several startpoints. ** ** initializing an endpoint requires an endpointattr. again, before you ** can use an endpointattr to initialize an endpoint, the endpointattr ** must also be initialized. endpointattrs are also intialized only ** once, and once initialized, may be used to initialize many endpoints. ** ** So, the sequence of events to bind a startpoint to an endpoint are: ** ** 1. initialize an endpointattr ** - done once per endpointattr ** - once done, this endpointattr can be used to ** initialize several endpoints ** 2. initialize endpoint using an initialized enpointattr ** - done once per endpoint ** - once done, this endpoint can be bound to several startpoints ** 3. bind startpoint to initialized endpoint */ /* initializing an endpointattr */ nexus_endpointattr_init(&EpAttr); nexus_endpointattr_set_handler_table(&EpAttr, handlers, sizeof(handlers)/sizeof(nexus_handler_t)); /* initializing endpoint ... no particular address */ nexus_endpoint_init(&GlobalEndpoint, &EpAttr); /* binding startpoint to endpoint */ nexus_startpoint_bind(startpoint, &GlobalEndpoint); return NEXUS_SUCCESS; } /* end NexusBoot() */ /********/ /* */ /* MAIN */ /* */ /********/ int main(int argc, char **argv) { nexus_node_t *nodes; int n_nodes; nexus_buffer_t buffer; monitor_t monitor; nexus_endpoint_t reply_done_ep; nexus_startpoint_t reply_done_sp; int x,y; int i; /***************************************************/ /* calls to nexus_init() and nexus_start() are */ /* required at the beginning of all Nexus programs */ /***************************************************/ nexus_init(&argc, &argv, "NEXUS_ARGS", /* conf info env variable */ "nx", /* package designator */ NULL, /* package args init callback func */ NULL, /* usage message callback func */ NULL, /* new process params func */ NULL, /* module list */ &nodes, &n_nodes); /* at this point : */ /* nodes[0].starpoint is NULL */ /* 0 < i < n_nodes, nodes[i].starpoint is non-NULL */ /* and points to slave i */ nexus_start(); /* only the master proceeds from here ... all slaves sit */ /* around waiting for their handler routines to get called. */ /* See if we have 2 nodes */ if (n_nodes != 2) { /* must always nexus_stdio_lock/nexus_stdio_unlock when doing any I/O */ nexus_stdio_lock(); fprintf(stderr, "ERROR: Must run with exactly 2 nodes.\n"); nexus_stdio_unlock(); nexus_abort(); } /* endif */ /* checking for 2 command line args */ if (argc != 3) { /* must always nexus_stdio_lock/nexus_stdio_unlock when doing any I/O */ nexus_stdio_lock(); fprintf(stderr, "usage: %s \n", argv[0]); nexus_stdio_unlock(); nexus_abort(); } /* endif */ /***********************************************************/ /* Initialize the monitor that will be used for signalling */ /* that we have received the reply. */ /***********************************************************/ /* initializing the mutex that controls access to the monitor */ nexus_mutex_init(&(monitor.mutex), (nexus_mutexattr_t *) NULL); /* initializing the condition variable */ nexus_cond_init(&(monitor.cond), (nexus_condattr_t *) NULL); /* entering the monitor and clearing the flag */ nexus_mutex_lock(&(monitor.mutex)); monitor.done = NEXUS_FALSE; nexus_mutex_unlock(&(monitor.mutex)); /*****************************************************/ /* Set up the reply endpoint to point to the monitor */ /*****************************************************/ /* initializing the endpoint with the already initialized */ /* endpointattr EpAttr. note that this is the second */ /* endpoint to be initialized with the same EpAttr, the */ /* first being GlobalEndpoint. many endpoints may be */ /* initialized using the same endpointattr. */ nexus_endpoint_init(&reply_done_ep, &EpAttr); nexus_endpoint_set_user_pointer(&reply_done_ep, (void *) (&monitor)); /**********************************************/ /* binding a startpoint to the reply endpoint */ /**********************************************/ nexus_startpoint_bind(&reply_done_sp, &reply_done_ep); /**************************/ /* remote service request */ /**************************/ x = atoi(argv[1]); y = atoi(argv[2]); /* initializing the buffer to hold the reply_done_sp and two integers */ /* Note: there is no difference between (nexus_sizeof_int(1) * 2) and */ /* simply nexus_sizeof_int(2). */ nexus_buffer_init(&buffer, nexus_sizeof_startpoint(&reply_done_sp, 1) + (nexus_sizeof_int(1) * 2), 0); /* placing the startpoint and two integers into the buffer */ nexus_put_startpoint_transfer(&buffer, (&reply_done_sp), 1); nexus_put_int(&buffer, &x, 1); nexus_put_int(&buffer, &y, 1); /* sending buffer to other process and invoking addem_handler() there */ nexus_send_rsr(&buffer, &(nodes[1].startpoint), ADDEM_HANDLER_ID, NEXUS_TRUE, NEXUS_FALSE); /********************************/ /* wait for reply using monitor */ /********************************/ /* the implementation of nexus_cond_wait() and nexus_cond_signal() */ /* makes it possible for nexus_cond_wait() to experience a 'false */ /* wakeup', i.e., return without having had a nexus_cond_signal() */ /* applied to it. */ /* */ /* this is why we must wait on a condition variable in the manner */ /* we do below, with a loop and a nexus_bool_t done, rather than */ /* simply: */ /* nexus_mutex_lock(&(monitor.mutex)); */ /* nexus_cond_wait(&(monitor.cond), &(monitor.mutex)); */ /* nexus_mutex_unlock(&(monitor.mutex)); */ nexus_mutex_lock(&(monitor.mutex)); while (!(monitor.done)) { nexus_cond_wait(&(monitor.cond), &(monitor.mutex)); } /* endwhile */ nexus_mutex_unlock(&(monitor.mutex)); /* at this point, flag in monitor has been set */ /* ... OK to extract sum from monitor's data field */ /***********************/ /* printing the answer */ /***********************/ nexus_mutex_lock(&(monitor.mutex)); /* must always nexus_stdio_lock/nexus_stdio_unlock when doing any I/O */ nexus_stdio_lock(); printf("%d + %d = %d\n\n", x, y, monitor.sum); nexus_stdio_unlock(); nexus_mutex_unlock(&(monitor.mutex)); /********************/ /* initial clean-up */ /********************/ nexus_mutex_destroy(&(monitor.mutex)); nexus_cond_destroy(&(monitor.cond)); nexus_endpoint_destroy(&reply_done_ep); /*********************/ /* Standard Clean-up */ /*********************/ nexus_endpoint_destroy(&GlobalEndpoint); nexus_endpointattr_destroy(&EpAttr); /* Free the node list */ for(i = 0; i < n_nodes; i++) { nexus_free(nodes[i].name); /* OK to call startpoint destroy on nodes[0].startpoint */ /* even though nodes[0].startpoint known to be NULL */ nexus_startpoint_destroy(&(nodes[i].startpoint)); } /* endfor */ nexus_free(nodes); /* Terminate master process */ nexus_context_destroy(NEXUS_FALSE); nexus_printf("main(): ERROR: We should never get here.\n"); return 0; } /* end main() */ /***************************/ /* */ /* Nexus Handler Functions */ /* */ /***************************/ /* * addem_handler() */ static void addem_handler(nexus_endpoint_t *endpoint, nexus_buffer_t *buffer, nexus_bool_t is_non_threaded_handler) { nexus_startpoint_t reply_done_sp; nexus_buffer_t reply_buffer; int operands[2]; int sum; /*****************************************************/ /* extracting reply_done_sp and operands from buffer */ /*****************************************************/ /* note that items from buffer *must* be extracted in */ /* the same order that they were placed into the buffer. */ /* note also that the two operands may be extracted using */ /* a *single* get by specifying a space big enough to extract */ /* them and the fact that we want "2" of them. */ nexus_get_startpoint(buffer, &reply_done_sp, 1); nexus_get_int(buffer, operands, 2); /*********************************/ /* calculating value of function */ /*********************************/ sum = operands[0] + operands[1]; /**************************/ /* remote service request */ /**************************/ /* initializing buffer to hold single integer */ nexus_buffer_init(&reply_buffer, nexus_sizeof_int(1), 0); /* placing sum into buffer */ nexus_put_int(&reply_buffer, &sum, 1); /* sending buffer to original process, using reply_done_sp */ /* that we just extracted from the buffer, and invoking */ /* get_answer_handler() there */ nexus_send_rsr(&reply_buffer, &reply_done_sp, GET_ANSWER_HANDLER_ID, NEXUS_TRUE, is_non_threaded_handler); /************/ /* clean-up */ /************/ nexus_buffer_destroy(buffer); nexus_startpoint_destroy(&reply_done_sp); /* terminate slave process */ nexus_context_destroy(is_non_threaded_handler); } /* end addem_handler() */ /* * get_answer_handler() */ static void get_answer_handler(nexus_endpoint_t *endpoint, nexus_buffer_t *buffer, nexus_bool_t is_non_threaded_handler) { monitor_t *mp; int sum; /****************************************************/ /* extracting pointer to monitor from this endpoint */ /****************************************************/ mp = (monitor_t *) nexus_endpoint_get_user_pointer(endpoint); /******************************************/ /* extracting answer from message buffer */ /******************************************/ nexus_get_int(buffer, &sum, 1); /************/ /* clean-up */ /************/ nexus_buffer_destroy(buffer); /***********************************************/ /* placing sum into monitor, setting flag, and */ /* signalling cond variable to wake up main() */ /***********************************************/ /* the implementation of nexus_cond_wait() and nexus_cond_signal() */ /* makes it possible for nexus_cond_wait() to experience a 'false */ /* wakeup', i.e., return without having had a nexus_cond_signal() */ /* applied to it. */ /* */ /* this is why we must signal a condition variable in the manner */ /* we do below, with a nexus_bool_t done, rather than simply: */ /* nexus_mutex_lock(&(mp->mutex)); */ /* mp->sum = sum; */ /* nexus_cond_signal(&(mp->cond)); */ /* nexus_mutex_unlock(&(mp->mutex)); */ nexus_mutex_lock(&(mp->mutex)); mp->sum = sum; mp->done = NEXUS_TRUE; nexus_cond_signal(&(mp->cond)); nexus_mutex_unlock(&(mp->mutex)); } /* end get_answer_handler() */