blocxx

Thread.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002 * Copyright (C) 2005, Vintela, Inc. All rights reserved.
00003 * Copyright (C) 2006, Novell, Inc. All rights reserved.
00004 * 
00005 * Redistribution and use in source and binary forms, with or without
00006 * modification, are permitted provided that the following conditions are met:
00007 * 
00008 *     * Redistributions of source code must retain the above copyright notice,
00009 *       this list of conditions and the following disclaimer.
00010 *     * Redistributions in binary form must reproduce the above copyright
00011 *       notice, this list of conditions and the following disclaimer in the
00012 *       documentation and/or other materials provided with the distribution.
00013 *     * Neither the name of 
00014 *       Vintela, Inc., 
00015 *       nor Novell, Inc., 
00016 *       nor the names of its contributors or employees may be used to 
00017 *       endorse or promote products derived from this software without 
00018 *       specific prior written permission.
00019 * 
00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00021 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00022 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00023 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00024 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00025 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00026 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00027 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00028 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00029 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00030 * POSSIBILITY OF SUCH DAMAGE.
00031 *******************************************************************************/
00032 
00033 
00039 #include "blocxx/BLOCXX_config.h"
00040 #include "blocxx/Thread.hpp"
00041 #include "blocxx/Assertion.hpp"
00042 #include "blocxx/Format.hpp"
00043 #include "blocxx/ThreadBarrier.hpp"
00044 #include "blocxx/NonRecursiveMutexLock.hpp"
00045 #include "blocxx/ExceptionIds.hpp"
00046 #include "blocxx/Timeout.hpp"
00047 #include "blocxx/TimeoutTimer.hpp"
00048 #include "blocxx/DateTime.hpp"
00049 #include "blocxx/Logger.hpp"
00050 
00051 #include <cstring>
00052 #include <cstdio>
00053 #include <cerrno>
00054 #include <iostream> // for cerr
00055 #include <csignal>
00056 #include <cassert>
00057 
00058 #ifdef BLOCXX_HAVE_OPENSSL
00059 #include <openssl/err.h>
00060 #endif
00061 
00062 
00063 namespace BLOCXX_NAMESPACE
00064 {
00065 
// Exception types for this module, produced by BLOCXX_DEFINE_EXCEPTION_WITH_ID.
// The first is the ThreadException thrown by Thread::start() and Thread::join()
// in this file.
00067 BLOCXX_DEFINE_EXCEPTION_WITH_ID(Thread);
// NOTE(review): presumably yields CancellationDeniedException (name inferred
// from the macro argument); it is not used in this file -- confirm in Thread.hpp.
00068 BLOCXX_DEFINE_EXCEPTION_WITH_ID(CancellationDenied);
00069 
00070 namespace
00071 {
// Component name handed to every Logger constructed in this file.
00072 const GlobalString COMPONENT_NAME = BLOCXX_GLOBAL_STRING_INIT("blocxx");
00073 
00075 // this is what's really passed to threadRunner
00076 struct ThreadParam
00077 {
00078    ThreadParam(Thread* t, const ThreadDoneCallbackRef& c, const ThreadBarrier& b)
00079       : thread(t)
00080       , cb(c)
00081       , thread_barrier(b)
00082    {}
00083    Thread* thread;
00084    ThreadDoneCallbackRef cb;
00085    ThreadBarrier thread_barrier;
00086 };
00087 
00089 static Thread_t
00090 zeroThread()
00091 {
00092    Thread_t zthr;
00093    ::memset(&zthr, 0, sizeof(zthr));
00094    return zthr;
00095 }
00096 
00097 static Thread_t NULLTHREAD = zeroThread();
00099 static inline bool
00100 sameId(const Thread_t& t1, const Thread_t& t2)
00101 {
00102    return ThreadImpl::sameThreads(t1, t2);
00103 }
00104 } // end unnamed namespace
00105 
00107 // Constructor
00108 Thread::Thread() 
00109    : m_id(NULLTHREAD)
00110    , m_isRunning(false)
00111    , m_joined(false)
00112    , m_cancelRequested(0)
00113    , m_cancelled(false)
00114 {
00115 }
00117 // Destructor
00118 Thread::~Thread()
00119 {
00120    try
00121    {
00122       if (!sameId(m_id, NULLTHREAD))
00123       {
00124          if (!m_joined)
00125          {
00126             join();
00127          }
00128          assert(m_isRunning == false);
00129          ThreadImpl::destroyThread(m_id);
00130       }
00131    }
00132    catch (...)
00133    {
00134       // don't let exceptions escape
00135    }
00136 }
00138 // Start the thread
00139 void
00140 Thread::start(const ThreadDoneCallbackRef& cb)
00141 {
00142    if (isRunning())
00143    {
00144       BLOCXX_THROW(ThreadException,
00145          "Thread::start - thread is already running");
00146    }
00147    if (!sameId(m_id, NULLTHREAD))
00148    {
00149       BLOCXX_THROW(ThreadException,
00150          "Thread::start - cannot start previously run thread");
00151    }
00152    UInt32 flgs = BLOCXX_THREAD_FLG_JOINABLE;
00153    ThreadBarrier thread_barrier(2);
00154    // p will be delted by threadRunner
00155    ThreadParam* p = new ThreadParam(this, cb, thread_barrier);
00156    int result = ThreadImpl::createThread(m_id, threadRunner, p, flgs);
00157    if (result != 0)
00158    {
00159       BLOCXX_THROW_ERRNO_MSG1(ThreadException, "ThreadImpl::createThread() failed", result);
00160    }
00161    thread_barrier.wait();
00162 }
00164 // Wait for this object's thread execution (if any) to complete.
00165 Int32
00166 Thread::join()
00167 {
00168    BLOCXX_ASSERT(!sameId(m_id, NULLTHREAD));
00169    Int32 rval;
00170    if (ThreadImpl::joinThread(m_id, rval) != 0)
00171    {
00172       BLOCXX_THROW(ThreadException,
00173          Format("Thread::join - ThreadImpl::joinThread: %1(%2)", 
00174                errno, strerror(errno)).c_str());
00175    }
00176    m_joined = true;
00177    return rval;
00178 }
// Entry point executed by every thread created through Thread::start().
//
// paramPtr - a heap-allocated ThreadParam created by start(); this function
//            copies its fields and deletes it.
//
// Sequence: store the Thread pointer in thread-local storage, mark the thread
// as running, meet start() at the shared barrier, call run(), then call
// doneRunning() with the callback. A ThreadCancelledException from run() is
// swallowed silently. Any other exception is logged, doneRunning() is called,
// and the exception is re-thrown to the outer handlers, which log it again
// and end the thread through ThreadImpl::exitThread().
//
// Returns the value produced by run(), or -1 if run() did not complete
// normally. In practice exitThread() is expected not to return (see the
// comments below), so the final return statement is only a formality.
//
// NOTE(review): an exception escaping run() is logged twice (inner and outer
// handler).
// NOTE(review): if BLOCXX_ASSERT throws before theThreadID is assigned, the
// outer handlers pass an uninitialized handle to exitThread() -- confirm
// whether exitThread() actually uses it.
00180 // STATIC
00181 // Method used for starting threads
00182 Int32
00183 Thread::threadRunner(void* paramPtr)
00184 {
00185    Thread_t theThreadID;
00186    Int32 rval = -1; // only overwritten by a normal return from run()
00187    try
00188    {
00189       // scope is important so destructors will run before the thread is clobbered by exitThread
00190       BLOCXX_ASSERT(paramPtr != NULL);
00191       ThreadParam* pParam = static_cast<ThreadParam*>(paramPtr);
00192       Thread* pTheThread = pParam->thread;
00193       ThreadImpl::saveThreadInTLS(pTheThread);
00194       theThreadID = pTheThread->m_id;
00195       ThreadDoneCallbackRef cb = pParam->cb;
00196       ThreadBarrier thread_barrier = pParam->thread_barrier;
00197       delete pParam; // allocated by start(); everything needed has been copied out above
00198       pTheThread->m_isRunning = true;
00199       thread_barrier.wait(); // start() waits on the same barrier before returning
00200 
00201       try
00202       {
00203          rval = pTheThread->run();
00204       }
00205       // make sure we get all exceptions so the thread is cleaned up properly
00206       catch (ThreadCancelledException&)
00207       { // not logged and not re-thrown: execution continues at doneRunning() below
00208       }
00209       catch (Exception& ex)
00210       {
00211 #ifdef BLOCXX_DEBUG     
00212          std::clog << "!!! Exception: " << ex.type() << " caught in Thread class\n";
00213          std::clog << ex << std::endl;
00214 #endif
00215          Logger logger(COMPONENT_NAME);
00216          BLOCXX_LOG_ERROR(logger, Format("!!! Exception caught in Thread class: %1", ex));
00217          pTheThread->doneRunning(cb);
00218          // we need to re-throw here, otherwise we'll segfault
00219          // if pthread_cancel() does forced stack unwinding.
00220          throw;
00221       }
00222       catch (std::exception& ex)
00223       {
00224 #ifdef BLOCXX_DEBUG     
00225          std::clog << "!!! std::exception: " << ex.what() << " caught in Thread class" << std::endl;
00226 #endif
00227          Logger logger(COMPONENT_NAME);
00228          BLOCXX_LOG_ERROR(logger, Format("!!! std::exception caught in Thread class: %1", ex.what()));
00229          pTheThread->doneRunning(cb);
00230          // we need to re-throw here, otherwise we'll segfault
00231          // if pthread_cancel() does forced stack unwinding.
00232          throw;
00233       }
00234       catch (...)
00235       {
00236 #ifdef BLOCXX_DEBUG     
00237          std::clog << "!!! Unknown Exception caught in Thread class" << std::endl;
00238 #endif
00239          Logger logger(COMPONENT_NAME);
00240          BLOCXX_LOG_ERROR(logger, "!!! Unknown Exception caught in Thread class.");
00241 
00242          pTheThread->doneRunning(cb);
00243          // we need to re-throw here, otherwise we'll segfault
00244          // if pthread_cancel() does forced stack unwinding.
00245          throw;
00246       }
00247 
00248       pTheThread->doneRunning(cb); // normal completion, or cancellation swallowed above
00249       
00250    }
00251    catch (Exception& ex)
00252    {
00253 #ifdef BLOCXX_DEBUG     
00254       std::clog << "!!! Exception: " << ex.type() << " caught in Thread class\n";
00255       std::clog << ex << std::endl;
00256 #endif
00257       Logger logger(COMPONENT_NAME);
00258       BLOCXX_LOG_ERROR(logger, Format("!!! Exception caught in Thread class: %1", ex));
00259       // end the thread.  exitThread never returns.
00260       ThreadImpl::exitThread(theThreadID, rval);
00261    }
00262    catch (std::exception& ex)
00263    {
00264 #ifdef BLOCXX_DEBUG     
00265       std::clog << "!!! std::exception: " << ex.what() << " caught in Thread class" << std::endl;
00266 #endif
00267       Logger logger(COMPONENT_NAME);
00268       BLOCXX_LOG_ERROR(logger, Format("!!! std::exception caught in Thread class: %1", ex.what()));
00269       // end the thread.  exitThread never returns.
00270       ThreadImpl::exitThread(theThreadID, rval);
00271    }
00272    catch (...)
00273    {
00274 #ifdef BLOCXX_DEBUG     
00275       std::clog << "!!! Unknown Exception caught in Thread class" << std::endl;
00276 #endif
00277       Logger logger(COMPONENT_NAME);
00278       BLOCXX_LOG_ERROR(logger, "!!! Unknown Exception caught in Thread class.");
00279       // end the thread.  exitThread never returns.
00280       ThreadImpl::exitThread(theThreadID, rval);
00281    }
00282    // end the thread.  exitThread never returns.
00283    ThreadImpl::exitThread(theThreadID, rval);
00284    return rval;
00285 }
00286 
00288 void
00289 Thread::doneRunning(const ThreadDoneCallbackRef& cb)
00290 {
00291    {
00292       NonRecursiveMutexLock lock(m_stateGuard);
00293       m_isRunning = false;
00294       m_stateCond.notifyAll();
00295    }
00296 
00297    if (cb)
00298    {
00299       cb->notifyThreadDone(this);
00300    }
00301 #ifdef BLOCXX_HAVE_OPENSSL
00302    // this is necessary to free memory associated with the OpenSSL error queue for this thread.
00303    ERR_remove_state(0);
00304 #endif
00305 }
00306 
00308 void
00309 Thread::shutdown()
00310 {
00311    doShutdown();
00312 }
00314 bool
00315 Thread::shutdown(const Timeout& timeout)
00316 {
00317    doShutdown();
00318    return timedWait(timeout);
00319 }
00321 void
00322 Thread::cooperativeCancel()
00323 {
00324    if (!isRunning())
00325    {
00326       return;
00327    }
00328 
00329    // give the thread a chance to clean up a bit or abort the cancellation.
00330    doCooperativeCancel();
00331    m_cancelRequested = Atomic_t(1);
00332 
00333 #if !defined(BLOCXX_WIN32)
00334    // send a SIGUSR1 to the thread to break it out of any blocking syscall.
00335    // SIGUSR1 is ignored.  It's set to SIG_IGN in ThreadImpl.cpp
00336    // If the thread has already exited, an ThreadException will be thrown
00337    // we just want to ignore that.
00338    try
00339    {
00340       ThreadImpl::sendSignalToThread(m_id, SIGUSR1);
00341    }
00342    catch (ThreadException&) 
00343    {
00344    }
00345 #endif
00346 }
00348 bool
00349 Thread::definitiveCancel(UInt32 waitForCooperativeSecs)
00350 {
00351    return definitiveCancel(Timeout::relative(waitForCooperativeSecs));
00352 }
00354 bool
00355 Thread::definitiveCancel(const Timeout& timeout)
00356 {
00357    if (!isRunning())
00358    {
00359       return true;
00360    }
00361 
00362    // give the thread a chance to clean up a bit or abort the cancellation.
00363    doCooperativeCancel();
00364    m_cancelRequested = Atomic_t(1);
00365 
00366 #if !defined(BLOCXX_WIN32)
00367    // send a SIGUSR1 to the thread to break it out of any blocking syscall.
00368    // SIGUSR1 is ignored.  It's set to SIG_IGN in ThreadImpl.cpp
00369    // If the thread has already exited, an ThreadException will be thrown
00370    // we just want to ignore that.
00371    try
00372    {
00373       ThreadImpl::sendSignalToThread(m_id, SIGUSR1);
00374    }
00375    catch (ThreadException&) 
00376    {
00377    }
00378 #endif
00379 
00380    Logger logger(COMPONENT_NAME);
00381    TimeoutTimer timer(timeout);
00382    NonRecursiveMutexLock l(m_stateGuard);
00383    while (!m_cancelled && isRunning())
00384    {
00385       BLOCXX_LOG_DEBUG3(logger, "Thread::definitiveCancel waiting for thread to exit.");
00386       if (!m_stateCond.timedWait(l, timer.asAbsoluteTimeout()))
00387       {
00388          // give the thread a chance to clean up a bit or abort the cancellation.
00389          doDefinitiveCancel();
00390          // thread didn't (or won't) exit by itself, we'll have to really kill it.
00391          if (!m_cancelled && isRunning())
00392          {
00393             BLOCXX_LOG_ERROR(logger, "Thread::definitiveCancel cancelling thread because it did not exit!");
00394             this->cancel_internal(true);
00395          }
00396          return false;
00397       }
00398    }
00399    return true;
00400 }
00401 
00403 void
00404 Thread::cancel()
00405 {
00406    this->cancel_internal(false);
00407 }
00408 
00410 void
00411 Thread::cancel_internal(bool is_locked)
00412 {
00413    // Ignore errors from ThreadImpl (usually caused by the fact that the thread
00414    // has already exited)
00415    try
00416    {
00417       ThreadImpl::cancel(m_id);
00418    }
00419    catch (ThreadException&)
00420    {
00421    }
00422    {
00423       NonRecursiveMutex mtx;
00424       NonRecursiveMutexLock l(is_locked ? mtx : m_stateGuard);
00425       m_cancelled = true;
00426       m_isRunning = false;
00427       m_stateCond.notifyAll();
00428    }
00429 }
00431 void
00432 Thread::testCancel()
00433 {
00434    ThreadImpl::testCancel();
00435 }
00436 
00438 void
00439 Thread::doShutdown()
00440 {
00441 }
00443 void 
00444 Thread::doCooperativeCancel()
00445 {
00446 }
00448 void 
00449 Thread::doDefinitiveCancel()
00450 {
00451 }
00452 
00454 bool
00455 Thread::timedWait(const Timeout& timeout)
00456 {
00457    TimeoutTimer tt(timeout);
00458    NonRecursiveMutexLock lock(m_stateGuard);
00459    while (m_isRunning == true)
00460    {
00461       if (!m_stateCond.timedWait(lock, tt.asAbsoluteTimeout()))
00462       {
00463          return false; // timeout
00464       }
00465    }
00466    return true; // exited
00467 }
00468 
00469 } // end namespace BLOCXX_NAMESPACE
00470