#include #include #include typedef struct { int index; int bytes_requested; char *read_buffer; int done; issfs_size_t offset; int dpssfd; } aread_userdata; /* command line args */ char *dpss_url = NULL; char *output_file = NULL; /* global to know have read entire file */ int done = 0; pthread_mutex_t output_lock = PTHREAD_MUTEX_INITIALIZER; /* for output file */ FILE *fp = NULL; #define NBYTES 1 * 1024 * 1024 /* number of bytes to get from the DPSS at a * time */ /** @name dp_get copy a file from the DPSS to local disk. Usage: dp_get -i URL [-o filename][-v] [-i URL] URL to copy (format = x-dpss://master_name/set_id) [-o file] output file name (default = set name) @author Brian L. Tierney, LBNL. */ void usage_err (void); void parse_args (int argc, char **argv); void *read_callbackfunc (int nbytes, void *udata); #define NUMBUFS (2) main (int argc, char *argv[]) { int dpssfd, j, idx; int iosize = NBYTES; int64_t nbytes = 0; int r = NBYTES, w = NBYTES; char **read_buf; void **tid; issfs_size_t offset = 0; aread_userdata udata[NUMBUFS]; struct timeval tv = { 0, 500 }; parse_args (argc, argv); read_buf = (char **) calloc (NUMBUFS, sizeof (char *)); tid = (void **) calloc (NUMBUFS, sizeof (void *)); for (j = 0; j < NUMBUFS; j++) { read_buf[j] = calloc (iosize, sizeof (char)); } if (dpssInit () != DPSSFS_OK) { dpssPerror ("dpssInit()"); exit (-1); } dpss_status ("opening DPSS for set %s ", dpss_url); if ((dpssfd = dpssOpen (dpss_url, O_RDONLY, 0)) < 0) { dpssPerror ("dpssOpen()"); exit (-1); } if ((fp = fopen (output_file, "w")) == NULL) { dpss_status ("Couldn't open output file: %s", output_file); return (-1); } dpss_status ("Reading file from the DPSS, %d MB at a time", iosize / 1024 / 1024); /* read and write file in chunks of iosize untill end of file */ /* always have 2 outstanding reads at a time */ idx = 0; offset = 0; done = 0; /* #define UNALIGNED_BLOCK_TEST */ #ifdef UNALIGNED_BLOCK_TEST r = dpssRead (dpssfd, read_buf[0], DPSS_BLOCK_SIZE / 3); if ((fwrite (read_buf[0], 1, r, fp)) != r) { err_ret ("fwrite error"); exit (-1); } offset = (issfs_size_t) r; #endif udata[idx].index = idx; udata[idx].bytes_requested = iosize; udata[idx].read_buffer = read_buf[idx]; udata[idx].done = 0; udata[idx].offset = offset; udata[idx].dpssfd = dpssfd; dpss_status ("calling dpssAsyncRead (buffer %d), offset = %lld, read size = %d", idx, offset, iosize); r = dpssAsyncRead (dpssfd, read_buf[idx], (issfs_size_t) iosize, (issfs_size_t) offset,&tid[idx],(void *) &udata[idx], read_callbackfunc); if (r != ISS_OK) dpss_status("error starting dpssAsyncRead"); /* Grr */ offset += iosize; idx = abs (idx - 1); /* toggle between 0 and 1 */ while (!done) { /* start another read */ udata[idx].index = idx; udata[idx].bytes_requested = iosize; udata[idx].read_buffer = read_buf[idx]; udata[idx].done = 0; udata[idx].offset = offset; udata[idx].dpssfd = dpssfd; dpss_status ("calling dpssAsyncRead (buffer %d), offset = %lld, read size = %d", idx, offset, iosize); r = dpssAsyncRead (dpssfd, read_buf[idx], (issfs_size_t) iosize, (issfs_size_t) offset, &tid[idx], (void *) &udata[idx], read_callbackfunc); if (r != ISS_OK) dpss_status("error starting dpssAsyncRead"); /* wait for 1st read to finish */ idx = abs (idx - 1); /* toggle between 0 and 1 */ dpss_status ("waiting for read %d to finish ", idx); while (!(udata[idx].done)) { /* write of data to file happens in the callback routine */ /* sleep a bit */ select (0, 0, 0, 0, &tv); } offset += iosize; nbytes += iosize; /* should really be amount read */ dpss_status ("Read %lld MB from the DPSS ", nbytes / (1024 * 1024)); } if (r != ISS_OK) /* read error? */ { dpssPerror ("dpssRead()"); } if (dpssClose (dpssfd) != DPSSFS_OK) dpssPerror ("dpssClose()"); fclose (fp); dpss_status ("Read entire file (%lld bytes) from the DPSS", nbytes); exit (0); } /************************************************************************/ void usage_err (void) { fprintf (stderr, "\nUsage: dp_get [-i dpss_url][-o file] \n"); fprintf (stderr, "\t[-i URL] URL to copy (format = x-dpss://master_name/setname) \n" "\t[-o file] Output file name \n"); exit (-1); } /************************************************************************/ void * read_callbackfunc (int nbytes, void *udata) { static int count = 0; int index; aread_userdata *my_udata; char *rbuf; int i; issfs_size_t offset; int r; void **bar; bar = calloc(1,sizeof(void *)); my_udata = (aread_userdata *) udata; index = my_udata->index; rbuf = my_udata->read_buffer; offset = my_udata->offset; /* dpss_status ("read_callback %d: nbytes read = %d out of %d, off %lld ", index, nbytes, my_udata->bytes_requested, offset); */ if (nbytes < 0) /* is error */ { dpss_status("read_callback %d finished with error", index); my_udata->done = 1; return; } if (nbytes == 0) /* is done... */ { my_udata->done = 1; done = 1; return; } /* * have to lock the output stream, and seek to * the offset of the data returned */ pthread_mutex_lock(&output_lock); if (fseek(fp, (long)offset, SEEK_SET) < 0 ) { err_ret ("fseek error"); exit (-1); } if ((fwrite (rbuf, 1, nbytes, fp)) != nbytes) { err_ret ("fwrite error"); exit (-1); } pthread_mutex_unlock(&output_lock); /* if short, repost */ if (nbytes < my_udata->bytes_requested) { dpss_status("REPOSTING READ"); my_udata->offset += nbytes; r = dpssAsyncRead (my_udata->dpssfd, my_udata->read_buffer, (issfs_size_t) my_udata->bytes_requested, (issfs_size_t) (offset + nbytes), bar, (void *) udata, read_callbackfunc); return; } my_udata->done = 1; dpss_status ("read_callback %d done", index); return; } /************************************************************************/ void parse_args (int argc, char **argv) { char *s; void usage_err (); if (argc > 1) { while (--argc && (*++argv)[0] == '-') { for (s = argv[0] + 1; *s; s++) { switch (*s) { case 'i': if (argc < 2) usage_err (); dpss_url = (char *) calloc (128, (int) sizeof (char)); sscanf (*++argv, "%s", dpss_url); argc--; break; case 'o': if (argc < 2) usage_err (); output_file = (char *) calloc (128, (int) sizeof (char)); sscanf (*++argv, "%s", output_file); argc--; break; case 'h': /* help */ usage_err (); break; default: usage_err (); } /* end switch */ } /* end for */ } /* end while */ } /* end if */ if (dpss_url == NULL) /* must be set */ usage_err (); if (output_file == NULL) /* must be set */ usage_err (); }