blocxx
|
00001 /******************************************************************************* 00002 * Copyright (C) 2005, Vintela, Inc. All rights reserved. 00003 * Copyright (C) 2006, Novell, Inc. All rights reserved. 00004 * 00005 * Redistribution and use in source and binary forms, with or without 00006 * modification, are permitted provided that the following conditions are met: 00007 * 00008 * * Redistributions of source code must retain the above copyright notice, 00009 * this list of conditions and the following disclaimer. 00010 * * Redistributions in binary form must reproduce the above copyright 00011 * notice, this list of conditions and the following disclaimer in the 00012 * documentation and/or other materials provided with the distribution. 00013 * * Neither the name of 00014 * Vintela, Inc., 00015 * nor Novell, Inc., 00016 * nor the names of its contributors or employees may be used to 00017 * endorse or promote products derived from this software without 00018 * specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00021 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00022 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00023 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00024 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00025 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00026 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00027 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00028 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00029 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00030 * POSSIBILITY OF SUCH DAMAGE. 00031 *******************************************************************************/ 00032 00033 00038 #include "blocxx/BLOCXX_config.h" 00039 #include "blocxx/ThreadPool.hpp" 00040 #include "blocxx/Array.hpp" 00041 #include "blocxx/Thread.hpp" 00042 #include "blocxx/NonRecursiveMutex.hpp" 00043 #include "blocxx/NonRecursiveMutexLock.hpp" 00044 #include "blocxx/Condition.hpp" 00045 #include "blocxx/Format.hpp" 00046 #include "blocxx/Mutex.hpp" 00047 #include "blocxx/MutexLock.hpp" 00048 #include "blocxx/NullLogger.hpp" 00049 #include "blocxx/Timeout.hpp" 00050 #include "blocxx/TimeoutTimer.hpp" 00051 #include "blocxx/GlobalString.hpp" 00052 00053 #include <deque> 00054 00055 #ifdef BLOCXX_DEBUG 00056 #include <iostream> // for cerr 00057 #endif 00058 00059 namespace BLOCXX_NAMESPACE 00060 { 00061 00062 BLOCXX_DEFINE_EXCEPTION(ThreadPool); 00063 00064 // logger can be null 00065 #define BLOCXX_POOL_LOG_DEBUG(logger, arg) do { BLOCXX_LOG_DEBUG(logger, m_poolName + ": " + arg); } while (0) 00066 #define BLOCXX_POOL_LOG_DEBUG2(logger, arg) do { BLOCXX_LOG_DEBUG2(logger, m_poolName + ": " + arg); } while (0) 00067 #define BLOCXX_POOL_LOG_DEBUG3(logger, arg) do { BLOCXX_LOG_DEBUG3(logger, m_poolName + ": " + arg); } while (0) 00068 #define BLOCXX_POOL_LOG_ERROR(logger, arg) do { BLOCXX_LOG_ERROR(logger, m_poolName + ": " + arg); } while (0) 00069 #define BLOCXX_POOL_LOG_FATAL_ERROR(logger, arg) do { BLOCXX_LOG_FATAL_ERROR(logger, m_poolName + ": " + arg); } while (0) 00070 00072 class ThreadPoolImpl : public IntrusiveCountableBase 00073 { 00074 public: 00075 // returns true if work is placed in the queue to be run and false if not. 00076 virtual bool addWork(const RunnableRef& work, const Timeout& timeout) = 0; 00077 virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout) = 0; 00078 virtual void waitForEmptyQueue() = 0; 00079 virtual ~ThreadPoolImpl() 00080 { 00081 } 00082 }; 00083 namespace { 00084 00085 GlobalString COMPONENT_NAME = BLOCXX_GLOBAL_STRING_INIT("blocxx.ThreadPool"); 00086 00087 class FixedSizePoolImpl; 00089 class FixedSizePoolWorkerThread : public Thread 00090 { 00091 public: 00092 FixedSizePoolWorkerThread(FixedSizePoolImpl* thePool) 00093 : Thread() 00094 , m_thePool(thePool) 00095 { 00096 } 00097 virtual Int32 run(); 00098 private: 00099 virtual void doShutdown() 00100 { 00101 MutexLock lock(m_guard); 00102 if (m_currentRunnable) 00103 { 00104 m_currentRunnable->doShutdown(); 00105 } 00106 } 00107 virtual void doCooperativeCancel() 00108 { 00109 MutexLock lock(m_guard); 00110 if (m_currentRunnable) 00111 { 00112 m_currentRunnable->doCooperativeCancel(); 00113 } 00114 } 00115 virtual void doDefinitiveCancel() 00116 { 00117 MutexLock lock(m_guard); 00118 if (m_currentRunnable) 00119 { 00120 m_currentRunnable->doDefinitiveCancel(); 00121 } 00122 } 00123 00124 FixedSizePoolImpl* m_thePool; 00125 00126 Mutex m_guard; 00127 RunnableRef m_currentRunnable; 00128 00129 // non-copyable 00130 FixedSizePoolWorkerThread(const FixedSizePoolWorkerThread&); 00131 FixedSizePoolWorkerThread& operator=(const FixedSizePoolWorkerThread&); 00132 }; 00134 class CommonPoolImpl : public ThreadPoolImpl 00135 { 00136 protected: 00137 CommonPoolImpl(UInt32 maxQueueSize, const Logger& logger, const String& poolName) 00138 : m_maxQueueSize(maxQueueSize) 00139 , m_queueClosed(false) 00140 , m_shutdown(false) 00141 , m_logger(logger) 00142 , m_poolName(poolName) 00143 { 00144 } 00145 00146 virtual ~CommonPoolImpl() 00147 { 00148 } 00149 00150 // assumes that m_queueLock is locked. DynamicSizeNoQueuePoolImpl overrides this. 00151 virtual bool queueIsFull() const 00152 { 00153 return ((m_maxQueueSize > 0) && (m_queue.size() == m_maxQueueSize)); 00154 } 00155 00156 // assumes that m_queueLock is locked 00157 bool queueClosed() const 00158 { 00159 return m_shutdown || m_queueClosed; 00160 } 00161 00162 bool finishOffWorkInQueue(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& timeout) 00163 { 00164 NonRecursiveMutexLock l(m_queueLock); 00165 // the pool is in the process of being destroyed 00166 if (queueClosed()) 00167 { 00168 BLOCXX_POOL_LOG_DEBUG2(m_logger, "Queue is already closed. Why are you trying to shutdown again?"); 00169 return false; 00170 } 00171 m_queueClosed = true; 00172 BLOCXX_POOL_LOG_DEBUG2(m_logger, "Queue closed"); 00173 00174 if (finishWorkInQueue) 00175 { 00176 TimeoutTimer timer(timeout); 00177 while (m_queue.size() != 0) 00178 { 00179 if (timer.infinite()) 00180 { 00181 BLOCXX_POOL_LOG_DEBUG2(m_logger, "Waiting forever for queue to empty"); 00182 m_queueEmpty.wait(l); 00183 } 00184 else 00185 { 00186 BLOCXX_POOL_LOG_DEBUG2(m_logger, "Waiting w/timout for queue to empty"); 00187 if (!m_queueEmpty.timedWait(l, timer.asAbsoluteTimeout())) 00188 { 00189 BLOCXX_POOL_LOG_DEBUG2(m_logger, "Wait timed out. Work in queue will be discarded."); 00190 break; // timed out 00191 } 00192 } 00193 } 00194 } 00195 m_shutdown = true; 00196 return true; 00197 } 00198 00199 virtual void waitForEmptyQueue() 00200 { 00201 NonRecursiveMutexLock l(m_queueLock); 00202 while (m_queue.size() != 0) 00203 { 00204 BLOCXX_POOL_LOG_DEBUG2(m_logger, "Waiting for empty queue"); 00205 m_queueEmpty.wait(l); 00206 } 00207 BLOCXX_POOL_LOG_DEBUG2(m_logger, "Queue empty: the wait is over"); 00208 } 00209 00210 void shutdownThreads(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout) 00211 { 00212 TimeoutTimer shutdownTimer(shutdownTimeout); 00213 TimeoutTimer dTimer(definitiveCancelTimeout); 00214 if (!finishOffWorkInQueue(finishWorkInQueue, shutdownTimer.asAbsoluteTimeout())) 00215 { 00216 return; 00217 } 00218 00219 // Wake up any workers so they recheck shutdown flag 00220 m_queueNotEmpty.notifyAll(); 00221 m_queueNotFull.notifyAll(); 00222 00223 if (!shutdownTimer.infinite()) 00224 { 00225 // Tell all the threads to shutdown 00226 for (UInt32 i = 0; i < m_threads.size(); ++i) 00227 { 00228 BLOCXX_POOL_LOG_DEBUG(m_logger, Format("Calling shutdown on thread %1", i)); 00229 m_threads[i]->shutdown(); 00230 } 00231 00232 // Wait until shutdownTimeout for the threads to finish 00233 Timeout absoluteShutdownTimeout(shutdownTimer.asAbsoluteTimeout()); 00234 for (UInt32 i = 0; i < m_threads.size(); ++i) 00235 { 00236 BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("Waiting for thread %1 to exit.", i)); 00237 m_threads[i]->timedWait(absoluteShutdownTimeout); 00238 } 00239 00240 // Tell all the threads to cooperative cancel 00241 for (UInt32 i = 0; i < m_threads.size(); ++i) 00242 { 00243 BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("Calling cooperativeCancel on thread %1", i)); 00244 m_threads[i]->cooperativeCancel(); 00245 } 00246 00247 if (!dTimer.infinite()) 00248 { 00249 // If any still haven't shut down, definitiveCancel will kill them. 00250 Timeout absoluteDefinitiveTimeout(dTimer.asAbsoluteTimeout()); 00251 for (UInt32 i = 0; i < m_threads.size(); ++i) 00252 { 00253 BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("Calling definitiveCancel on thread %1", i)); 00254 try 00255 { 00256 if (!m_threads[i]->definitiveCancel(absoluteDefinitiveTimeout)) 00257 { 00258 BLOCXX_POOL_LOG_FATAL_ERROR(m_logger, Format("Thread %1 was forcibly cancelled.", i)); 00259 } 00260 } 00261 catch (CancellationDeniedException& e) 00262 { 00263 BLOCXX_POOL_LOG_ERROR(m_logger, Format("Caught CanacellationDeniedException: %1 for thread %2. Pool shutdown may hang.", e, i)); 00264 } 00265 } 00266 } 00267 00268 } 00269 00270 // Clean up after the threads and/or wait for them to exit. 00271 for (UInt32 i = 0; i < m_threads.size(); ++i) 00272 { 00273 BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("calling join() on thread %1", i)); 00274 m_threads[i]->join(); 00275 BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("join() finished for thread %1", i)); 00276 } 00277 } 00278 00279 RunnableRef getWorkFromQueue(bool waitForWork) 00280 { 00281 NonRecursiveMutexLock l(m_queueLock); 00282 while ((m_queue.size() == 0) && (!m_shutdown)) 00283 { 00284 if (waitForWork) 00285 { 00286 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Waiting for work"); 00287 m_queueNotEmpty.wait(l); 00288 } 00289 else 00290 { 00291 // wait 1 sec for work, to more efficiently handle a stream 00292 // of single requests. 00293 if (!m_queueNotEmpty.timedWait(l,Timeout::relative(1))) 00294 { 00295 BLOCXX_POOL_LOG_DEBUG3(m_logger, "No work after 1 sec. I'm not waiting any longer"); 00296 return RunnableRef(); 00297 } 00298 } 00299 } 00300 // check to see if a shutdown started while the thread was sleeping 00301 if (m_shutdown) 00302 { 00303 BLOCXX_POOL_LOG_DEBUG(m_logger, "The pool is shutdown, not getting any more work"); 00304 return RunnableRef(); 00305 } 00306 00307 RunnableRef work = m_queue.front(); 00308 m_queue.pop_front(); 00309 00310 // This needs to happen before the call to queueIsFull() because the worker count can affect the result of queueIsFull() 00311 incrementWorkerCount(); 00312 // handle threads waiting in addWork(). 00313 if (!queueIsFull()) 00314 { 00315 m_queueNotFull.notifyAll(); 00316 } 00317 00318 // handle waiting shutdown thread or callers of waitForEmptyQueue() 00319 if (m_queue.size() == 0) 00320 { 00321 m_queueEmpty.notifyAll(); 00322 } 00323 BLOCXX_POOL_LOG_DEBUG3(m_logger, "A thread got some work to do"); 00324 return work; 00325 } 00326 00327 // hooks for DynamicSizeNoQueuePoolImpl subclass. Yes this is a horrible design, it just saves code duplication. 00328 virtual void incrementWorkerCount() 00329 { 00330 } 00331 00332 virtual void decrementWorkerCount() 00333 { 00334 } 00335 00336 // pool characteristics 00337 UInt32 m_maxQueueSize; 00338 // pool state 00339 Array<ThreadRef> m_threads; 00340 std::deque<RunnableRef> m_queue; 00341 bool m_queueClosed; 00342 bool m_shutdown; 00343 // pool synchronization 00344 NonRecursiveMutex m_queueLock; 00345 Condition m_queueNotFull; 00346 Condition m_queueEmpty; 00347 Condition m_queueNotEmpty; 00348 Logger m_logger; 00349 String m_poolName; 00350 }; 00351 class FixedSizePoolImpl : public CommonPoolImpl 00352 { 00353 public: 00354 FixedSizePoolImpl(UInt32 numThreads, UInt32 maxQueueSize, const Logger& logger, const String& poolName) 00355 : CommonPoolImpl(maxQueueSize, logger, poolName) 00356 { 00357 // create the threads and start them up. 00358 m_threads.reserve(numThreads); 00359 for (UInt32 i = 0; i < numThreads; ++i) 00360 { 00361 m_threads.push_back(ThreadRef(new FixedSizePoolWorkerThread(this))); 00362 } 00363 for (UInt32 i = 0; i < numThreads; ++i) 00364 { 00365 try 00366 { 00367 m_threads[i]->start(); 00368 } 00369 catch (ThreadException& e) 00370 { 00371 BLOCXX_POOL_LOG_ERROR(m_logger, Format("Failed to start thread #%1: %2", i, e)); 00372 m_threads.resize(i); // remove non-started threads 00373 // shutdown the rest 00374 this->FixedSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, Timeout::relative(0.5), Timeout::relative(0.5)); 00375 throw; 00376 } 00377 } 00378 BLOCXX_POOL_LOG_DEBUG(m_logger, "Threads are started and ready to go"); 00379 } 00380 // returns true if work is placed in the queue to be run and false if not. 00381 virtual bool addWork(const RunnableRef& work, const Timeout& timeout) 00382 { 00383 // check precondition: work != NULL 00384 if (!work) 00385 { 00386 BLOCXX_POOL_LOG_DEBUG(m_logger, "Trying to add NULL work! Shame on you."); 00387 return false; 00388 } 00389 00390 NonRecursiveMutexLock l(m_queueLock); 00391 TimeoutTimer timer(timeout); 00392 while ( queueIsFull() && !queueClosed() ) 00393 { 00394 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue is full. Waiting until a spot opens up so we can add some work"); 00395 if (!m_queueNotFull.timedWait(l, timer.asAbsoluteTimeout())) 00396 { 00397 // timed out 00398 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue is full and timeout expired. Not adding work and returning false"); 00399 return false; 00400 } 00401 } 00402 00403 // the pool is in the process of being destroyed 00404 if (queueClosed()) 00405 { 00406 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue was closed out from underneath us. Not adding work and returning false"); 00407 return false; 00408 } 00409 00410 m_queue.push_back(work); 00411 00412 // if the queue was empty, there may be workers just sitting around, so we need to wake them up! 00413 if (m_queue.size() == 1) 00414 { 00415 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Waking up sleepy workers"); 00416 m_queueNotEmpty.notifyAll(); 00417 } 00418 00419 BLOCXX_POOL_LOG_DEBUG(m_logger, "Work has been added to the queue"); 00420 return true; 00421 } 00422 00423 // we keep this around so it can be called in the destructor 00424 virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout) 00425 { 00426 shutdownThreads(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout); 00427 } 00428 virtual ~FixedSizePoolImpl() 00429 { 00430 // can't let exception escape the destructor 00431 try 00432 { 00433 // don't need a lock here, because we're the only thread left. 00434 if (!queueClosed()) 00435 { 00436 // Make sure the pool is shutdown. 00437 // Specify which shutdown() we want so we don't get undefined behavior calling a virtual function from the destructor. 00438 this->FixedSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, Timeout::relative(0.5), Timeout::relative(0.5)); 00439 } 00440 } 00441 catch (...) 00442 { 00443 } 00444 } 00445 private: 00446 friend class FixedSizePoolWorkerThread; 00447 }; 00448 void runRunnable(const RunnableRef& work) 00449 { 00450 // don't let exceptions escape, we need to keep going, except for ThreadCancelledException, in which case we need to stop. 00451 try 00452 { 00453 work->run(); 00454 } 00455 catch (ThreadCancelledException&) 00456 { 00457 throw; 00458 } 00459 catch (Exception& ex) 00460 { 00461 #ifdef BLOCXX_DEBUG 00462 std::clog << "!!! Exception: " << ex.type() << " caught in ThreadPool worker: " << ex << std::endl; 00463 #endif 00464 Logger logger(COMPONENT_NAME); 00465 BLOCXX_LOG_ERROR(logger, Format("!!! Exception caught in ThreadPool worker: %1", ex)); 00466 } 00467 catch(std::exception& ex) 00468 { 00469 #ifdef BLOCXX_DEBUG 00470 std::clog << "!!! std::exception what = \"" << ex.what() << "\" caught in ThreadPool worker" << std::endl; 00471 #endif 00472 Logger logger(COMPONENT_NAME); 00473 BLOCXX_LOG_ERROR(logger, Format("!!! std::exception caught in ThreadPool worker: %1", ex.what())); 00474 } 00475 catch (...) 00476 { 00477 #ifdef BLOCXX_DEBUG 00478 std::clog << "!!! Unknown Exception caught in ThreadPool worker" << std::endl; 00479 #endif 00480 Logger logger(COMPONENT_NAME); 00481 BLOCXX_LOG_ERROR(logger, "!!! Unknown Exception caught in ThreadPool worker."); 00482 } 00483 } 00484 Int32 FixedSizePoolWorkerThread::run() 00485 { 00486 while (true) 00487 { 00488 // check queue for work 00489 RunnableRef work = m_thePool->getWorkFromQueue(true); 00490 if (!work) 00491 { 00492 return 0; 00493 } 00494 // save this off so it can be cancelled by another thread. 00495 { 00496 MutexLock lock(m_guard); 00497 m_currentRunnable = work; 00498 } 00499 runRunnable(work); 00500 { 00501 MutexLock lock(m_guard); 00502 m_currentRunnable = 0; 00503 } 00504 } 00505 return 0; 00506 } 00507 class DynamicSizePoolImpl; 00509 class DynamicSizePoolWorkerThread : public Thread 00510 { 00511 public: 00512 DynamicSizePoolWorkerThread(DynamicSizePoolImpl* thePool) 00513 : Thread() 00514 , m_thePool(thePool) 00515 { 00516 } 00517 virtual Int32 run(); 00518 private: 00519 virtual void doShutdown() 00520 { 00521 MutexLock lock(m_guard); 00522 if (m_currentRunnable) 00523 { 00524 m_currentRunnable->doShutdown(); 00525 } 00526 } 00527 virtual void doCooperativeCancel() 00528 { 00529 MutexLock lock(m_guard); 00530 if (m_currentRunnable) 00531 { 00532 m_currentRunnable->doCooperativeCancel(); 00533 } 00534 } 00535 virtual void doDefinitiveCancel() 00536 { 00537 MutexLock lock(m_guard); 00538 if (m_currentRunnable) 00539 { 00540 m_currentRunnable->doDefinitiveCancel(); 00541 } 00542 } 00543 00544 DynamicSizePoolImpl* m_thePool; 00545 00546 Mutex m_guard; 00547 RunnableRef m_currentRunnable; 00548 00549 // non-copyable 00550 DynamicSizePoolWorkerThread(const DynamicSizePoolWorkerThread&); 00551 DynamicSizePoolWorkerThread& operator=(const DynamicSizePoolWorkerThread&); 00552 }; 00554 class DynamicSizePoolImpl : public CommonPoolImpl 00555 { 00556 public: 00557 DynamicSizePoolImpl(UInt32 maxThreads, UInt32 maxQueueSize, const Logger& logger, const String& poolName) 00558 : CommonPoolImpl(maxQueueSize, logger, poolName) 00559 , m_maxThreads(maxThreads) 00560 { 00561 } 00562 // returns true if work is placed in the queue to be run and false if not. 00563 virtual bool addWork(const RunnableRef& work, const Timeout& timeout) 00564 { 00565 // check precondition: work != NULL 00566 if (!work) 00567 { 00568 BLOCXX_POOL_LOG_DEBUG(m_logger, "Trying to add NULL work! Shame on you."); 00569 return false; 00570 } 00571 NonRecursiveMutexLock l(m_queueLock); 00572 00573 // the pool is in the process of being destroyed 00574 if (queueClosed()) 00575 { 00576 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue was closed out from underneath us. Not adding work and returning false"); 00577 return false; 00578 } 00579 00580 // Can't touch m_threads until *after* we check for the queue being closed, shutdown 00581 // requires that m_threads not change after the queue is closed. 00582 // Now clean up dead threads (before we add the new one, so we don't need to check it) 00583 size_t i = 0; 00584 while (i < m_threads.size()) 00585 { 00586 if (!m_threads[i]->isRunning()) 00587 { 00588 BLOCXX_POOL_LOG_DEBUG3(m_logger, Format("Thread %1 is finished. Cleaning up it's remains.", i)); 00589 m_threads[i]->join(); 00590 m_threads.remove(i); 00591 } 00592 else 00593 { 00594 ++i; 00595 } 00596 } 00597 00598 TimeoutTimer timer(timeout); 00599 while ( queueIsFull() && !queueClosed() ) 00600 { 00601 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue is full. Waiting until a spot opens up so we can add some work"); 00602 if (!m_queueNotFull.timedWait(l, timer.asAbsoluteTimeout())) 00603 { 00604 // timed out 00605 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue is full and timeout expired. Not adding work and returning false"); 00606 return false; 00607 } 00608 } 00609 00610 // The previous loop could have ended because a spot opened in the queue or it was closed. Check for the close. 00611 if (queueClosed()) 00612 { 00613 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue was closed out from underneath us. Not adding work and returning false"); 00614 return false; 00615 } 00616 00617 m_queue.push_back(work); 00618 00619 BLOCXX_POOL_LOG_DEBUG(m_logger, "Work has been added to the queue"); 00620 00621 // Release the lock and wake up a thread waiting for work in the queue 00622 // This bit of code is a race condition with the thread, 00623 // but if we acquire the lock again before it does, then we 00624 // properly handle that case. The only disadvantage if we win 00625 // the "race" is that we'll unnecessarily start a new thread. 00626 // In practice it works all the time. 00627 l.release(); 00628 m_queueNotEmpty.notifyOne(); 00629 Thread::yield(); // give the thread a chance to run 00630 l.lock(); 00631 00632 // Start up a new thread to handle the work in the queue. 00633 if (!m_queue.empty() && m_threads.size() < m_maxThreads) 00634 { 00635 ThreadRef theThread(new DynamicSizePoolWorkerThread(this)); 00636 m_threads.push_back(theThread); 00637 BLOCXX_POOL_LOG_DEBUG3(m_logger, "About to start a new thread"); 00638 try 00639 { 00640 theThread->start(); 00641 } 00642 catch (ThreadException& e) 00643 { 00644 BLOCXX_POOL_LOG_ERROR(m_logger, Format("Failed to start thread: %1", e)); 00645 m_threads.pop_back(); 00646 throw; 00647 } 00648 BLOCXX_POOL_LOG_DEBUG2(m_logger, "New thread started"); 00649 } 00650 return true; 00651 } 00652 00653 // we keep this around so it can be called in the destructor 00654 virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout) 00655 { 00656 shutdownThreads(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout); 00657 } 00658 virtual ~DynamicSizePoolImpl() 00659 { 00660 // can't let exception escape the destructor 00661 try 00662 { 00663 // don't need a lock here, because we're the only thread left. 00664 if (!queueClosed()) 00665 { 00666 // Make sure the pool is shutdown. 00667 // Specify which shutdown() we want so we don't get undefined behavior calling a virtual function from the destructor. 00668 this->DynamicSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, Timeout::relative(0.5), Timeout::relative(0.5)); 00669 } 00670 } 00671 catch (...) 00672 { 00673 } 00674 } 00675 00676 protected: 00677 UInt32 getMaxThreads() const 00678 { 00679 return m_maxThreads; 00680 } 00681 00682 private: 00683 // pool characteristics 00684 UInt32 m_maxThreads; 00685 friend class DynamicSizePoolWorkerThread; 00686 }; 00687 Int32 DynamicSizePoolWorkerThread::run() 00688 { 00689 while (true) 00690 { 00691 // check queue for work 00692 RunnableRef work = m_thePool->getWorkFromQueue(false); 00693 if (!work) 00694 { 00695 return 0; 00696 } 00697 // save this off so it can be cancelled by another thread. 00698 { 00699 MutexLock lock(m_guard); 00700 m_currentRunnable = work; 00701 } 00702 runRunnable(work); 00703 m_thePool->decrementWorkerCount(); 00704 { 00705 MutexLock lock(m_guard); 00706 m_currentRunnable = 0; 00707 } 00708 } 00709 return 0; 00710 } 00711 00713 class DynamicSizeNoQueuePoolImpl : public DynamicSizePoolImpl 00714 { 00715 public: 00716 DynamicSizeNoQueuePoolImpl(UInt32 maxThreads, const Logger& logger, const String& poolName) 00717 : DynamicSizePoolImpl(maxThreads, maxThreads, logger, poolName) // allow queue in superclass, but prevent it from having any backlog 00718 , m_workingThreads(0) 00719 { 00720 } 00721 00722 virtual ~DynamicSizeNoQueuePoolImpl() 00723 { 00724 } 00725 00726 virtual void incrementWorkerCount() 00727 { 00728 ++m_workingThreads; 00729 } 00730 00731 virtual void decrementWorkerCount() 00732 { 00733 NonRecursiveMutexLock lock(m_queueLock); 00734 --m_workingThreads; 00735 // wake up any threads waiting to start some work 00736 m_queueNotFull.notifyAll(); 00737 } 00738 00739 // One difference between this class and DynamicSizePoolImpl is that we change the definition of queueIsFull() 00740 virtual bool queueIsFull() const 00741 { 00742 // don't let the queue get bigger than the number of free threads. This effectively prevents work from being 00743 // queued up which can't be immediately serviced. 00744 size_t freeThreads = getMaxThreads() - AtomicGet(m_workingThreads); 00745 return (freeThreads <= m_queue.size()); 00746 } 00747 00748 private: 00749 // Keep track of the number of threads doing work. Protected by m_guard 00750 size_t m_workingThreads; 00751 00752 }; 00753 00754 } // end anonymous namespace 00756 ThreadPool::ThreadPool(PoolType poolType, UInt32 numThreads, UInt32 maxQueueSize, const String& poolName) 00757 { 00758 NullLogger logger; 00759 switch (poolType) 00760 { 00761 case FIXED_SIZE: 00762 m_impl = new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName); 00763 break; 00764 case DYNAMIC_SIZE: 00765 m_impl = new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName); 00766 break; 00767 case DYNAMIC_SIZE_NO_QUEUE: 00768 m_impl = new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName); 00769 break; 00770 } 00771 } 00773 ThreadPool::ThreadPool(PoolType poolType, UInt32 numThreads, UInt32 maxQueueSize, const Logger& logger, const String& poolName) 00774 { 00775 switch (poolType) 00776 { 00777 case FIXED_SIZE: 00778 m_impl = new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName); 00779 break; 00780 case DYNAMIC_SIZE: 00781 m_impl = new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName); 00782 break; 00783 case DYNAMIC_SIZE_NO_QUEUE: 00784 m_impl = new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName); 00785 break; 00786 } 00787 } 00789 bool ThreadPool::addWork(const RunnableRef& work) 00790 { 00791 return m_impl->addWork(work, Timeout::infinite); 00792 } 00794 bool ThreadPool::tryAddWork(const RunnableRef& work) 00795 { 00796 return m_impl->addWork(work, Timeout::relative(0)); 00797 } 00799 bool ThreadPool::tryAddWork(const RunnableRef& work, const Timeout& timeout) 00800 { 00801 return m_impl->addWork(work, timeout); 00802 } 00804 void ThreadPool::shutdown(EShutdownQueueFlag finishWorkInQueue, int shutdownSecs) 00805 { 00806 m_impl->shutdown(finishWorkInQueue, Timeout::relative(shutdownSecs), Timeout::relative(shutdownSecs)); 00807 } 00809 void ThreadPool::shutdown(EShutdownQueueFlag finishWorkInQueue, const Timeout& timeout) 00810 { 00811 m_impl->shutdown(finishWorkInQueue, timeout, timeout); 00812 } 00814 void ThreadPool::shutdown(EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout) 00815 { 00816 m_impl->shutdown(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout); 00817 } 00819 void ThreadPool::waitForEmptyQueue() 00820 { 00821 m_impl->waitForEmptyQueue(); 00822 } 00824 ThreadPool::~ThreadPool() 00825 { 00826 } 00828 ThreadPool::ThreadPool(const ThreadPool& x) 00829 : IntrusiveCountableBase(x) 00830 , m_impl(x.m_impl) 00831 { 00832 } 00834 ThreadPool& ThreadPool::operator=(const ThreadPool& x) 00835 { 00836 m_impl = x.m_impl; 00837 return *this; 00838 } 00839 00840 } // end namespace BLOCXX_NAMESPACE 00841