|
Sierra Toolkit
Version of the Day
|
00001 /*------------------------------------------------------------------------*/ 00002 /* Copyright 2010 Sandia Corporation. */ 00003 /* Under terms of Contract DE-AC04-94AL85000, there is a non-exclusive */ 00004 /* license for use of this work by or on behalf of the U.S. Government. */ 00005 /* Export of this program may require a license from the */ 00006 /* United States Government. */ 00007 /*------------------------------------------------------------------------*/ 00008 00009 #include <stk_util/parallel/ParallelReduce.hpp> 00010 00011 #include <stdlib.h> 00012 #include <stdexcept> 00013 #include <sstream> 00014 #include <vector> 00015 00016 namespace stk_classic { 00017 00018 #if defined( STK_HAS_MPI ) 00019 00022 00023 void all_write_string( ParallelMachine arg_comm , 00024 std::ostream & arg_root_os , 00025 const std::string & arg_msg ) 00026 { 00027 const int i_zero = 0 ; 00028 const int p_root = 0 ; 00029 const unsigned p_size = parallel_machine_size( arg_comm ); 00030 const unsigned p_rank = parallel_machine_rank( arg_comm ); 00031 00032 int result ; 00033 00034 // Gather the send counts on root processor 00035 00036 int send_count = arg_msg.size(); 00037 00038 std::vector<int> recv_count( p_size , i_zero ); 00039 00040 int * const recv_count_ptr = & recv_count[0] ; 00041 00042 result = MPI_Gather( & send_count , 1 , MPI_INT , 00043 recv_count_ptr , 1 , MPI_INT , 00044 p_root , arg_comm ); 00045 00046 if ( MPI_SUCCESS != result ) { 00047 std::ostringstream msg ; 00048 msg << "stk_classic::all_write FAILED: MPI_Gather = " << result ; 00049 throw std::runtime_error( msg.str() ); 00050 } 00051 00052 // Receive counts are only non-zero on the root processor: 00053 00054 std::vector<int> recv_displ( p_size + 1 , i_zero ); 00055 00056 for ( unsigned i = 0 ; i < p_size ; ++i ) { 00057 recv_displ[i+1] = recv_displ[i] + recv_count[i] ; 00058 } 00059 00060 const unsigned recv_size = (unsigned) recv_displ[ p_size ] ; 00061 00062 std::vector<char> buffer( recv_size ); 00063 00064 { 00065 const char * const send_ptr = arg_msg.c_str(); 00066 char * const recv_ptr = recv_size ? & buffer[0] : (char *) NULL ; 00067 int * const recv_displ_ptr = & recv_displ[0] ; 00068 00069 result = MPI_Gatherv( (void*) send_ptr, send_count, MPI_CHAR , 00070 recv_ptr, recv_count_ptr, recv_displ_ptr, MPI_CHAR, 00071 p_root, arg_comm ); 00072 } 00073 00074 if ( MPI_SUCCESS != result ) { 00075 std::ostringstream msg ; 00076 msg << "stk_classic::all_write FAILED: MPI_Gatherv = " << result ; 00077 throw std::runtime_error( msg.str() ); 00078 } 00079 00080 if ( p_root == (int) p_rank ) { 00081 // arg_root_os << std::endl ; 00082 for ( unsigned i = 0 ; i < p_size ; ++i ) { 00083 if ( recv_count[i] ) { 00084 char * const ptr = & buffer[ recv_displ[i] ]; 00085 arg_root_os.write( ptr , recv_count[i] ); 00086 arg_root_os << std::endl ; 00087 } 00088 } 00089 arg_root_os.flush(); 00090 } 00091 } 00092 00093 //---------------------------------------------------------------------- 00094 //---------------------------------------------------------------------- 00095 00096 void all_reduce( ParallelMachine arg_comm , 00097 ParallelReduceOp arg_op , 00098 void * arg_in , 00099 void * arg_out , 00100 unsigned arg_len ) 00101 { 00102 MPI_Op mpi_op = MPI_OP_NULL ; 00103 00104 MPI_Op_create( arg_op , 0 , & mpi_op ); 00105 00106 // The SUN was buggy when combining an 00107 // MPI_Allreduce with a user defined operator, 00108 // use reduce/broadcast instead. 00109 /* 00110 const int result = 00111 MPI_Allreduce(arg_in,arg_out,arg_len,MPI_BYTE,mpi_op,arg_comm); 00112 */ 00113 00114 const int result_reduce = 00115 MPI_Reduce(arg_in,arg_out,arg_len,MPI_BYTE,mpi_op,0,arg_comm); 00116 00117 const int result_bcast = 00118 MPI_Bcast(arg_out,arg_len,MPI_BYTE,0,arg_comm); 00119 00120 MPI_Op_free( & mpi_op ); 00121 00122 if ( MPI_SUCCESS != result_reduce || MPI_SUCCESS != result_bcast ) { 00123 std::ostringstream msg ; 00124 msg << "stk_classic::all_reduce FAILED: MPI_Reduce = " << result_reduce 00125 << " MPI_Bcast = " << result_bcast ; 00126 throw std::runtime_error( msg.str() ); 00127 } 00128 } 00129 00130 //---------------------------------------------------------------------- 00131 //---------------------------------------------------------------------- 00132 00133 void all_reduce_sum( ParallelMachine comm , 00134 const double * local , double * global , unsigned count ) 00135 { 00136 double * tmp = const_cast<double*>( local ); 00137 MPI_Allreduce( tmp , global , count , MPI_DOUBLE , MPI_SUM , comm ); 00138 } 00139 00140 void all_reduce_sum( ParallelMachine comm , 00141 const float * local , float * global , unsigned count ) 00142 { 00143 float * tmp = const_cast<float*>( local ); 00144 MPI_Allreduce( tmp , global , count , MPI_FLOAT , MPI_SUM , comm ); 00145 } 00146 00147 void all_reduce_sum( ParallelMachine comm , 00148 const int * local , int * global , unsigned count ) 00149 { 00150 int * tmp = const_cast<int*>( local ); 00151 MPI_Allreduce( tmp , global , count , MPI_INT , MPI_SUM , comm ); 00152 } 00153 00154 void all_reduce_sum( ParallelMachine comm , 00155 const size_t * local , size_t * global , unsigned count ) 00156 { 00157 size_t * tmp = const_cast<size_t*>( local ); 00158 00159 if ( sizeof(size_t) == sizeof(unsigned) ) { 00160 MPI_Allreduce( tmp , global , count , MPI_UNSIGNED , MPI_SUM , comm ); 00161 } 00162 else if ( sizeof(size_t) == sizeof(unsigned long) ) { 00163 MPI_Allreduce( tmp , global , count , MPI_UNSIGNED_LONG , MPI_SUM , comm ); 00164 } 00165 else { 00166 unsigned long * const in = new unsigned long[ count ]; 00167 unsigned long * const out = new unsigned long[ count ]; 00168 00169 for ( unsigned i = 0 ; i < count ; ++i ) { in[i] = local[i] ; } 00170 MPI_Allreduce( in , out , count , MPI_UNSIGNED_LONG , MPI_SUM , comm ); 00171 for ( unsigned i = 0 ; i < count ; ++i ) { global[i] = out[i] ; } 00172 00173 delete[] in ; 00174 delete[] out ; 00175 } 00176 } 00177 00178 void all_reduce_bor( ParallelMachine comm , 00179 const unsigned * local , 00180 unsigned * global , unsigned count ) 00181 { 00182 unsigned * tmp = const_cast<unsigned*>( local ); 00183 MPI_Allreduce( tmp , global , count , MPI_UNSIGNED , MPI_BOR , comm ); 00184 } 00185 00186 //---------------------------------------------------------------------- 00187 //---------------------------------------------------------------------- 00188 00189 #else 00190 00191 void all_write_string( ParallelMachine , 00192 std::ostream & arg_root_os , 00193 const std::string & arg_msg ) 00194 { 00195 arg_root_os << arg_msg ; 00196 } 00197 00198 void all_reduce_sum( ParallelMachine , 00199 const double * local , double * global , unsigned count ) 00200 { 00201 for ( unsigned i = 0 ; i < count ; ++i ) { global[i] = local[i] ; } 00202 } 00203 00204 void all_reduce_sum( ParallelMachine , 00205 const float * local , float * global , unsigned count ) 00206 { 00207 for ( unsigned i = 0 ; i < count ; ++i ) { global[i] = local[i] ; } 00208 } 00209 00210 void all_reduce_sum( ParallelMachine , 00211 const int * local , int * global , unsigned count ) 00212 { 00213 for ( unsigned i = 0 ; i < count ; ++i ) { global[i] = local[i] ; } 00214 } 00215 00216 void all_reduce_sum( ParallelMachine , 00217 const size_t * local , size_t * global , unsigned count ) 00218 { 00219 for ( unsigned i = 0 ; i < count ; ++i ) { global[i] = local[i] ; } 00220 } 00221 00222 void all_reduce_bor( ParallelMachine , 00223 const unsigned * local , 00224 unsigned * global , unsigned count ) 00225 { 00226 for ( unsigned i = 0 ; i < count ; ++i ) { global[i] = local[i] ; } 00227 } 00228 00229 //---------------------------------------------------------------------- 00230 00231 void all_reduce( ParallelMachine , 00232 ParallelReduceOp , 00233 void * arg_in , 00234 void * arg_out , 00235 unsigned arg_len ) 00236 { 00237 unsigned char * i = reinterpret_cast<unsigned char *>( arg_in ); 00238 unsigned char * o = reinterpret_cast<unsigned char *>( arg_out ); 00239 for ( unsigned char * const e = i + arg_len ; e != i ; ++i , ++o ) { 00240 *o = *i ; 00241 } 00242 } 00243 00244 #endif 00245 00246 } 00247