|
Sierra Toolkit
Version of the Day
|
00001 /*------------------------------------------------------------------------*/ 00002 /* Copyright 2010 Sandia Corporation. */ 00003 /* Under terms of Contract DE-AC04-94AL85000, there is a non-exclusive */ 00004 /* license for use of this work by or on behalf of the U.S. Government. */ 00005 /* Export of this program may require a license from the */ 00006 /* United States Government. */ 00007 /*------------------------------------------------------------------------*/ 00008 00009 #include <list> 00010 #include <string> 00011 #include <sstream> 00012 #include <utility> 00013 #include <vector> 00014 #include <boost/unordered_map.hpp> 00015 00016 #include <stk_util/environment/RuntimeMessage.hpp> 00017 #include <stk_util/environment/ReportHandler.hpp> 00018 #include <stk_util/util/Bootstrap.hpp> 00019 #include <stk_util/util/Marshal.hpp> 00020 00021 namespace stk_classic { 00022 00023 MessageCode 00024 MessageCode::s_defaultMessageCode(100000000); 00025 00026 namespace { 00027 00028 void bootstrap() 00029 { 00030 register_message_type(MSG_WARNING, 10000000, "warning"); 00031 register_message_type(MSG_DOOMED, 10000000, "error"); 00032 register_message_type(MSG_EXCEPTION, 1000000, "exception"); 00033 register_message_type(MSG_INFORMATION, 1000000, "informational"); 00034 } 00035 00036 stk_classic::Bootstrap x(bootstrap); 00037 00038 typedef std::pair<MessageId, std::string> MessageKey; 00039 00040 typedef boost::unordered_map<MessageKey, Throttle> MessageIdMap; 00041 00042 MessageIdMap s_messageIdMap; 00043 00044 MessageIdMap s_deferredMessageIdMap; 00045 00046 struct DeferredMessage 00047 { 00048 DeferredMessage() 00049 {} 00050 00051 DeferredMessage( 00052 size_t type, 00053 MessageId message_id, 00054 size_t throttle_cutoff, 00055 int throttle_group, 00056 const std::string & header, 00057 const std::string & aggregate) 00058 : m_type(type), 00059 m_messageId(message_id), 00060 m_rank(0), 00061 m_throttleCutoff(throttle_cutoff), 00062 m_throttleGroup(throttle_group), 00063 m_header(header), 00064 m_aggregate(aggregate) 00065 {} 00066 00067 size_t m_type; 00068 MessageId m_messageId; 00069 int m_rank; 00070 size_t m_throttleCutoff; 00071 int m_throttleGroup; 00072 std::string m_header; 00073 std::string m_aggregate; 00074 }; 00075 00076 typedef std::vector<DeferredMessage> DeferredMessageVector; 00077 00078 struct DeferredMessageLess : public std::binary_function<DeferredMessage, DeferredMessage, bool> 00079 { 00080 bool operator()(const DeferredMessage &key_1, const DeferredMessage &key_2) const { 00081 return (key_1.m_type < key_2.m_type) 00082 || (!(key_2.m_type < key_1.m_type) && key_1.m_messageId < key_2.m_messageId) 00083 || (!(key_2.m_type < key_1.m_type) && !(key_2.m_messageId < key_1.m_messageId) && key_1.m_header < key_2.m_header); 00084 } 00085 }; 00086 00087 DeferredMessageVector s_deferredMessageVector; 00088 00089 struct MessageTypeInfo 00090 { 00091 MessageTypeInfo() 00092 : m_count(0), 00093 m_maxCount(10000000), 00094 m_name("unknown") 00095 {} 00096 00097 unsigned m_count; 00098 unsigned m_maxCount; 00099 std::string m_name; 00100 }; 00101 00102 typedef boost::unordered_map<unsigned, MessageTypeInfo> MessageTypeInfoMap; 00103 00104 MessageTypeInfoMap s_messageTypeInfo; 00105 00106 MessageTypeInfo & 00107 get_message_type_info( 00108 unsigned type) 00109 { 00110 MessageTypeInfoMap::iterator it = s_messageTypeInfo.find(type & MSG_TYPE_MASK); 00111 if (it != s_messageTypeInfo.end()) 00112 return (*it).second; 00113 else 00114 return s_messageTypeInfo[type & MSG_TYPE_MASK]; 00115 } 00116 00117 00118 enum CutoffStatus { 00119 MSG_DISPLAY = 0, 00120 MSG_CUTOFF = 1, 00121 MSG_CUTOFF_EXCEEDED = 2 00122 }; 00123 00124 00125 CutoffStatus 00126 count_message( 00127 MessageId message_id, 00128 const char * message, 00129 const Throttle & throttle) 00130 { 00131 std::pair<MessageIdMap::iterator, bool> res = s_messageIdMap.insert(MessageIdMap::value_type(MessageIdMap::key_type(message_id, message), throttle)); 00132 size_t count = ++(*res.first).second.m_count; 00133 00134 if (count < (*res.first).second.m_cutoff) 00135 return MSG_DISPLAY; 00136 else if (count == (*res.first).second.m_cutoff) 00137 return MSG_CUTOFF; 00138 else 00139 return MSG_CUTOFF_EXCEEDED; 00140 } 00141 00142 Marshal &operator<<(Marshal &mout, const DeferredMessage &s) { 00143 mout << s.m_type << s.m_messageId << s.m_rank << s.m_throttleGroup << s.m_throttleCutoff << s.m_header << s.m_aggregate; 00144 return mout; 00145 } 00146 00147 Marshal &operator>>(Marshal &min, DeferredMessage &s) { 00148 min >> s.m_type >> s.m_messageId >> s.m_rank >> s.m_throttleGroup >> s.m_throttleCutoff >> s.m_header >> s.m_aggregate; 00149 return min; 00150 } 00151 00152 } // namespace <empty> 00153 00154 00155 void 00156 register_message_type( 00157 unsigned message_type, 00158 unsigned max_count, 00159 const char * name) 00160 { 00161 MessageTypeInfo &message_info = get_message_type_info(message_type); 00162 00163 message_info.m_maxCount = max_count; 00164 message_info.m_name = name; 00165 } 00166 00167 00168 unsigned 00169 get_message_count( 00170 unsigned message_type) 00171 { 00172 return get_message_type_info(message_type).m_count; 00173 } 00174 00175 00176 unsigned 00177 increment_message_count( 00178 unsigned message_type) 00179 { 00180 return ++get_message_type_info(message_type).m_count; 00181 } 00182 00183 00184 void 00185 reset_message_count( 00186 unsigned message_type) 00187 { 00188 get_message_type_info(message_type).m_count = 0; 00189 } 00190 00191 00192 const std::string & 00193 get_message_name( 00194 unsigned message_type) 00195 { 00196 return get_message_type_info(message_type).m_name; 00197 } 00198 00199 00200 void 00201 set_max_message_count( 00202 unsigned message_type, 00203 unsigned max_count) 00204 { 00205 get_message_type_info(message_type).m_maxCount = max_count; 00206 } 00207 00208 00209 unsigned 00210 get_max_message_count( 00211 unsigned message_type) 00212 { 00213 return get_message_type_info(message_type).m_maxCount; 00214 } 00215 00216 00217 void 00218 report_message( 00219 const char * message, 00220 unsigned message_type, 00221 const MessageCode & message_code) 00222 { 00223 if (message_type & MSG_DEFERRED) 00224 report(message, message_type); 00225 00226 else { 00227 unsigned count = increment_message_count(message_type); 00228 unsigned max_count = get_max_message_count(message_type); 00229 00230 if (count == max_count) { 00231 report(message, message_type); 00232 00233 std::ostringstream s; 00234 s << "Maximum " << get_message_name(message_type) << " count has been exceeded and will no longer be displayed"; 00235 report(s.str().c_str(), MSG_WARNING | MSG_SYMMETRIC); 00236 } 00237 00238 else if (count < max_count) { 00239 CutoffStatus cutoff = count_message(message_code.m_id, "", message_code.m_throttle); 00240 00241 if (cutoff == MSG_CUTOFF) { 00242 report(message, message_type); 00243 00244 std::ostringstream s; 00245 s << "Maximum count for this " << get_message_name(message_type) << " has been exceeded and will no longer be displayed"; 00246 report(s.str().c_str(), MSG_WARNING | MSG_SYMMETRIC); 00247 } 00248 00249 else if (cutoff == MSG_DISPLAY) 00250 report(message, message_type); 00251 } 00252 } 00253 } 00254 00255 00256 void 00257 reset_throttle_group( 00258 int throttle_group) 00259 { 00260 for (MessageIdMap::iterator it = s_messageIdMap.begin(); it != s_messageIdMap.end(); ++it) 00261 if ((*it).second.m_group == throttle_group) 00262 (*it).second.m_count = 0; 00263 } 00264 00265 00266 void 00267 add_deferred_message( 00268 int message_type, 00269 MessageId message_id, 00270 size_t throttle_cutoff, 00271 int throttle_group, 00272 const char * header, 00273 const char * aggegrate) 00274 { 00275 std::ostringstream s; 00276 s << header << " " << aggegrate; 00277 00278 report(s.str().c_str(), message_type | MSG_DEFERRED); 00279 00280 std::pair<MessageIdMap::iterator, bool> res = s_deferredMessageIdMap.insert(MessageIdMap::value_type(MessageIdMap::key_type(message_id, header), Throttle(throttle_cutoff, throttle_group))); 00281 size_t count = ++(*res.first).second.m_count; 00282 00283 if (count <= throttle_cutoff) 00284 s_deferredMessageVector.push_back(DeferredMessage(message_type, message_id, throttle_cutoff, throttle_group, header, aggegrate)); 00285 } 00286 00287 00289 00290 void 00291 report_deferred_messages( 00292 ParallelMachine comm) 00293 { 00294 #ifdef STK_HAS_MPI 00295 const int p_root = 0 ; 00296 const int p_size = parallel_machine_size(comm); 00297 const int p_rank = parallel_machine_rank(comm); 00298 00299 for (DeferredMessageVector::iterator it = s_deferredMessageVector.begin(); it != s_deferredMessageVector.end(); ++it) 00300 (*it).m_rank = p_rank; 00301 00302 Marshal mout; 00303 mout << s_deferredMessageVector; 00304 00305 DeferredMessageVector deferred_message_vector; 00306 00307 // Gather the send counts on root processor 00308 std::string send_string(mout.stream.str()); 00309 int send_count = send_string.size(); 00310 std::vector<int> recv_count(p_size, 0); 00311 int * const recv_count_ptr = &recv_count[0] ; 00312 00313 int result = MPI_Gather(&send_count, 1, MPI_INT, 00314 recv_count_ptr, 1, MPI_INT, 00315 p_root, comm); 00316 if (MPI_SUCCESS != result) { 00317 std::ostringstream message ; 00318 message << "stk_classic::report_deferred_messages FAILED: MPI_Gather = " << result ; 00319 throw std::runtime_error(message.str()); 00320 } 00321 00322 // Receive counts are only non-zero on the root processor: 00323 std::vector<int> recv_displ(p_size + 1, 0); 00324 00325 for (int i = 0 ; i < p_size ; ++i) { 00326 recv_displ[i + 1] = recv_displ[i] + recv_count[i] ; 00327 } 00328 00329 const int recv_size = recv_displ[p_size] ; 00330 00331 std::vector<char> buffer(recv_size); 00332 00333 { 00334 const char * const send_ptr = send_string.data(); 00335 char * const recv_ptr = recv_size ? & buffer[0] : (char *) NULL ; 00336 int * const recv_displ_ptr = & recv_displ[0] ; 00337 00338 result = MPI_Gatherv((void *) send_ptr, send_count, MPI_CHAR, 00339 recv_ptr, recv_count_ptr, recv_displ_ptr, MPI_CHAR, 00340 p_root, comm); 00341 if (MPI_SUCCESS != result) { 00342 std::ostringstream message ; 00343 message << "stk_classic::report_deferred_messages FAILED: MPI_Gatherv = " << result ; 00344 throw std::runtime_error(message.str()); 00345 } 00346 00347 00348 if (p_rank == p_root) { 00349 for (int i = 0; i < p_size; ++i) { 00350 Marshal min(std::string(recv_ptr + recv_displ[i], recv_ptr + recv_displ[i + 1])); 00351 min >> deferred_message_vector; 00352 } 00353 00354 std::stable_sort(deferred_message_vector.begin(), deferred_message_vector.end(), DeferredMessageLess()); 00355 00356 DeferredMessageVector::const_iterator current_message_it = deferred_message_vector.begin(); 00357 while (current_message_it != deferred_message_vector.end()) { 00358 const DeferredMessage ¤t_message = (*current_message_it); 00359 00360 DeferredMessageVector::const_iterator end = current_message_it + 1; 00361 while (end != deferred_message_vector.end() 00362 && current_message.m_messageId == (*end).m_messageId 00363 && current_message.m_header == (*end).m_header) 00364 ++end; 00365 00366 std::ostringstream s; 00367 00368 s << current_message.m_header << current_message.m_aggregate; 00369 00370 for (DeferredMessageVector::const_iterator it1 = current_message_it + 1; it1 != end; ++it1) { 00371 bool print = true; 00372 for (DeferredMessageVector::const_iterator it2 = current_message_it; it2 != it1; ++it2) 00373 if ((*it1).m_aggregate == (*it2).m_aggregate) { 00374 print = false; 00375 break; 00376 } 00377 if (print) { 00378 if (!(*it1).m_aggregate.find('\n')) 00379 s << ", "; 00380 s << (*it1).m_aggregate; 00381 } 00382 } 00383 00384 report_message(s.str().c_str(), current_message.m_type | stk_classic::MSG_SYMMETRIC, MessageCode(current_message.m_messageId, current_message.m_throttleCutoff, current_message.m_throttleGroup)); 00385 00386 current_message_it = end; 00387 } 00388 } 00389 } 00390 00391 s_deferredMessageIdMap.clear(); 00392 s_deferredMessageVector.clear(); 00393 #endif 00394 } 00395 00396 00397 void 00398 aggregate_messages( 00399 ParallelMachine comm, 00400 std::ostringstream & os, 00401 const char * separator) 00402 { 00403 #ifdef STK_HAS_MPI 00404 std::string message = os.str(); 00405 os.str(""); 00406 00407 const int p_root = 0 ; 00408 const int p_size = parallel_machine_size(comm); 00409 const int p_rank = parallel_machine_rank(comm); 00410 00411 int result ; 00412 00413 // Gather the send counts on root processor 00414 00415 int send_count = message.size(); 00416 00417 std::vector<int> recv_count(p_size, 0); 00418 00419 int * const recv_count_ptr = & recv_count[0] ; 00420 00421 result = MPI_Gather(& send_count, 1, MPI_INT, 00422 recv_count_ptr, 1, MPI_INT, 00423 p_root, comm); 00424 00425 if (MPI_SUCCESS != result) { 00426 std::ostringstream s; 00427 s << "stk_classic::all_write FAILED: MPI_Gather = " << result ; 00428 throw std::runtime_error(s.str()); 00429 } 00430 00431 // Receive counts are only non-zero on the root processor: 00432 std::vector<int> recv_displ(p_size + 1, 0); 00433 00434 for (int i = 0 ; i < p_size ; ++i) { 00435 recv_displ[i + 1] = recv_displ[i] + recv_count[i] ; 00436 } 00437 00438 const int recv_size = recv_displ[ p_size ] ; 00439 00440 std::vector<char> buffer(recv_size); 00441 00442 { 00443 const char * const send_ptr = message.c_str(); 00444 char * const recv_ptr = recv_size ? & buffer[0] : (char *) NULL ; 00445 int * const recv_displ_ptr = & recv_displ[0] ; 00446 00447 result = MPI_Gatherv((void*) send_ptr, send_count, MPI_CHAR, 00448 recv_ptr, recv_count_ptr, recv_displ_ptr, MPI_CHAR, 00449 p_root, comm); 00450 } 00451 00452 if (MPI_SUCCESS != result) { 00453 std::ostringstream s ; 00454 s << "stk_classic::all_write FAILED: MPI_Gatherv = " << result ; 00455 throw std::runtime_error(s.str()); 00456 } 00457 00458 if (p_root == (int) p_rank) { 00459 bool first = true; 00460 for (int i = 0 ; i < p_size ; ++i) { 00461 if (recv_count[i]) { 00462 if (!first) 00463 os << separator; 00464 first = false; 00465 char * const ptr = & buffer[ recv_displ[i] ]; 00466 os.write(ptr, recv_count[i]); 00467 } 00468 } 00469 os.flush(); 00470 } 00471 else 00472 os << message; 00473 #endif 00474 } 00475 00476 00477 std::ostream & 00478 operator<<( 00479 std::ostream & os, 00480 const MessageType & message_type) 00481 { 00482 // if (message_type & MSG_SYMMETRIC) 00483 // os << "parallel "; 00484 os << get_message_type_info(message_type).m_name; 00485 00486 return os; 00487 } 00488 00489 } // namespace stk_classic