Sierra Toolkit
Version of the Day
/*------------------------------------------------------------------------*/
/*                 Copyright 2010 Sandia Corporation.                      */
/*  Under terms of Contract DE-AC04-94AL85000, there is a non-exclusive    */
/*  license for use of this work by or on behalf of the U.S. Government.   */
/*  Export of this program may require a license from the                  */
/*  United States Government.                                              */
/*------------------------------------------------------------------------*/

#include <stdexcept>
#include <sstream>
#include <algorithm>
#include <limits>
#include <stdint.h>

#include <stk_util/parallel/ParallelComm.hpp>
#include <stk_util/parallel/DistributedIndex.hpp>

#include <stk_util/util/RadixSort.hpp>

namespace stk_classic {
namespace parallel {

//----------------------------------------------------------------------

namespace {

// Ordering for searching a sorted KeyProc vector by key alone.
struct KeyProcLess {

  bool operator()( const DistributedIndex::KeyProc & lhs ,
                   const DistributedIndex::KeyType & rhs ) const
    { return lhs.first < rhs ; }

};

void sort_unique( std::vector<DistributedIndex::KeyProc> & key_usage )
{
  std::vector<DistributedIndex::KeyProc>::iterator
    i = key_usage.begin() ,
    j = key_usage.end() ;

  std::sort( i , j );

  i = std::unique( i , j );

  key_usage.erase( i , j );
}

void sort_unique( std::vector<DistributedIndex::KeyType> & keys )
{
  stk_classic::util::radix_sort_unsigned((keys.empty() ? NULL : &keys[0]), keys.size());

  std::vector<DistributedIndex::KeyType>::iterator
    i = keys.begin() ,
    j = keys.end() ;

  i = std::unique( i , j );
  keys.erase( i , j );
}

// Reserve vector capacity for the current size plus the recv_buffer remainder.
template < class T >
inline void reserve_for_recv_buffer( const CommAll& all, const DistributedIndex::ProcType& comm_size, std::vector<T>& v)
{
  unsigned num_remote = 0;
  for (DistributedIndex::ProcType p = 0 ; p < comm_size ; ++p ) {
    CommBuffer & buf = all.recv_buffer( p );
    num_remote += buf.remaining() / sizeof(T);
  }
  v.reserve(v.size() + num_remote);
}

// Unpack every receive buffer into the vector.
template < class T >
inline void unpack_recv_buffer( const CommAll& all, const DistributedIndex::ProcType& comm_size, std::vector<T>& v)
{
  reserve_for_recv_buffer(all, comm_size, v);
  for (DistributedIndex::ProcType p = 0 ; p < comm_size ; ++p ) {
    CommBuffer & buf = all.recv_buffer( p );
    while ( buf.remaining() ) {
      T kp;
      buf.unpack( kp );
      v.push_back( kp );
    }
  }
}

// Unpack every receive buffer into a vector of pairs,
// where pair.second is the sending processor.
template < class T >
inline void unpack_with_proc_recv_buffer( const CommAll& all, const DistributedIndex::ProcType& comm_size, std::vector<std::pair<T,DistributedIndex::ProcType> >& v)
{
  reserve_for_recv_buffer(all, comm_size, v);
  for ( DistributedIndex::ProcType p = 0 ; p < comm_size ; ++p ) {
    CommBuffer & buf = all.recv_buffer( p );
    std::pair<T,DistributedIndex::ProcType> kp;
    kp.second = p;
    while ( buf.remaining() ) {
      buf.unpack( kp.first );
      v.push_back( kp );
    }
  }
}

} // namespace <unnamed>

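// Illustrative sketch, not part of the original source: the helpers above
// complete the two-pass CommAll exchange pattern used throughout this file.
// Buffers are sized by a first round of pack() calls, allocated, packed
// again identically, communicated, and then drained. The CommAll calls and
// unpack_recv_buffer() are the ones used in this file; the wrapper function
// example_all_to_all itself is hypothetical.
template < class T >
void example_all_to_all(
  ParallelMachine comm ,
  const std::vector< std::pair<T,DistributedIndex::ProcType> > & sends ,
  std::vector<T> & received )
{
  CommAll all( comm );

  const DistributedIndex::ProcType p_size = parallel_machine_size( comm );

  // Pass 1: sizing. Before allocate_buffers(), pack() only tallies bytes.
  for ( size_t i = 0 ; i < sends.size() ; ++i ) {
    all.send_buffer( sends[i].second ).pack<T>( sends[i].first );
  }

  all.allocate_buffers( p_size / 4 , false );

  // Pass 2: identical packing into the now-allocated buffers.
  for ( size_t i = 0 ; i < sends.size() ; ++i ) {
    all.send_buffer( sends[i].second ).pack<T>( sends[i].first );
  }

  all.communicate();

  // Drain every receive buffer into 'received'.
  unpack_recv_buffer( all , p_size , received );
}
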
//----------------------------------------------------------------------

enum { DISTRIBUTED_INDEX_CHUNK_BITS = 12 };

enum { DISTRIBUTED_INDEX_CHUNK_SIZE =
         size_t(1) << DISTRIBUTED_INDEX_CHUNK_BITS };

DistributedIndex::ProcType
DistributedIndex::to_which_proc( const DistributedIndex::KeyType & key ) const
{
  for ( size_t i = 0 ; i < m_span_count ; ++i ) {
    if ( m_key_span[i].first <= key && key <= m_key_span[i].second ) {
      const KeyType offset = key - m_key_span[i].first ;
      return ( offset >> DISTRIBUTED_INDEX_CHUNK_BITS ) % m_comm_size ;
    }
  }
  return m_comm_size ;
}

//----------------------------------------------------------------------

DistributedIndex::~DistributedIndex() {}

DistributedIndex::DistributedIndex (
  ParallelMachine comm ,
  const std::vector<KeySpan> & partition_bounds )
  : m_comm( comm ),
    m_comm_rank( parallel_machine_rank( comm ) ),
    m_comm_size( parallel_machine_size( comm ) ),
    m_span_count(0),
    m_key_span(),
    m_key_usage()
{
  unsigned info[2] ;
  info[0] = partition_bounds.size();
  info[1] = 0 ;

  // Check each span for validity

  for ( std::vector<KeySpan>::const_iterator
        i = partition_bounds.begin() ; i != partition_bounds.end() ; ++i ) {
    if ( i->second < i->first ||
         ( i != partition_bounds.begin() && i->first <= (i-1)->second ) ) {
      info[1] = 1 ;
    }
  }

#if defined( STK_HAS_MPI )
  if (m_comm_size > 1) {
    MPI_Bcast( info , 2 , MPI_UNSIGNED , 0 , comm );
  }

  if ( 0 < info[0] ) {
    m_key_span.resize( info[0] );
    if ( 0 == parallel_machine_rank( comm ) ) {
      m_key_span = partition_bounds ;
    }
    if (m_comm_size > 1) {
      MPI_Bcast( (m_key_span.empty() ? NULL : & m_key_span[0]),
                 info[0] * sizeof(KeySpan), MPI_BYTE, 0, comm );
    }
  }
#else
  m_key_span = partition_bounds ;
#endif

  if ( info[1] ) {
    std::ostringstream msg ;
    msg << "sierra::parallel::DistributedIndex ctor( comm , " ;

    for ( std::vector<KeySpan>::const_iterator
          i = partition_bounds.begin() ; i != partition_bounds.end() ; ++i ) {
      msg << " ( min = " << i->first << " , max = " << i->second << " )" ;
    }
    msg << " ) contains invalid span of keys" ;
    throw std::runtime_error( msg.str() );
  }

  m_span_count = info[0] ;

  if ( 0 == m_span_count ) {
    m_key_span.push_back(
      KeySpan( std::numeric_limits<KeyType>::min(),
               std::numeric_limits<KeyType>::max() ) );
    m_span_count = 1 ;
  }
}

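// Illustrative sketch, not part of the original source: key ownership is
// round-robin by chunk of 2^12 = 4096 consecutive keys, exactly the
// arithmetic of to_which_proc() above. The standalone function below
// (hypothetical name) repeats it for a single span. Worked example with
// span_first = 0 and comm_size = 4:
//   keys [0,4095]      -> proc 0
//   keys [4096,8191]   -> proc 1
//   keys [12288,16383] -> proc 3
//   keys [16384,20479] -> proc 0 again, since (16384 >> 12) % 4 == 0
inline int example_chunk_owner( uint64_t key , uint64_t span_first , int comm_size )
{
  const uint64_t offset = key - span_first ; // position within the span
  return ( offset >> DISTRIBUTED_INDEX_CHUNK_BITS ) % comm_size ; // chunk index mod processes
}
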
//----------------------------------------------------------------------
//----------------------------------------------------------------------

namespace {

bool is_sorted_and_unique( const std::vector<DistributedIndex::KeyProc> & key_usage )
{
  std::vector<DistributedIndex::KeyProc>::const_iterator itr = key_usage.begin();
  std::vector<DistributedIndex::KeyProc>::const_iterator end = key_usage.end();
  for ( ; itr != end; ++itr ) {
    if ( itr + 1 != end && *itr >= *(itr + 1) ) {
      return false;
    }
  }
  return true;
}

// For each requested key, send that key's full usage list to every
// process that uses the key.
void query_pack_to_usage(
  const std::vector<DistributedIndex::KeyProc> & key_usage ,
  const std::vector<DistributedIndex::KeyType> & request ,
  CommAll & all )
{
  std::vector<DistributedIndex::KeyProc>::const_iterator i = key_usage.begin();
  std::vector<DistributedIndex::KeyType>::const_iterator k = request.begin();

  for ( ; k != request.end() && i != key_usage.end() ; ++k ) {

    for ( ; i != key_usage.end() && i->first < *k ; ++i );

    std::vector<DistributedIndex::KeyProc>::const_iterator j = i ;
    for ( ; j != key_usage.end() && j->first == *k ; ++j );

    for ( std::vector<DistributedIndex::KeyProc>::const_iterator
          jsend = i ; jsend != j ; ++jsend ) {

      for ( std::vector<DistributedIndex::KeyProc>::const_iterator
            jinfo = i ; jinfo != j ; ++jinfo ) {

        all.send_buffer( jsend->second )
           .pack<DistributedIndex::KeyProc>( *jinfo );
      }
    }
  }
}

// For each requested (key,proc) pair, send that key's full usage list
// to the requesting process.
void query_pack( const std::vector<DistributedIndex::KeyProc> & key_usage ,
                 const std::vector<DistributedIndex::KeyProc> & request ,
                 CommAll & all )
{
  std::vector<DistributedIndex::KeyProc>::const_iterator i = key_usage.begin();

  for ( std::vector<DistributedIndex::KeyProc>::const_iterator
        k = request.begin() ;
        k != request.end() &&
        i != key_usage.end() ; ++k ) {

    for ( ; i != key_usage.end() && i->first < k->first ; ++i );

    for ( std::vector<DistributedIndex::KeyProc>::const_iterator j = i ;
          j != key_usage.end() && j->first == k->first ; ++j ) {
      all.send_buffer( k->second ).pack<DistributedIndex::KeyProc>( *j );
    }
  }
}

} // namespace <unnamed>

void DistributedIndex::query(
  const std::vector<DistributedIndex::KeyProc> & request ,
        std::vector<DistributedIndex::KeyProc> & sharing_of_keys ) const
{
  sharing_of_keys.clear();

  CommAll all( m_comm );

  query_pack( m_key_usage , request , all ); // Sizing

  all.allocate_buffers( m_comm_size / 4 , false );

  query_pack( m_key_usage , request , all ); // Packing

  all.communicate();

  unpack_recv_buffer(all, m_comm_size, sharing_of_keys);

  std::sort( sharing_of_keys.begin() , sharing_of_keys.end() );
}

void DistributedIndex::query(
  std::vector<DistributedIndex::KeyProc> & sharing_of_local_keys ) const
{
  query( m_key_usage , sharing_of_local_keys );
}

void DistributedIndex::query(
  const std::vector<DistributedIndex::KeyType> & keys ,
        std::vector<DistributedIndex::KeyProc> & sharing_keys ) const
{
  std::vector<KeyProc> request ;

  {
    bool bad_key = false ;
    CommAll all( m_comm );

    for ( std::vector<KeyType>::const_iterator
          k = keys.begin() ; k != keys.end() ; ++k ) {
      const ProcType p = to_which_proc( *k );

      if ( p < m_comm_size ) {
        all.send_buffer( p ).pack<KeyType>( *k );
      }
      else {
        bad_key = true ;
      }
    }

    // Error condition becomes global:

    bad_key = all.allocate_buffers( m_comm_size / 4 , false , bad_key );

    if ( bad_key ) {
      throw std::runtime_error("stk_classic::parallel::DistributedIndex::query given a key which is out of range");
    }

    for ( std::vector<KeyType>::const_iterator
          k = keys.begin() ; k != keys.end() ; ++k ) {
      all.send_buffer( to_which_proc( *k ) ).pack<KeyType>( *k );
    }

    all.communicate();

    unpack_with_proc_recv_buffer(all, m_comm_size, request);
  }

  sort_unique( request );

  query( request , sharing_keys );
}

void DistributedIndex::query_to_usage(
  const std::vector<DistributedIndex::KeyType> & keys ,
        std::vector<DistributedIndex::KeyProc> & sharing_keys ) const
{
  std::vector<KeyType> request ;

  {
    bool bad_key = false ;
    CommAll all( m_comm );

    for ( std::vector<KeyType>::const_iterator
          k = keys.begin() ; k != keys.end() ; ++k ) {
      const ProcType p = to_which_proc( *k );

      if ( p < m_comm_size ) {
        all.send_buffer( p ).pack<KeyType>( *k );
      }
      else {
        bad_key = true ;
      }
    }

    // Error condition becomes global:

    bad_key = all.allocate_buffers( m_comm_size / 4 , false , bad_key );

    if ( bad_key ) {
      throw std::runtime_error("stk_classic::parallel::DistributedIndex::query_to_usage given a key which is out of range");
    }

    for ( std::vector<KeyType>::const_iterator
          k = keys.begin() ; k != keys.end() ; ++k ) {
      all.send_buffer( to_which_proc( *k ) ).pack<KeyType>( *k );
    }

    all.communicate();

    unpack_recv_buffer(all, m_comm_size, request);
  }

  sort_unique( request );

  {
    CommAll all( m_comm );

    query_pack_to_usage( m_key_usage , request , all ); // Sizing

    all.allocate_buffers( m_comm_size / 4 , false );

    query_pack_to_usage( m_key_usage , request , all ); // Packing

    all.communicate();

    unpack_recv_buffer(all, m_comm_size, sharing_keys);

    std::sort( sharing_keys.begin() , sharing_keys.end() );
  }
}

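// Illustrative sketch, not part of the original source: a typical query
// round trip. Each rank constructs the index over one span, registers a
// key via update_keys() (defined below), then asks which processes share
// it. The function and variable names are hypothetical; the calls are
// this class's API.
void example_query_sharing( ParallelMachine comm )
{
  std::vector<DistributedIndex::KeySpan> spans ;
  spans.push_back( DistributedIndex::KeySpan( 0 , 1 << 20 ) );

  DistributedIndex index( comm , spans );

  // Every rank claims key 42; no keys are removed.
  std::vector<DistributedIndex::KeyType> add( 1 , 42 ) , remove ;
  index.update_keys( add , remove );

  // Ask who uses key 42: expect one (42,p) entry per rank in 'sharing'.
  std::vector<DistributedIndex::KeyProc> sharing ;
  index.query( add , sharing );
}
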
//----------------------------------------------------------------------
//----------------------------------------------------------------------

namespace {

// Predicate and helpers for two-phase removal: mark() flags a (key,proc)
// entry by setting its process to -1; clean() then erases all flagged
// entries in a single pass.
struct RemoveKeyProc {

  bool operator()( const DistributedIndex::KeyProc & kp ) const
    { return kp.second < 0 ; }

  static void mark( std::vector<DistributedIndex::KeyProc> & key_usage ,
                    const DistributedIndex::KeyProc & kp )
  {
    std::vector<DistributedIndex::KeyProc>::iterator
      i = std::lower_bound( key_usage.begin(),
                            key_usage.end(), kp.first, KeyProcLess() );

    // Iterate over the span of KeyProcs with matching key until an exact match
    // is found. We have to do it this way because marking a KeyProc unsorts it
    // in the key_usage vector, so we cannot look up KeyProcs directly once marking
    // has begun.
    while ( i != key_usage.end() && kp.first == i->first && kp.second != i->second) { ++i ; }

    if ( i != key_usage.end() && kp == *i ) {
      i->second = -1 ;
    }
  }

  static void clean( std::vector<DistributedIndex::KeyProc> & key_usage )
  {
    std::vector<DistributedIndex::KeyProc>::iterator end =
      std::remove_if( key_usage.begin() , key_usage.end() , RemoveKeyProc() );
    key_usage.erase( end , key_usage.end() );
  }
};

} // namespace <unnamed>

void DistributedIndex::update_keys(
  const std::vector<DistributedIndex::KeyType> & add_new_keys ,
  const std::vector<DistributedIndex::KeyType> & remove_existing_keys )
{
  std::vector<unsigned long> count_remove( m_comm_size , (unsigned long)0 );
  std::vector<unsigned long> count_add( m_comm_size , (unsigned long)0 );

  size_t local_bad_input = 0 ;

  // Iterate over keys being removed and keep a count of keys being removed
  // from other processes
  for ( std::vector<KeyType>::const_iterator
        i = remove_existing_keys.begin();
        i != remove_existing_keys.end(); ++i ) {
    const ProcType p = to_which_proc( *i );
    if ( m_comm_size <= p ) {
      // Key is not within any of the spans:
      ++local_bad_input ;
    }
    else if ( p != m_comm_rank ) {
      ++( count_remove[ p ] );
    }
  }

  // Iterate over keys being added and keep a count of keys being added
  // to other processes
  for ( std::vector<KeyType>::const_iterator
        i = add_new_keys.begin();
        i != add_new_keys.end(); ++i ) {
    const ProcType p = to_which_proc( *i );
    if ( p == m_comm_size ) {
      // Key is not within any of the spans:
      ++local_bad_input ;
    }
    else if ( p != m_comm_rank ) {
      ++( count_add[ p ] );
    }
  }

  CommAll all( m_comm );

  // Sizing and add_new_keys bounds checking:

  // For each process, we are going to send the number of removed keys,
  // the removed keys, and the added keys. It will be assumed that any keys
  // beyond the number of removed keys will be added keys.
  for ( int p = 0 ; p < m_comm_size ; ++p ) {
    if ( count_remove[p] || count_add[p] ) {
      CommBuffer & buf = all.send_buffer( p );
      buf.skip<unsigned long>( 1 );
      buf.skip<KeyType>( count_remove[p] );
      buf.skip<KeyType>( count_add[p] );
    }
  }

  // Allocate buffers and perform a global OR of error_flag
  const bool symmetry_flag = false ;
  const bool error_flag = 0 < local_bad_input ;

  bool global_bad_input =
    all.allocate_buffers( m_comm_size / 4, symmetry_flag , error_flag );

  if ( global_bad_input ) {
    std::ostringstream msg ;

    if ( 0 < local_bad_input ) {
      msg << "stk_classic::parallel::DistributedIndex::update_keys ERROR Given "
          << local_bad_input << " of " << add_new_keys.size()
          << " add_new_keys outside of any span" ;
    }

    throw std::runtime_error( msg.str() );
  }

  // Packing:

  // Pack the remove counts for each process
  for ( int p = 0 ; p < m_comm_size ; ++p ) {
    if ( count_remove[p] || count_add[p] ) {
      all.send_buffer( p ).pack<unsigned long>( count_remove[p] );
    }
  }

  // Pack the removed keys for each process
  for ( std::vector<KeyType>::const_iterator
        i = remove_existing_keys.begin();
        i != remove_existing_keys.end(); ++i ) {
    const ProcType p = to_which_proc( *i );
    if ( p != m_comm_rank ) {
      all.send_buffer( p ).pack<KeyType>( *i );
    }
  }

  // Pack the added keys for each process
  for ( std::vector<KeyType>::const_iterator
        i = add_new_keys.begin();
        i != add_new_keys.end(); ++i ) {
    const ProcType p = to_which_proc( *i );
    if ( p != m_comm_rank ) {
      all.send_buffer( p ).pack<KeyType>( *i );
    }
  }

  // Communicate keys
  all.communicate();

  //------------------------------
  // Remove local keys

  for ( std::vector<KeyType>::const_iterator
        i = remove_existing_keys.begin();
        i != remove_existing_keys.end(); ++i ) {
    const ProcType p = to_which_proc( *i );
    if ( p == m_comm_rank ) {
      RemoveKeyProc::mark( m_key_usage , KeyProc( *i , p ) );
    }
  }

  // Unpack each removed key and find it.
  // Set the process to a negative value for subsequent removal.

  for ( int p = 0 ; p < m_comm_size ; ++p ) {
    CommBuffer & buf = all.recv_buffer( p );
    if ( buf.remaining() ) {
      unsigned long remove_count = 0 ;

      KeyProc kp ;

      kp.second = p ;

      buf.unpack<unsigned long>( remove_count );

      for ( ; 0 < remove_count ; --remove_count ) {
        buf.unpack<KeyType>( kp.first );

        RemoveKeyProc::mark( m_key_usage , kp );
      }
    }
  }

  RemoveKeyProc::clean( m_key_usage );

  //------------------------------
  // Append local keys

  // Add new keys assigned to this process to local_key_usage
  std::vector<KeyProc> local_key_usage ;
  local_key_usage.reserve(add_new_keys.size());
  for ( std::vector<KeyType>::const_iterator
        i = add_new_keys.begin();
        i != add_new_keys.end(); ++i ) {

    const ProcType p = to_which_proc( *i );
    if ( p == m_comm_rank ) {
      local_key_usage.push_back( KeyProc( *i , p ) );
    }
  }

  // Merge local_key_usage and m_key_usage into temp_key
  std::vector<KeyProc> temp_key ;
  temp_key.reserve(local_key_usage.size() + m_key_usage.size());
  std::sort( local_key_usage.begin(), local_key_usage.end() );
  std::merge( m_key_usage.begin(),
              m_key_usage.end(),
              local_key_usage.begin(),
              local_key_usage.end(),
              std::back_inserter(temp_key) );

  // Unpack and append the remote keys:
  std::vector<KeyProc> remote_key_usage ;

  unpack_with_proc_recv_buffer(all, m_comm_size, remote_key_usage);

  std::sort( remote_key_usage.begin(), remote_key_usage.end() );

  m_key_usage.clear();
  m_key_usage.reserve(temp_key.size() + remote_key_usage.size());

  // Merge temp_key and remote_key_usage into m_key_usage, so...
  // m_key_usage = local_key_usage + remote_key_usage + m_key_usage(orig)
  std::merge( temp_key.begin(),
              temp_key.end(),
              remote_key_usage.begin(),
              remote_key_usage.end(),
              std::back_inserter(m_key_usage) );

  // Make m_key_usage unique
  m_key_usage.erase(std::unique( m_key_usage.begin(),
                                 m_key_usage.end()),
                    m_key_usage.end() );

  // Check the invariant that m_key_usage is sorted and unique
  if (!is_sorted_and_unique(m_key_usage)) {
    throw std::runtime_error( "Sorted & unique invariant violated!" );
  }
}

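// Illustrative sketch, not part of the original source: migrating a key's
// registration from one rank to another with a single collective
// update_keys() call, assuming rank 0 previously registered key 100.
// The function and variable names are hypothetical.
void example_migrate_key( ParallelMachine comm , DistributedIndex & index )
{
  const int rank = parallel_machine_rank( comm );

  std::vector<DistributedIndex::KeyType> add , remove ;

  if ( rank == 0 ) { remove.push_back( 100 ); } // rank 0 drops key 100
  if ( rank == 1 ) { add.push_back( 100 ); }    // rank 1 now uses key 100

  // Collective: every rank must call, even with empty vectors.
  index.update_keys( add , remove );
}
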
//----------------------------------------------------------------------
//----------------------------------------------------------------------

void DistributedIndex::generate_new_global_key_upper_bound(
  const std::vector<size_t>              & requests ,
        std::vector<DistributedIndex::KeyType> & global_key_upper_bound ) const
{
  bool bad_request = m_span_count != requests.size();

  std::ostringstream error_msg ;

  error_msg
    << "sierra::parallel::DistributedIndex::generate_new_global_key_upper_bound( " ;

  std::vector<unsigned long>
    local_counts( m_span_count + 1 , (unsigned long) 0 ),
    global_counts( m_span_count + 1 , (unsigned long) 0 );

  // Count unique keys in each span and add the requested keys for
  // the final total count of keys needed.

  // Append the error check to this communication to avoid
  // an extra reduction operation.
  local_counts[ m_span_count ] = m_span_count != requests.size();

  if ( m_span_count == requests.size() ) {

    for ( size_t i = 0 ; i < m_span_count ; ++i ) {
      local_counts[i] = requests[i] ;
    }

    std::vector<KeyProc>::const_iterator j = m_key_usage.begin();

    for ( size_t i = 0 ; i < m_span_count && j != m_key_usage.end() ; ++i ) {
      const KeyType key_span_last = m_key_span[i].second ;
      size_t count = 0 ;
      while ( j != m_key_usage.end() && j->first <= key_span_last ) {
        const KeyType key = j->first ;
        while ( j != m_key_usage.end() && key == j->first ) { ++j ; }
        ++count ;
      }
      local_counts[i] += count ;
    }
  }

#if defined( STK_HAS_MPI )
  if (m_comm_size > 1) {
    MPI_Allreduce( (local_counts.empty() ? NULL : & local_counts[0]) ,
                   (global_counts.empty() ? NULL : & global_counts[0]) ,
                   m_span_count + 1 , MPI_UNSIGNED_LONG ,
                   MPI_SUM , m_comm );
  }
  else {
    global_counts = local_counts ;
  }
#else
  global_counts = local_counts ;
#endif

  bad_request = global_counts[m_span_count] != 0 ;

  if ( bad_request ) {
    if ( m_span_count != requests.size() ) {
      error_msg << " requests.size() = " << requests.size()
                << " != " << m_span_count << " )" ;
    }
  }

  if ( ! bad_request ) {
    for ( unsigned i = 0 ; i < m_span_count ; ++i ) {
      const size_t span_available =
        ( 1 + m_key_span[i].second - m_key_span[i].first );

      const size_t span_requested = global_counts[i];

      if ( span_available < span_requested ) {
        bad_request = true ;
        error_msg << " global_sum( (existing+request)[" << i << "] ) = "
                  << span_requested
                  << " > global_sum( span_available ) = "
                  << span_available ;
      }
    }
  }

  if ( bad_request ) {
    throw std::runtime_error( error_msg.str() );
  }

  // Determine the maximum generated key

  global_key_upper_bound.resize( m_span_count );

  for ( size_t i = 0 ; i < m_span_count ; ++i ) {
    global_key_upper_bound[i] = m_key_span[i].first + global_counts[i] - 1 ;
  }
}

//--------------------------------------------------------------------
//--------------------------------------------------------------------

void DistributedIndex::generate_new_keys_local_planning(
  const std::vector<DistributedIndex::KeyType> & key_global_upper_bound ,
  const std::vector<size_t>  & requests_local ,
        std::vector<long>    & new_request ,
        std::vector<KeyType> & requested_keys ,
        std::vector<KeyType> & contrib_keys ) const
{
  new_request.assign( m_span_count , long(0) );

  contrib_keys.clear();

  std::vector<KeyProc>::const_iterator j = m_key_usage.begin();

  for ( size_t i = 0 ; i < m_span_count ; ++i ) {
    // The maximum generated key from any process will
    // not exceed this value.
    const KeyType key_upper_bound = key_global_upper_bound[i] ;

    const size_t init_size = contrib_keys.size();

    const size_t chunk_inc = m_comm_size * DISTRIBUTED_INDEX_CHUNK_SIZE ;

    const size_t chunk_rsize = m_comm_rank * DISTRIBUTED_INDEX_CHUNK_SIZE ;

    for ( KeyType key_begin = m_key_span[i].first + chunk_rsize ;
          key_begin <= key_upper_bound ; key_begin += chunk_inc ) {

      // What is the first key of the chunk
      KeyType key_iter = key_begin ;

      // What is the last key belonging to this process' chunk
      const KeyType key_last =
        std::min( key_begin + DISTRIBUTED_INDEX_CHUNK_SIZE - 1 , key_upper_bound );

      // Jump into the sorted used key vector to
      // the key which may be contributed

      j = std::lower_bound( j, m_key_usage.end(), key_iter, KeyProcLess() );
      // now know: j == m_key_usage.end() OR
      //           key_iter <= j->first

      for ( ; key_iter <= key_last ; ++key_iter ) {
        if ( j == m_key_usage.end() || key_iter < j->first ) {
          // The current attempt 'key_iter' is not used, contribute it.
          contrib_keys.push_back( key_iter );
        }
        else { // j != m_key_usage.end() && key_iter == j->first
          // The current attempt 'key_iter' is already used,
          // increment the used-iterator to its next key value.
          while ( j != m_key_usage.end() && key_iter == j->first ) {
            ++j ;
          }
        }
      }
    }

    // Determine which local keys will be contributed,
    // keeping what this process could use from the contribution.
    // This can reduce the subsequent communication load when
    // donating keys to another process.

    const size_t this_contrib = contrib_keys.size() - init_size ;

    // How many keys will this process keep:
    const size_t keep = std::min( requests_local[i] , this_contrib );

    // Take the kept keys from the contributed key vector.
    requested_keys.insert( requested_keys.end() ,
                           contrib_keys.end() - keep ,
                           contrib_keys.end() );

    contrib_keys.erase( contrib_keys.end() - keep ,
                        contrib_keys.end() );

    // New request is positive for needed keys or negative for donated keys
    new_request[i] = requests_local[i] - this_contrib ;
  }
}

//----------------------------------------------------------------------

void DistributedIndex::generate_new_keys_global_planning(
  const std::vector<long> & new_request ,
        std::vector<long> & my_donations ) const
{
  my_donations.assign( m_comm_size * m_span_count , long(0) );

  // Gather the global request plan for receiving and donating keys.
  // Positive values for receiving, negative values for donating.

  std::vector<long> new_request_global( m_comm_size * m_span_count );

#if defined( STK_HAS_MPI )

  if (m_comm_size > 1) { // Gather requests into per-process spans

    // There is a possible bug in MPI_Allgather for Intel 12; use MPI_Gather instead
#if defined(__INTEL_COMPILER) && (__INTEL_COMPILER >= 1200)
    {
      // MPI doesn't do 'const' in its interface, but the send buffer is const
      void * send_buf = const_cast<void*>( (void *)( (new_request.empty() ? NULL : & new_request[0]) ));
      void * recv_buf = (new_request_global.empty() ? NULL : & new_request_global[0]) ;
      for (int root = 0; root < m_comm_size; ++root)
      {
        MPI_Gather( send_buf , m_span_count , MPI_LONG ,
                    recv_buf , m_span_count , MPI_LONG , root, m_comm );
      }
    }
#else
    {
      // MPI doesn't do 'const' in its interface, but the send buffer is const
      void * send_buf = const_cast<void*>( (void *)( (new_request.empty() ? NULL : & new_request[0]) ));
      void * recv_buf = (new_request_global.empty() ? NULL : & new_request_global[0]) ;
      MPI_Allgather( send_buf , m_span_count , MPI_LONG ,
                     recv_buf , m_span_count , MPI_LONG , m_comm );
    }
#endif

  }
  else {
    new_request_global = new_request ;
  }
#else
  new_request_global = new_request ;
#endif

  // Now have the global receive & donate plan.
  //--------------------------------------------------------------------
  // Generate my donate plan from the global receive & donate plan.

  for ( unsigned i = 0 ; i < m_span_count ; ++i ) {

    if ( new_request[i] < 0 ) { // This process is donating on this span
      long my_total_donate = - new_request[i] ;

      long previous_donate = 0 ;

      // Count what previous processes have donated:
      for ( int p = 0 ; p < m_comm_rank ; ++p ) {
        const long new_request_p = new_request_global[ p * m_span_count + i ] ;
        if ( new_request_p < 0 ) {
          previous_donate -= new_request_p ;
        }
      }

      // What the donation count will be with my donation:
      long end_donate = previous_donate + my_total_donate ;

      long previous_receive = 0 ;

      // Determine my donation to other processes (one to many).

      for ( int p = 0 ; p < m_comm_size && 0 < my_total_donate ; ++p ) {

        const long new_request_p = new_request_global[ p * m_span_count + i ];

        if ( 0 < new_request_p ) { // Process 'p' receives keys

          // Accumulation of requests:

          previous_receive += new_request_p ;

          if ( previous_donate < previous_receive ) {
            // I am donating to process 'p'
            const long n = std::min( previous_receive , end_donate )
                         - previous_donate ;

            my_donations[ p * m_span_count + i ] = n ;
            previous_donate += n ;
            my_total_donate -= n ;
          }
        }
      }
    }
  }
}

//--------------------------------------------------------------------

void DistributedIndex::generate_new_keys(
  const std::vector<size_t>                 & requests ,
        std::vector< std::vector<KeyType> > & requested_keys )
{
  //--------------------------------------------------------------------
  // Develop the plan:

  std::vector<KeyType> global_key_upper_bound ;
  std::vector<long>    new_request ;
  std::vector<long>    my_donations ;
  std::vector<KeyType> contrib_keys ;
  std::vector<KeyType> new_keys ;

  // Verify input and generate global sum of
  // current key usage and requested new keys.
  // Throw a parallel consistent exception if the input is bad.

  generate_new_global_key_upper_bound( requests , global_key_upper_bound );

  // No exception thrown means all inputs are good and parallel consistent

  // Determine which local keys will be contributed,
  // keeping what this process could use from the contribution.
  // This can reduce the subsequent communication load when
  // donating keys to another process.

  generate_new_keys_local_planning( global_key_upper_bound ,
                                    requests ,
                                    new_request ,
                                    new_keys ,
                                    contrib_keys );

  // Determine where this process will be donating 'contrib_keys'
  generate_new_keys_global_planning( new_request, my_donations );

  // Because an upper bound is used rather than an exact maximum,
  // contrib_keys is likely to contain more keys than are needed.
  // Remove the unneeded keys.

  // Iterate spans backwards to erase from the end
  for ( size_t i = m_span_count ; 0 < i ; ) {
    --i ;
    size_t count = 0 ;
    for ( int p = 0 ; p < m_comm_size ; ++p ) {
      count += my_donations[ p * m_span_count + i ];
    }
    std::vector<KeyType>::iterator j_beg = contrib_keys.begin();
    std::vector<KeyType>::iterator j_end = contrib_keys.end();
    j_beg = std::lower_bound( j_beg , j_end , m_key_span[i].first );
    j_end = std::upper_bound( j_beg , j_end , m_key_span[i].second );
    const size_t n = std::distance( j_beg , j_end );
    if ( count < n ) {
      contrib_keys.erase( j_beg + count , j_end );
    }
  }

  // Plan is done, communicate the new keys.
  //--------------------------------------------------------------------
  // Put the keys this process is keeping into the index.
  m_key_usage.reserve(m_key_usage.size() + new_keys.size());
  for ( std::vector<KeyType>::iterator i = new_keys.begin();
        i != new_keys.end() ; ++i ) {
    m_key_usage.push_back( KeyProc( *i , m_comm_rank ) );
  }

  //--------------------------------------------------------------------

  CommAll all( m_comm );

  // Sizing

  for ( size_t i = 0 ; i < m_span_count ; ++i ) {
    for ( int p = 0 ; p < m_comm_size ; ++p ) {
      const size_t n_to_p = my_donations[ p * m_span_count + i ];
      if ( 0 < n_to_p ) {
        all.send_buffer(p).skip<KeyType>( n_to_p );
      }
    }
  }

  all.allocate_buffers( m_comm_size / 4 , false );

  // Packing

  {
    size_t n = 0 ;
    for ( size_t i = 0 ; i < m_span_count ; ++i ) {
      for ( int p = 0 ; p < m_comm_size ; ++p ) {
        const size_t n_to_p = my_donations[ p * m_span_count + i ];
        if ( 0 < n_to_p ) {
          all.send_buffer(p).pack<KeyType>( & contrib_keys[n] , n_to_p );
          for ( size_t k = 0 ; k < n_to_p ; ++k , ++n ) {
            m_key_usage.push_back( KeyProc( contrib_keys[n] , p ) );
          }
        }
      }
    }
  }

  std::sort( m_key_usage.begin() , m_key_usage.end() );

  all.communicate();

  // Unpacking
  unpack_recv_buffer( all, m_comm_size, new_keys);

  stk_classic::util::radix_sort_unsigned((new_keys.empty() ? NULL : &new_keys[0]), new_keys.size());

  requested_keys.resize( m_span_count );

  {
    std::vector<KeyType>::iterator i_beg = new_keys.begin();
    for ( size_t i = 0 ; i < m_span_count ; ++i ) {
      std::vector<KeyType>::iterator i_end = i_beg + requests[i] ;
      requested_keys[i].assign( i_beg , i_end );
      i_beg = i_end ;
    }
  }
}

//----------------------------------------------------------------------

} // namespace parallel
} // namespace stk_classic
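
// Illustrative sketch, not part of the original source: requesting new,
// globally unique keys. Every rank asks for 10 keys from a single span;
// generate_new_keys() returns keys that are unused and unique across all
// ranks, and registers them in the index. The function and variable names
// below are hypothetical.
void example_generate_keys( stk_classic::ParallelMachine comm )
{
  using stk_classic::parallel::DistributedIndex ;

  std::vector<DistributedIndex::KeySpan> spans ;
  spans.push_back( DistributedIndex::KeySpan( 0 , 1 << 20 ) );

  DistributedIndex index( comm , spans );

  // One request count per span; collective across all ranks.
  std::vector<size_t> requests( 1 , 10 );

  std::vector< std::vector<DistributedIndex::KeyType> > new_keys ;
  index.generate_new_keys( requests , new_keys );

  // new_keys[0] now holds this rank's 10 fresh keys for span 0.
}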