|
Sierra Toolkit
Version of the Day
|
00001 00010 #include <cstdlib> 00011 #include <stk_util/parallel/mpi_filebuf.hpp> 00012 #include <assert.h> 00013 00014 enum { buffer_default_length = 4096 }; 00015 enum { buffer_putback_length = 16 }; 00016 00017 /*--------------------------------------------------------------------*/ 00018 00019 mpi_filebuf::mpi_filebuf() 00020 : std::streambuf(), 00021 comm( MPI_COMM_NULL ), 00022 comm_root( -1 ), 00023 comm_root_fp( NULL ), 00024 comm_output( 0 ), 00025 comm_buffer( NULL ), 00026 comm_buffer_len( buffer_default_length ), 00027 comm_time(0.0) 00028 {} 00029 00030 mpi_filebuf::~mpi_filebuf() 00031 { 00032 close(); 00033 } 00034 00035 /*--------------------------------------------------------------------*/ 00036 00037 mpi_filebuf * mpi_filebuf::set_buffer_length( const size_t len ) 00038 { 00039 // If already open then abort 00040 if ( NULL != comm_buffer ) return (mpi_filebuf *) NULL ; 00041 00042 // Wait and verify upon the attempt to open 00043 comm_buffer_len = buffer_putback_length < len ? len : buffer_putback_length ; 00044 00045 return this ; 00046 } 00047 00048 /*--------------------------------------------------------------------*/ 00049 00050 mpi_filebuf * mpi_filebuf::open( 00051 MPI_Comm communicator , 00052 const int root_processor , 00053 const std::ios_base::openmode file_mode , 00054 const char * const file_name ) 00055 { 00056 const double start_time = MPI_Wtime(); 00057 00058 // If already open then abort 00059 if ( NULL != comm_buffer ) return (mpi_filebuf *) NULL ; 00060 00061 const int mode = 00062 ( std::ios::in == file_mode ) ? 'r' : ( 00063 ( std::ios::out == file_mode ) ? 'w' : ( 00064 ( std::ios::app == file_mode ) ? 'a' : -1 ) ); 00065 00066 int err ; 00067 int rank ; 00068 int local, global ; 00069 int data[3] ; 00070 00071 // Broadcast the selected root processor and 'C' file mode 00072 00073 data[0] = root_processor ; 00074 data[1] = mode ; 00075 data[2] = comm_buffer_len ; 00076 00077 if ( MPI_SUCCESS != ( err = MPI_Bcast(data,3,MPI_INT,0,communicator) ) ) 00078 MPI_Abort( communicator , err ); 00079 00080 // Verify that all processors have the same root, mode, and buffer length: 00081 00082 local = data[0] != root_processor || data[1] != mode || data[2] != (signed) comm_buffer_len ; 00083 00084 if ( MPI_SUCCESS != ( err = 00085 MPI_Allreduce(&local,&global,1,MPI_INT,MPI_BOR,communicator) ) ) 00086 MPI_Abort( communicator , err ); 00087 00088 if ( global ) { 00089 comm_time += MPI_Wtime() - start_time ; 00090 return (mpi_filebuf *) NULL ; 00091 } 00092 00093 //-------------------------------------------------------------------- 00094 // Root processor and mode are consistent. 00095 // All processors try to allocate buffers and the 00096 // root processor tries to open the file. 00097 00098 if ( MPI_SUCCESS != ( err = MPI_Comm_rank( communicator , &rank ) ) ) 00099 MPI_Abort( communicator , err ); 00100 00101 char * const tmp_buf = (char *) std::malloc( comm_buffer_len ); 00102 std::FILE * tmp_fp = NULL ; 00103 00104 local = tmp_buf == NULL ; // Failed allocation ? 00105 00106 if ( root_processor == rank && ! local ) { 00107 tmp_fp = std::fopen( file_name , ( ( ( mode == 'r' ) ? "r" : 00108 ( mode == 'w' ) ? "w" : "a" ) ) ); 00109 #ifdef REDSTORM_SETVBUF 00110 if (tmp_fp) { 00111 if (std::setvbuf(tmp_fp, NULL, _IOFBF, 32768) != 0) { 00112 std::fclose(tmp_fp); 00113 tmp_fp = 0; 00114 } 00115 } 00116 #endif 00117 local = NULL == tmp_fp ; 00118 } 00119 00120 if ( MPI_SUCCESS != ( err = 00121 MPI_Allreduce(&local,&global,1,MPI_INT,MPI_BOR,communicator) ) ) 00122 MPI_Abort( communicator , err ); 00123 00124 if ( global ) { 00125 if ( NULL != tmp_buf ) std::free( tmp_buf ); // Deallocate 00126 if ( NULL != tmp_fp ) std::fclose( tmp_fp ); // Close the file 00127 comm_time += MPI_Wtime() - start_time ; 00128 return (mpi_filebuf *) NULL ; 00129 } 00130 00131 //-------------------------------------------------------------------- 00132 // All memory allocated and root processor openned the file 00133 // Update the internal members accordingly. 00134 00135 comm = communicator ; 00136 comm_root = root_processor ; 00137 comm_root_fp = tmp_fp ; 00138 comm_buffer = tmp_buf ; 00139 comm_output = mode != 'r' ; 00140 00141 // If output then set up put-buffer 00142 00143 if ( comm_output ) setp( comm_buffer, comm_buffer + comm_buffer_len ); 00144 00145 comm_time += MPI_Wtime() - start_time ; 00146 00147 return this ; 00148 } 00149 00150 /*--------------------------------------------------------------------*/ 00151 00152 mpi_filebuf * mpi_filebuf::close() 00153 { 00154 mpi_filebuf * tmp = NULL ; 00155 00156 if ( NULL != comm_buffer ) { 00157 00158 flush(); // Flush the buffers 00159 00160 if ( NULL != comm_root_fp ) std::fclose( comm_root_fp ); // Close the file 00161 00162 std::free( comm_buffer ); // Free the buffer 00163 00164 if ( comm_output ) setp(NULL,NULL); 00165 else setg(NULL,NULL,NULL); 00166 00167 // Reset the members: 00168 00169 comm = MPI_COMM_NULL ; 00170 comm_root = -1 ; 00171 comm_root_fp = NULL ; 00172 comm_output = 0 ; 00173 comm_buffer = NULL ; 00174 00175 tmp = this ; 00176 } 00177 00178 return tmp ; 00179 } 00180 00181 /*--------------------------------------------------------------------*/ 00182 /* Underflow, a global call. 00183 Read more data from the root processor's file and 00184 broadcast it to all processors. 00185 */ 00186 00187 int mpi_filebuf::underflow() 00188 { 00189 const double start_time = MPI_Wtime(); 00190 00191 if ( NULL != comm_buffer && ! comm_output && // Open for read 00192 ( gptr() == NULL || gptr() >= egptr() ) ) { // valid get buffer 00193 00194 00195 // Length of the buffer, consistent on all processors 00196 // Entire buffer is offset to accomodate putbacks 00197 00198 const size_t size = comm_buffer_len - buffer_putback_length ; 00199 char * const buf = comm_buffer + buffer_putback_length ; 00200 00201 int nread ; 00202 int err ; 00203 00204 // Root processor reads from the file and broadcasts the result 00205 00206 if ( NULL != comm_root_fp ) nread = std::fread(buf,1,size,comm_root_fp); 00207 00208 if ( MPI_SUCCESS != ( err = 00209 MPI_Bcast( &nread, 1, MPI_INT, comm_root, comm ) ) ) 00210 MPI_Abort(comm,err); 00211 00212 // If the read is successfull then update the get buffer pointers: 00213 00214 if ( 0 < nread ) { 00215 00216 // Broadcast the read buffer to all processors: 00217 00218 if ( MPI_SUCCESS != ( err = 00219 MPI_Bcast( buf, nread, MPI_BYTE, comm_root, comm ) ) ) 00220 MPI_Abort(comm,err); 00221 00222 // Set the get buffer: 00223 00224 setg( comm_buffer, buf, buf + nread ); 00225 00226 // Return the next character from the file: 00227 00228 comm_time += MPI_Wtime() - start_time ; 00229 00230 return *buf ; 00231 } 00232 } 00233 00234 // Failed: set the get buffer to NULL and return EOF 00235 setg(NULL, NULL, NULL); 00236 00237 comm_time += MPI_Wtime() - start_time ; 00238 00239 return EOF; 00240 } 00241 00242 /*--------------------------------------------------------------------*/ 00243 /* Overflow, a local call. 00244 Output complete lines of data on the root processor. 00245 Increase the buffer size on all other processors. 00246 */ 00247 00248 int mpi_filebuf::overflow( int c ) 00249 { 00250 if ( NULL != comm_buffer && comm_output ) { // open for write 00251 00252 // Determine current offset and length: 00253 char * cur_buffer = comm_buffer ; 00254 size_t cur_offset = pptr() - cur_buffer ; 00255 size_t cur_length = epptr() - cur_buffer ; 00256 00257 assert( cur_offset <= cur_length /* detecting abuse by 'ostream' */ ); 00258 00259 if ( NULL != comm_root_fp ) { 00260 if ( std::fwrite(cur_buffer,1,cur_offset,comm_root_fp) != cur_offset ) { 00261 return EOF ; // Write failed 00262 } 00263 cur_offset = 0 ; 00264 } 00265 else if ( cur_length <= cur_offset ) { 00266 // Not root processor, ran out of buffer space and 00267 // cannot write so increase the buffer size: 00268 cur_buffer = (char *) std::realloc( cur_buffer , cur_length *= 2 ); 00269 } 00270 00271 // If buffer is still good then reset the put-buffer 00272 00273 if ( NULL != cur_buffer ) { 00274 00275 comm_buffer = cur_buffer ; 00276 00277 setp( cur_buffer + cur_offset, cur_buffer + cur_length ); 00278 00279 if ( c != EOF ) { 00280 00281 sputc(c); 00282 return c; 00283 } 00284 else { 00285 return 0; 00286 } 00287 } 00288 } 00289 return EOF ; 00290 } 00291 00292 /*--------------------------------------------------------------------*/ 00293 /* Send output buffers to root processor and 00294 write them to the output file. 00295 */ 00296 00297 mpi_filebuf * mpi_filebuf::flush() 00298 { 00299 const double start_time = MPI_Wtime(); 00300 00301 int result = -1 ; // Failure return value 00302 00303 if ( NULL != comm_buffer && comm_output ) { // Open for write 00304 00305 int err ; 00306 00307 result = 0 ; 00308 00309 // Determine the local length: 00310 00311 char * cur_buf = comm_buffer ; 00312 unsigned int cur_len = pptr() - cur_buf ; 00313 00314 // Determine the global lengths 00315 00316 char * recv_buf = NULL ; 00317 int * recv_len = NULL ; 00318 int * recv_disp = NULL ; 00319 00320 int nproc = 0 ; 00321 00322 00323 // if ( NULL != comm_root_fp ) { 00324 00325 // It should no be neccessary to allocate recv_len on non-root 00326 // nodes, but the MPI_Gatherv on Janus always accesses recv_len 00327 // even on non-root processors which causes a segmentaion 00328 // violation if recv_len is set to NULL. 00329 00330 if ( MPI_SUCCESS != ( err = MPI_Comm_size(comm,&nproc) ) ) 00331 MPI_Abort( comm , err ); 00332 00333 recv_len = (int*) std::malloc( sizeof(int) * nproc ); 00334 00335 if ( NULL == recv_len ) MPI_Abort( comm , MPI_ERR_UNKNOWN ); 00336 00337 for (int j = 0 ; j < nproc ; ++j ) 00338 recv_len[j] = 0; 00339 // } 00340 00341 // Gather buffer lengths on the root processor 00342 00343 if ( MPI_SUCCESS != ( err = 00344 MPI_Gather(&cur_len,1,MPI_INT,recv_len,1,MPI_INT,comm_root,comm))) 00345 MPI_Abort( comm , err ); 00346 00347 // Root processor must allocate enough buffer space: 00348 00349 if ( NULL != comm_root_fp ) { 00350 00351 recv_len[ comm_root ] = 0 ; // Don't send to self 00352 00353 int i ; 00354 00355 if ( NULL == ( recv_disp = (int*) std::malloc( sizeof(int) * (nproc + 1) ) ) ) 00356 result = -1 ; 00357 00358 if ( 0 == result ) { // Allocation succeeded 00359 00360 recv_disp[0] = 0 ; 00361 00362 for ( i = 0 ; i < nproc ; ++i ) 00363 recv_disp[i+1] = recv_disp[i] + recv_len[i] ; 00364 00365 if ( 0 < recv_disp[nproc] ) { 00366 if ( NULL == ( recv_buf = (char*) std::malloc( recv_disp[nproc] ) ) ) 00367 result = -1 ; 00368 } 00369 else { 00370 result = 1 ; // No need to gather! 00371 } 00372 00373 if ( -1 != result ) { 00374 00375 // Write the root processor's buffer 00376 00377 if ( 0 < cur_len ) { 00378 if ( std::fwrite(cur_buf,1,cur_len,comm_root_fp) != cur_len ) 00379 result = -1 ; // Write failed 00380 00381 cur_len = 0 ; // Wrote this buffer 00382 } 00383 } 00384 } 00385 std::fflush( comm_root_fp ); 00386 } 00387 00388 // Root process broadcasts that all is well with the allocation 00389 00390 if ( MPI_SUCCESS != ( err = MPI_Bcast(&result,1,MPI_INT,comm_root,comm))) 00391 MPI_Abort( comm , err ); 00392 00393 if ( 0 == result ) { // All-is-well, need to gather and write 00394 00395 // Gather the buffers to the root processor 00396 00397 if ( MPI_SUCCESS != ( err = 00398 MPI_Gatherv(cur_buf, cur_len, MPI_BYTE, 00399 recv_buf, recv_len, recv_disp, MPI_BYTE, 00400 comm_root, comm ) ) ) 00401 MPI_Abort( comm , err ); 00402 00403 // Output the buffers, beginning with 'comm_root' 00404 00405 if ( NULL != comm_root_fp ) { 00406 00407 int i ; 00408 00409 for ( i = 1 ; i < nproc && 0 == result ; ++i ) { 00410 const int j = ( i + comm_root ) % nproc ; 00411 const unsigned int len = recv_len[j] ; 00412 00413 if ( 0 < len ) 00414 if ( std::fwrite(recv_buf+recv_disp[j],1,len,comm_root_fp) != len ) 00415 result = -1 ; // Write failed 00416 } 00417 00418 std::fflush( comm_root_fp ); 00419 } 00420 00421 // Broadcast that the write succeeded 00422 00423 if ( MPI_SUCCESS != ( err = MPI_Bcast(&result,1,MPI_INT,comm_root,comm))) 00424 MPI_Abort( comm , err ); 00425 } 00426 else if ( 1 == result ) { 00427 // Did not need to gather 00428 00429 result = 0 ; 00430 } 00431 00432 // Reset the output buffer 00433 00434 setp( comm_buffer , epptr() ); 00435 00436 // Clean up allocated memory 00437 00438 if ( NULL != recv_buf ) std::free( recv_buf ); 00439 if ( NULL != recv_len ) std::free( recv_len ); 00440 if ( NULL != recv_disp ) std::free( recv_disp ); 00441 } 00442 00443 comm_time += MPI_Wtime() - start_time ; 00444 00445 return -1 == result ? (mpi_filebuf *) NULL : this ; 00446 } 00447 00448 /*--------------------------------------------------------------------*/ 00449 00450 int mpi_filebuf::sync() 00451 { 00452 // The root processor will push to file, all others ignore 00453 00454 if ( NULL != comm_root_fp ) { 00455 00456 // Determine the local length: 00457 00458 char * cur_buf = comm_buffer ; 00459 int cur_len = pptr() - cur_buf ; 00460 00461 if ( 0 < cur_len ) std::fwrite(cur_buf,1,cur_len,comm_root_fp); 00462 00463 std::fflush( comm_root_fp ); 00464 00465 setp( comm_buffer , epptr() ); 00466 } 00467 00468 return 0 ; 00469 } 00470 00471 00472 std::streambuf * mpi_filebuf::setbuf( char * s , std::streamsize n ) 00473 { 00474 return this ; 00475 }