|
Sierra Toolkit
Version of the Day
|
00001 /*------------------------------------------------------------------------*/ 00002 /* Copyright 2010 Sandia Corporation. */ 00003 /* Under terms of Contract DE-AC04-94AL85000, there is a non-exclusive */ 00004 /* license for use of this work by or on behalf of the U.S. Government. */ 00005 /* Export of this program may require a license from the */ 00006 /* United States Government. */ 00007 /*------------------------------------------------------------------------*/ 00008 00009 #include <cstdio> 00010 #include <stdexcept> 00011 #include <stk_util/parallel/ParallelInputStream.hpp> 00012 00013 /*--------------------------------------------------------------------*/ 00014 00015 namespace stk_classic { 00016 namespace { 00017 00018 #if defined( STK_HAS_MPI ) 00019 00020 void broadcast( ParallelMachine comm , void * buf , int n ) 00021 { MPI_Bcast( buf , n , MPI_BYTE , 0 , comm ); } 00022 00023 #else 00024 00025 void broadcast( ParallelMachine , void * , int ) {} 00026 00027 #endif 00028 00029 } 00030 } 00031 00032 /*--------------------------------------------------------------------*/ 00033 00034 namespace stk_classic { 00035 namespace { 00036 00037 //---------------------------------------------------------------------- 00038 00039 class ParInBuf : public std::streambuf { 00040 public: 00041 enum { BUFFER_LENGTH = 0x010000 /* 64k bytes */ }; 00042 enum { BUFFER_PUTBACK = 0x000010 /* 16 bytes */ }; 00043 enum { MAX_READ = BUFFER_LENGTH - BUFFER_PUTBACK }; 00044 ParInBuf( ParallelMachine , const char * const ); 00045 virtual ~ParInBuf(); 00046 00047 protected: 00048 virtual int underflow(); // refill the input buffer 00049 virtual int overflow( int c = EOF ); // Not called 00050 virtual int sync(); // No-op 00051 virtual std::streambuf * setbuf( char * , std::streamsize ); // No-op 00052 00053 private: 00054 void close(); 00055 00056 ParallelMachine m_comm ; 00058 std::FILE * m_root_fp ; 00059 char m_buffer[ BUFFER_LENGTH ]; 00060 }; 00061 00062 ParInBuf::ParInBuf( ParallelMachine comm , const char * const file_name ) 00063 : m_comm( comm ), m_root_fp( NULL ) 00064 { 00065 int result = 1 ; 00066 00067 if ( 0 == parallel_machine_rank( comm ) && NULL != file_name ) { 00068 result = NULL != ( m_root_fp = std::fopen( file_name , "r" ) ); 00069 } 00070 00071 broadcast( m_comm , & result , sizeof(int) ); 00072 00073 if ( ! result ) { 00074 std::string msg; 00075 msg.append("stk_classic::ParallelInputStream( " ); 00076 if ( 0 == parallel_machine_rank( comm ) && NULL != file_name ) { 00077 msg.append( file_name ); 00078 } 00079 else { 00080 msg.append( "<NULL>" ); 00081 } 00082 msg.append( " ) FAILED" ); 00083 throw std::runtime_error(msg); 00084 } 00085 } 00086 00087 void ParInBuf::close() 00088 { 00089 if ( NULL != m_root_fp ) { std::fclose( m_root_fp ); m_root_fp = NULL ; } 00090 setg(NULL,NULL,NULL); 00091 } 00092 00093 ParInBuf::~ParInBuf() 00094 { close(); } 00095 00096 int ParInBuf::underflow() 00097 { 00098 char * const buf = m_buffer + BUFFER_PUTBACK ; 00099 int nread = 0 ; 00100 00101 if ( gptr() == NULL || egptr() <= gptr() ) { 00102 if ( NULL != m_root_fp ) { nread = std::fread(buf,1,MAX_READ,m_root_fp); } 00103 broadcast( m_comm , & nread , sizeof(int) ); 00104 } 00105 00106 if ( 0 < nread ) { 00107 broadcast( m_comm , buf , nread ); 00108 setg( m_buffer , buf , buf + nread ); 00109 } 00110 else { 00111 close(); 00112 } 00113 00114 return 0 < nread ? *buf : EOF ; 00115 } 00116 00117 namespace { 00118 00119 void throw_overflow() 00120 { 00121 std::string msg ; 00122 msg.append("stk_classic::ParallelInputStream::overflow CALL IS ERRONEOUS" ); 00123 throw std::runtime_error(msg); 00124 } 00125 00126 } 00127 00128 int ParInBuf::overflow( int ) 00129 { throw_overflow(); return EOF ; } 00130 00131 int ParInBuf::sync() 00132 { return 0 ; } 00133 00134 std::streambuf * ParInBuf::setbuf( char * , std::streamsize ) 00135 { 00136 return this ; 00137 } 00138 00139 //---------------------------------------------------------------------- 00140 00141 } // namespace 00142 00143 00144 ParallelInputStream::ParallelInputStream( 00145 ParallelMachine comm , 00146 const char * const file_name ) 00147 : std::istream( new ParInBuf( comm , file_name ) ) 00148 {} 00149 00150 ParallelInputStream::~ParallelInputStream() 00151 { delete rdbuf(); } 00152 00153 } // namespace stk_classic 00154 00155