blocxx
|
00001 /******************************************************************************* 00002 * Copyright (C) 2005, Vintela, Inc. All rights reserved. 00003 * Copyright (C) 2006, Novell, Inc. All rights reserved. 00004 * 00005 * Redistribution and use in source and binary forms, with or without 00006 * modification, are permitted provided that the following conditions are met: 00007 * 00008 * * Redistributions of source code must retain the above copyright notice, 00009 * this list of conditions and the following disclaimer. 00010 * * Redistributions in binary form must reproduce the above copyright 00011 * notice, this list of conditions and the following disclaimer in the 00012 * documentation and/or other materials provided with the distribution. 00013 * * Neither the name of 00014 * Vintela, Inc., 00015 * nor Novell, Inc., 00016 * nor the names of its contributors or employees may be used to 00017 * endorse or promote products derived from this software without 00018 * specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00021 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00022 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00023 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00024 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00025 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00026 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00027 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00028 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00029 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00030 * POSSIBILITY OF SUCH DAMAGE. 00031 *******************************************************************************/ 00032 00033 00039 #include "blocxx/BLOCXX_config.h" 00040 #include "blocxx/Thread.hpp" 00041 #include "blocxx/Assertion.hpp" 00042 #include "blocxx/Format.hpp" 00043 #include "blocxx/ThreadBarrier.hpp" 00044 #include "blocxx/NonRecursiveMutexLock.hpp" 00045 #include "blocxx/ExceptionIds.hpp" 00046 #include "blocxx/Timeout.hpp" 00047 #include "blocxx/TimeoutTimer.hpp" 00048 #include "blocxx/DateTime.hpp" 00049 #include "blocxx/Logger.hpp" 00050 00051 #include <cstring> 00052 #include <cstdio> 00053 #include <cerrno> 00054 #include <iostream> // for cerr 00055 #include <csignal> 00056 #include <cassert> 00057 00058 #ifdef BLOCXX_HAVE_OPENSSL 00059 #include <openssl/err.h> 00060 #endif 00061 00062 00063 namespace BLOCXX_NAMESPACE 00064 { 00065 00067 BLOCXX_DEFINE_EXCEPTION_WITH_ID(Thread); 00068 BLOCXX_DEFINE_EXCEPTION_WITH_ID(CancellationDenied); 00069 00070 namespace 00071 { 00072 const GlobalString COMPONENT_NAME = BLOCXX_GLOBAL_STRING_INIT("blocxx"); 00073 00075 // this is what's really passed to threadRunner 00076 struct ThreadParam 00077 { 00078 ThreadParam(Thread* t, const ThreadDoneCallbackRef& c, const ThreadBarrier& b) 00079 : thread(t) 00080 , cb(c) 00081 , thread_barrier(b) 00082 {} 00083 Thread* thread; 00084 ThreadDoneCallbackRef cb; 00085 ThreadBarrier thread_barrier; 00086 }; 00087 00089 static Thread_t 00090 zeroThread() 00091 { 00092 Thread_t zthr; 00093 ::memset(&zthr, 0, sizeof(zthr)); 00094 return zthr; 00095 } 00096 00097 static Thread_t NULLTHREAD = zeroThread(); 00099 static inline bool 00100 sameId(const Thread_t& t1, const Thread_t& t2) 00101 { 00102 return ThreadImpl::sameThreads(t1, t2); 00103 } 00104 } // end unnamed namespace 00105 00107 // Constructor 00108 Thread::Thread() 00109 : m_id(NULLTHREAD) 00110 , m_isRunning(false) 00111 , m_joined(false) 00112 , m_cancelRequested(0) 00113 , m_cancelled(false) 00114 { 00115 } 00117 // Destructor 00118 Thread::~Thread() 00119 { 00120 try 00121 { 00122 if (!sameId(m_id, NULLTHREAD)) 00123 { 00124 if (!m_joined) 00125 { 00126 join(); 00127 } 00128 assert(m_isRunning == false); 00129 ThreadImpl::destroyThread(m_id); 00130 } 00131 } 00132 catch (...) 00133 { 00134 // don't let exceptions escape 00135 } 00136 } 00138 // Start the thread 00139 void 00140 Thread::start(const ThreadDoneCallbackRef& cb) 00141 { 00142 if (isRunning()) 00143 { 00144 BLOCXX_THROW(ThreadException, 00145 "Thread::start - thread is already running"); 00146 } 00147 if (!sameId(m_id, NULLTHREAD)) 00148 { 00149 BLOCXX_THROW(ThreadException, 00150 "Thread::start - cannot start previously run thread"); 00151 } 00152 UInt32 flgs = BLOCXX_THREAD_FLG_JOINABLE; 00153 ThreadBarrier thread_barrier(2); 00154 // p will be delted by threadRunner 00155 ThreadParam* p = new ThreadParam(this, cb, thread_barrier); 00156 int result = ThreadImpl::createThread(m_id, threadRunner, p, flgs); 00157 if (result != 0) 00158 { 00159 BLOCXX_THROW_ERRNO_MSG1(ThreadException, "ThreadImpl::createThread() failed", result); 00160 } 00161 thread_barrier.wait(); 00162 } 00164 // Wait for this object's thread execution (if any) to complete. 00165 Int32 00166 Thread::join() 00167 { 00168 BLOCXX_ASSERT(!sameId(m_id, NULLTHREAD)); 00169 Int32 rval; 00170 if (ThreadImpl::joinThread(m_id, rval) != 0) 00171 { 00172 BLOCXX_THROW(ThreadException, 00173 Format("Thread::join - ThreadImpl::joinThread: %1(%2)", 00174 errno, strerror(errno)).c_str()); 00175 } 00176 m_joined = true; 00177 return rval; 00178 } 00180 // STATIC 00181 // Method used for starting threads 00182 Int32 00183 Thread::threadRunner(void* paramPtr) 00184 { 00185 Thread_t theThreadID; 00186 Int32 rval = -1; 00187 try 00188 { 00189 // scope is important so destructors will run before the thread is clobbered by exitThread 00190 BLOCXX_ASSERT(paramPtr != NULL); 00191 ThreadParam* pParam = static_cast<ThreadParam*>(paramPtr); 00192 Thread* pTheThread = pParam->thread; 00193 ThreadImpl::saveThreadInTLS(pTheThread); 00194 theThreadID = pTheThread->m_id; 00195 ThreadDoneCallbackRef cb = pParam->cb; 00196 ThreadBarrier thread_barrier = pParam->thread_barrier; 00197 delete pParam; 00198 pTheThread->m_isRunning = true; 00199 thread_barrier.wait(); 00200 00201 try 00202 { 00203 rval = pTheThread->run(); 00204 } 00205 // make sure we get all exceptions so the thread is cleaned up properly 00206 catch (ThreadCancelledException&) 00207 { 00208 } 00209 catch (Exception& ex) 00210 { 00211 #ifdef BLOCXX_DEBUG 00212 std::clog << "!!! Exception: " << ex.type() << " caught in Thread class\n"; 00213 std::clog << ex << std::endl; 00214 #endif 00215 Logger logger(COMPONENT_NAME); 00216 BLOCXX_LOG_ERROR(logger, Format("!!! Exception caught in Thread class: %1", ex)); 00217 pTheThread->doneRunning(cb); 00218 // we need to re-throw here, otherwise we'll segfault 00219 // if pthread_cancel() does forced stack unwinding. 00220 throw; 00221 } 00222 catch (std::exception& ex) 00223 { 00224 #ifdef BLOCXX_DEBUG 00225 std::clog << "!!! std::exception: " << ex.what() << " caught in Thread class" << std::endl; 00226 #endif 00227 Logger logger(COMPONENT_NAME); 00228 BLOCXX_LOG_ERROR(logger, Format("!!! std::exception caught in Thread class: %1", ex.what())); 00229 pTheThread->doneRunning(cb); 00230 // we need to re-throw here, otherwise we'll segfault 00231 // if pthread_cancel() does forced stack unwinding. 00232 throw; 00233 } 00234 catch (...) 00235 { 00236 #ifdef BLOCXX_DEBUG 00237 std::clog << "!!! Unknown Exception caught in Thread class" << std::endl; 00238 #endif 00239 Logger logger(COMPONENT_NAME); 00240 BLOCXX_LOG_ERROR(logger, "!!! Unknown Exception caught in Thread class."); 00241 00242 pTheThread->doneRunning(cb); 00243 // we need to re-throw here, otherwise we'll segfault 00244 // if pthread_cancel() does forced stack unwinding. 00245 throw; 00246 } 00247 00248 pTheThread->doneRunning(cb); 00249 00250 } 00251 catch (Exception& ex) 00252 { 00253 #ifdef BLOCXX_DEBUG 00254 std::clog << "!!! Exception: " << ex.type() << " caught in Thread class\n"; 00255 std::clog << ex << std::endl; 00256 #endif 00257 Logger logger(COMPONENT_NAME); 00258 BLOCXX_LOG_ERROR(logger, Format("!!! Exception caught in Thread class: %1", ex)); 00259 // end the thread. exitThread never returns. 00260 ThreadImpl::exitThread(theThreadID, rval); 00261 } 00262 catch (std::exception& ex) 00263 { 00264 #ifdef BLOCXX_DEBUG 00265 std::clog << "!!! std::exception: " << ex.what() << " caught in Thread class" << std::endl; 00266 #endif 00267 Logger logger(COMPONENT_NAME); 00268 BLOCXX_LOG_ERROR(logger, Format("!!! std::exception caught in Thread class: %1", ex.what())); 00269 // end the thread. exitThread never returns. 00270 ThreadImpl::exitThread(theThreadID, rval); 00271 } 00272 catch (...) 00273 { 00274 #ifdef BLOCXX_DEBUG 00275 std::clog << "!!! Unknown Exception caught in Thread class" << std::endl; 00276 #endif 00277 Logger logger(COMPONENT_NAME); 00278 BLOCXX_LOG_ERROR(logger, "!!! Unknown Exception caught in Thread class."); 00279 // end the thread. exitThread never returns. 00280 ThreadImpl::exitThread(theThreadID, rval); 00281 } 00282 // end the thread. exitThread never returns. 00283 ThreadImpl::exitThread(theThreadID, rval); 00284 return rval; 00285 } 00286 00288 void 00289 Thread::doneRunning(const ThreadDoneCallbackRef& cb) 00290 { 00291 { 00292 NonRecursiveMutexLock lock(m_stateGuard); 00293 m_isRunning = false; 00294 m_stateCond.notifyAll(); 00295 } 00296 00297 if (cb) 00298 { 00299 cb->notifyThreadDone(this); 00300 } 00301 #ifdef BLOCXX_HAVE_OPENSSL 00302 // this is necessary to free memory associated with the OpenSSL error queue for this thread. 00303 ERR_remove_state(0); 00304 #endif 00305 } 00306 00308 void 00309 Thread::shutdown() 00310 { 00311 doShutdown(); 00312 } 00314 bool 00315 Thread::shutdown(const Timeout& timeout) 00316 { 00317 doShutdown(); 00318 return timedWait(timeout); 00319 } 00321 void 00322 Thread::cooperativeCancel() 00323 { 00324 if (!isRunning()) 00325 { 00326 return; 00327 } 00328 00329 // give the thread a chance to clean up a bit or abort the cancellation. 00330 doCooperativeCancel(); 00331 m_cancelRequested = Atomic_t(1); 00332 00333 #if !defined(BLOCXX_WIN32) 00334 // send a SIGUSR1 to the thread to break it out of any blocking syscall. 00335 // SIGUSR1 is ignored. It's set to SIG_IGN in ThreadImpl.cpp 00336 // If the thread has already exited, an ThreadException will be thrown 00337 // we just want to ignore that. 00338 try 00339 { 00340 ThreadImpl::sendSignalToThread(m_id, SIGUSR1); 00341 } 00342 catch (ThreadException&) 00343 { 00344 } 00345 #endif 00346 } 00348 bool 00349 Thread::definitiveCancel(UInt32 waitForCooperativeSecs) 00350 { 00351 return definitiveCancel(Timeout::relative(waitForCooperativeSecs)); 00352 } 00354 bool 00355 Thread::definitiveCancel(const Timeout& timeout) 00356 { 00357 if (!isRunning()) 00358 { 00359 return true; 00360 } 00361 00362 // give the thread a chance to clean up a bit or abort the cancellation. 00363 doCooperativeCancel(); 00364 m_cancelRequested = Atomic_t(1); 00365 00366 #if !defined(BLOCXX_WIN32) 00367 // send a SIGUSR1 to the thread to break it out of any blocking syscall. 00368 // SIGUSR1 is ignored. It's set to SIG_IGN in ThreadImpl.cpp 00369 // If the thread has already exited, an ThreadException will be thrown 00370 // we just want to ignore that. 00371 try 00372 { 00373 ThreadImpl::sendSignalToThread(m_id, SIGUSR1); 00374 } 00375 catch (ThreadException&) 00376 { 00377 } 00378 #endif 00379 00380 Logger logger(COMPONENT_NAME); 00381 TimeoutTimer timer(timeout); 00382 NonRecursiveMutexLock l(m_stateGuard); 00383 while (!m_cancelled && isRunning()) 00384 { 00385 BLOCXX_LOG_DEBUG3(logger, "Thread::definitiveCancel waiting for thread to exit."); 00386 if (!m_stateCond.timedWait(l, timer.asAbsoluteTimeout())) 00387 { 00388 // give the thread a chance to clean up a bit or abort the cancellation. 00389 doDefinitiveCancel(); 00390 // thread didn't (or won't) exit by itself, we'll have to really kill it. 00391 if (!m_cancelled && isRunning()) 00392 { 00393 BLOCXX_LOG_ERROR(logger, "Thread::definitiveCancel cancelling thread because it did not exit!"); 00394 this->cancel_internal(true); 00395 } 00396 return false; 00397 } 00398 } 00399 return true; 00400 } 00401 00403 void 00404 Thread::cancel() 00405 { 00406 this->cancel_internal(false); 00407 } 00408 00410 void 00411 Thread::cancel_internal(bool is_locked) 00412 { 00413 // Ignore errors from ThreadImpl (usually caused by the fact that the thread 00414 // has already exited) 00415 try 00416 { 00417 ThreadImpl::cancel(m_id); 00418 } 00419 catch (ThreadException&) 00420 { 00421 } 00422 { 00423 NonRecursiveMutex mtx; 00424 NonRecursiveMutexLock l(is_locked ? mtx : m_stateGuard); 00425 m_cancelled = true; 00426 m_isRunning = false; 00427 m_stateCond.notifyAll(); 00428 } 00429 } 00431 void 00432 Thread::testCancel() 00433 { 00434 ThreadImpl::testCancel(); 00435 } 00436 00438 void 00439 Thread::doShutdown() 00440 { 00441 } 00443 void 00444 Thread::doCooperativeCancel() 00445 { 00446 } 00448 void 00449 Thread::doDefinitiveCancel() 00450 { 00451 } 00452 00454 bool 00455 Thread::timedWait(const Timeout& timeout) 00456 { 00457 TimeoutTimer tt(timeout); 00458 NonRecursiveMutexLock lock(m_stateGuard); 00459 while (m_isRunning == true) 00460 { 00461 if (!m_stateCond.timedWait(lock, tt.asAbsoluteTimeout())) 00462 { 00463 return false; // timeout 00464 } 00465 } 00466 return true; // exited 00467 } 00468 00469 } // end namespace BLOCXX_NAMESPACE 00470