PlayaMPIComm.cpp
/* @HEADER@ */
// ************************************************************************
// 
//                 Playa: Programmable Linear Algebra
//                 Copyright 2012 Sandia Corporation
// 
// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
// the U.S. Government retains certain rights in this software.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// 1. Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the Corporation nor the names of the
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// Questions? Contact Kevin Long (kevin.long@ttu.edu)
// 

/* @HEADER@ */

#include "PlayaMPIComm.hpp"
#include "PlayaMPIDataType.hpp"
#include "PlayaMPIOp.hpp"
#include "PlayaErrorPolling.hpp"


namespace Playa
{
using namespace Teuchos;


MPIComm::MPIComm()
  :
#ifdef HAVE_MPI
  comm_(MPI_COMM_WORLD),
#endif
  nProc_(0), myRank_(0)
{
  init();
}

#ifdef HAVE_MPI
MPIComm::MPIComm(MPI_Comm comm)
  : comm_(comm), nProc_(0), myRank_(0)
{
  init();
}
#endif

int MPIComm::mpiIsRunning() const
{
  int mpiStarted = 0;
#ifdef HAVE_MPI
  MPI_Initialized(&mpiStarted);
#endif
  return mpiStarted;
}

void MPIComm::init()
{
#ifdef HAVE_MPI

  if (mpiIsRunning())
  {
    errCheck(MPI_Comm_rank(comm_, &myRank_), "Comm_rank");
    errCheck(MPI_Comm_size(comm_, &nProc_), "Comm_size");
  }
  else
  {
    nProc_ = 1;
    myRank_ = 0;
  }

#else
  nProc_ = 1;
  myRank_ = 0;
#endif
}

#ifdef USE_MPI_GROUPS /* we're ignoring groups for now */

MPIComm::MPIComm(const MPIComm& parent, const MPIGroup& group)
  :
#ifdef HAVE_MPI
  comm_(MPI_COMM_WORLD), 
#endif
  nProc_(0), myRank_(0)
{
#ifdef HAVE_MPI
  if (group.getNProc()==0)
  {
    myRank_ = -1;
    nProc_ = 0;
  }
  else if (parent.containsMe())
  {
    MPI_Comm parentComm = parent.comm_;
    MPI_Group newGroup = group.group_;
      
    errCheck(MPI_Comm_create(parentComm, newGroup, &comm_), 
      "Comm_create");
      
    if (group.containsProc(parent.getRank()))
    {
      errCheck(MPI_Comm_rank(comm_, &myRank_), "Comm_rank");
          
      errCheck(MPI_Comm_size(comm_, &nProc_), "Comm_size");
    }
    else
    {
      myRank_ = -1;
      nProc_ = -1;
      return;
    }
  }
  else
  {
    myRank_ = -1;
    nProc_ = -1;
  }
#endif
}

#endif /* USE_MPI_GROUPS */

MPIComm& MPIComm::world()
{
  static MPIComm w = MPIComm();
  return w;
}


MPIComm& MPIComm::self()
{
#ifdef HAVE_MPI
  static MPIComm w = MPIComm(MPI_COMM_SELF);
#else
  static MPIComm w = MPIComm();
#endif
  return w;
}
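
/* Usage sketch: world() and self() return lazily constructed static
 * communicators, so rank and size can be queried with no further setup.
 * This is a minimal example, assuming the getRank()/getNProc() accessors
 * declared in PlayaMPIComm.hpp:
 *
 *   MPIComm& comm = MPIComm::world();
 *   int rank = comm.getRank();   // 0 in a serial build or before MPI_Init
 *   int size = comm.getNProc();  // 1 in a serial build or before MPI_Init
 */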


void MPIComm::synchronize() const 
{
#ifdef HAVE_MPI
  //mutex_.lock();
  {
    if (mpiIsRunning())
    {
      /* test whether errors have been detected on another proc before
       * doing the collective operation. */
      TEUCHOS_POLL_FOR_FAILURES(*this);
      /* if we're to this point, all processors are OK */
        
      errCheck(::MPI_Barrier(comm_), "Barrier");
    }
  }
  //mutex_.unlock();
#endif
}

void MPIComm::allToAll(void* sendBuf, int sendCount, 
  const MPIDataType& sendType,
  void* recvBuf, int recvCount, const MPIDataType& recvType) const
{
#ifdef HAVE_MPI
  //mutex_.lock();
  {
    MPI_Datatype mpiSendType = sendType.handle();
    MPI_Datatype mpiRecvType = recvType.handle();


    if (mpiIsRunning())
    {
      /* test whether errors have been detected on another proc before
       * doing the collective operation. */
      TEUCHOS_POLL_FOR_FAILURES(*this);
      /* if we're to this point, all processors are OK */
        
      errCheck(::MPI_Alltoall(sendBuf, sendCount, mpiSendType,
          recvBuf, recvCount, mpiRecvType,
          comm_), "Alltoall");
    }
  }
  //mutex_.unlock();
#else
  (void)sendBuf;
  (void)sendCount;
  (void)sendType;
  (void)recvBuf;
  (void)recvCount;
  (void)recvType;
#endif
}
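
/* Usage sketch: allToAll sends sendCount entries from each rank to every
 * other rank and receives recvCount entries from each. A minimal exchange
 * of one int per rank, assuming the MPIDataType::intType() accessor from
 * PlayaMPIDataType.hpp:
 *
 *   MPIComm& comm = MPIComm::world();
 *   std::vector<int> sendBuf(comm.getNProc(), comm.getRank());
 *   std::vector<int> recvBuf(comm.getNProc());
 *   comm.allToAll(&sendBuf[0], 1, MPIDataType::intType(),
 *     &recvBuf[0], 1, MPIDataType::intType());
 *   // recvBuf[p] now holds rank p's ID
 */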

void MPIComm::allToAllv(void* sendBuf, int* sendCount, 
  int* sendDisplacements, const MPIDataType& sendType,
  void* recvBuf, int* recvCount, 
  int* recvDisplacements, const MPIDataType& recvType) const
{
#ifdef HAVE_MPI
  //mutex_.lock();
  {
    MPI_Datatype mpiSendType = sendType.handle();
    MPI_Datatype mpiRecvType = recvType.handle();

    if (mpiIsRunning())
    {
      /* test whether errors have been detected on another proc before
       * doing the collective operation. */
      TEUCHOS_POLL_FOR_FAILURES(*this);
      /* if we're to this point, all processors are OK */   
        
      errCheck(::MPI_Alltoallv(sendBuf, sendCount, sendDisplacements, mpiSendType,
          recvBuf, recvCount, recvDisplacements, mpiRecvType,
          comm_), "Alltoallv");
    }
  }
  //mutex_.unlock();
#else
  (void)sendBuf;
  (void)sendCount;
  (void)sendDisplacements;
  (void)sendType;
  (void)recvBuf;
  (void)recvCount;
  (void)recvDisplacements;
  (void)recvType;
#endif
}
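
/* Usage sketch: the v-variant lets each rank send a different number of
 * entries to each peer; sendCount[p] entries starting at offset
 * sendDisplacements[p] go to rank p, and symmetrically on receive. The
 * counts and displacements are per-rank arrays of length getNProc(), with
 * displacements typically built as a running sum of the counts:
 *
 *   for (int p=0, off=0; p<nProc; p++)
 *   {
 *     sendDisp[p] = off;
 *     off += sendCount[p];
 *   }
 */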

void MPIComm::gather(void* sendBuf, int sendCount, const MPIDataType& sendType,
  void* recvBuf, int recvCount, const MPIDataType& recvType,
  int root) const
{
#ifdef HAVE_MPI
  //mutex_.lock();
  {
    MPI_Datatype mpiSendType = sendType.handle();
    MPI_Datatype mpiRecvType = recvType.handle();


    if (mpiIsRunning())
    {
      /* test whether errors have been detected on another proc before
       * doing the collective operation. */
      TEUCHOS_POLL_FOR_FAILURES(*this);
      /* if we're to this point, all processors are OK */
        
      errCheck(::MPI_Gather(sendBuf, sendCount, mpiSendType,
          recvBuf, recvCount, mpiRecvType,
          root, comm_), "Gather");
    }
  }
  //mutex_.unlock();
#endif
}
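
/* Usage sketch: gather one double from every rank onto the root. recvCount
 * is the count received from each rank, not the total, and only the root's
 * recvBuf needs room for getNProc() entries. A minimal example, assuming
 * MPIDataType::doubleType() from PlayaMPIDataType.hpp:
 *
 *   double local = 1.0 + comm.getRank();
 *   std::vector<double> all(comm.getNProc());
 *   comm.gather(&local, 1, MPIDataType::doubleType(),
 *     &all[0], 1, MPIDataType::doubleType(), 0);  // rank 0 is the root
 */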

void MPIComm::gatherv(void* sendBuf, int sendCount, const MPIDataType& sendType,
  void* recvBuf, int* recvCount, int* displacements, const MPIDataType& recvType,
  int root) const
{
#ifdef HAVE_MPI
  //mutex_.lock();
  {
    MPI_Datatype mpiSendType = sendType.handle();
    MPI_Datatype mpiRecvType = recvType.handle();
    
    if (mpiIsRunning())
    {
      /* test whether errors have been detected on another proc before
       * doing the collective operation. */
      TEUCHOS_POLL_FOR_FAILURES(*this);
      /* if we're to this point, all processors are OK */
        
      errCheck(::MPI_Gatherv(sendBuf, sendCount, mpiSendType,
          recvBuf, recvCount, displacements, mpiRecvType,
          root, comm_), "Gatherv");
    }
  }
  //mutex_.unlock();
#endif
}

void MPIComm::allGather(void* sendBuf, int sendCount, const MPIDataType& sendType,
  void* recvBuf, int recvCount, 
  const MPIDataType& recvType) const
{
#ifdef HAVE_MPI
  //mutex_.lock();
  {
    MPI_Datatype mpiSendType = sendType.handle();
    MPI_Datatype mpiRecvType = recvType.handle();
    
    if (mpiIsRunning())
    {
      /* test whether errors have been detected on another proc before
       * doing the collective operation. */
      TEUCHOS_POLL_FOR_FAILURES(*this);
      /* if we're to this point, all processors are OK */
        
      errCheck(::MPI_Allgather(sendBuf, sendCount, mpiSendType,
          recvBuf, recvCount, 
          mpiRecvType, comm_), 
        "AllGather");
    }
  }
  //mutex_.unlock();
#endif
}


void MPIComm::allGatherv(void* sendBuf, int sendCount, const MPIDataType& sendType,
  void* recvBuf, int* recvCount, 
  int* recvDisplacements,
  const MPIDataType& recvType) const
{
#ifdef HAVE_MPI
  //mutex_.lock();
  {
    MPI_Datatype mpiSendType = sendType.handle();
    MPI_Datatype mpiRecvType = recvType.handle();
    
    if (mpiIsRunning())
    {
      /* test whether errors have been detected on another proc before
       * doing the collective operation. */
      TEUCHOS_POLL_FOR_FAILURES(*this);
      /* if we're to this point, all processors are OK */
        
      errCheck(::MPI_Allgatherv(sendBuf, sendCount, mpiSendType,
          recvBuf, recvCount, recvDisplacements,
          mpiRecvType, 
          comm_), 
        "AllGatherv");
    }
  }
  //mutex_.unlock();
#endif
}
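
/* Usage sketch: allGatherv delivers a variable number of entries from each
 * rank to every rank. A common pattern is to allGather the per-rank counts
 * first, then build the displacements as a running sum (again assuming
 * MPIDataType::intType()):
 *
 *   int myCount = ...;  // per-rank element count
 *   std::vector<int> counts(nProc), disps(nProc);
 *   comm.allGather(&myCount, 1, MPIDataType::intType(),
 *     &counts[0], 1, MPIDataType::intType());
 *   for (int p=0, off=0; p<nProc; p++) { disps[p] = off; off += counts[p]; }
 */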


void MPIComm::bcast(void* msg, int length, 
  const MPIDataType& type, int src) const
{
#ifdef HAVE_MPI
  //mutex_.lock();
  {
    if (mpiIsRunning())
    {
      /* test whether errors have been detected on another proc before
       * doing the collective operation. */
      TEUCHOS_POLL_FOR_FAILURES(*this);
      /* if we're to this point, all processors are OK */
        
      MPI_Datatype mpiType = type.handle();
      errCheck(::MPI_Bcast(msg, length, mpiType, src, 
          comm_), "Bcast");
    }
  }
  //mutex_.unlock();
#endif
}
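
/* Usage sketch: every rank passes the same length, type, and src; after the
 * call each rank's msg buffer holds the source rank's data. A minimal
 * example, assuming MPIDataType::intType():
 *
 *   int flag = (comm.getRank()==0) ? 42 : 0;
 *   comm.bcast(&flag, 1, MPIDataType::intType(), 0);
 *   // flag == 42 on every rank now
 */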

void MPIComm::allReduce(void* input, void* result, int inputCount, 
    const MPIDataType& type,
    const MPIOp& op) const 
{
#ifdef HAVE_MPI
  //mutex_.lock();
  {
    MPI_Op mpiOp = op.handle();
    MPI_Datatype mpiType = type.handle();
    
    if (mpiIsRunning())
    {
      errCheck(::MPI_Allreduce(input, result, inputCount, mpiType,
          mpiOp, comm_), 
        "Allreduce");
    }
  }
  //mutex_.unlock();
#endif
}
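
/* Usage sketch: combine one value from every rank and leave the result on
 * all of them. A global sum, assuming the MPIDataType::doubleType() and
 * MPIOp::sumOp() accessors from the headers included above:
 *
 *   double local = localContribution;  // hypothetical per-rank value
 *   double global = 0.0;
 *   comm.allReduce((void*) &local, (void*) &global, 1,
 *     MPIDataType::doubleType(), MPIOp::sumOp());
 */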


void MPIComm::errCheck(int errCode, const std::string& methodName)
{
  TEUCHOS_TEST_FOR_EXCEPTION(errCode != 0, std::runtime_error,
    "MPI function MPI_" << methodName 
    << " returned error code=" << errCode);
}


} // namespace Playa
