blocxx

ThreadPool.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002 * Copyright (C) 2005, Vintela, Inc. All rights reserved.
00003 * Copyright (C) 2006, Novell, Inc. All rights reserved.
00004 * 
00005 * Redistribution and use in source and binary forms, with or without
00006 * modification, are permitted provided that the following conditions are met:
00007 * 
00008 *     * Redistributions of source code must retain the above copyright notice,
00009 *       this list of conditions and the following disclaimer.
00010 *     * Redistributions in binary form must reproduce the above copyright
00011 *       notice, this list of conditions and the following disclaimer in the
00012 *       documentation and/or other materials provided with the distribution.
00013 *     * Neither the name of 
00014 *       Vintela, Inc., 
00015 *       nor Novell, Inc., 
00016 *       nor the names of its contributors or employees may be used to 
00017 *       endorse or promote products derived from this software without 
00018 *       specific prior written permission.
00019 * 
00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00021 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00022 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00023 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00024 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00025 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00026 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00027 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00028 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00029 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00030 * POSSIBILITY OF SUCH DAMAGE.
00031 *******************************************************************************/
00032 
00033 
00038 #include "blocxx/BLOCXX_config.h"
00039 #include "blocxx/ThreadPool.hpp"
00040 #include "blocxx/Array.hpp"
00041 #include "blocxx/Thread.hpp"
00042 #include "blocxx/NonRecursiveMutex.hpp"
00043 #include "blocxx/NonRecursiveMutexLock.hpp"
00044 #include "blocxx/Condition.hpp"
00045 #include "blocxx/Format.hpp"
00046 #include "blocxx/Mutex.hpp"
00047 #include "blocxx/MutexLock.hpp"
00048 #include "blocxx/NullLogger.hpp"
00049 #include "blocxx/Timeout.hpp"
00050 #include "blocxx/TimeoutTimer.hpp"
00051 #include "blocxx/GlobalString.hpp"
00052 
00053 #include <deque>
00054 
00055 #ifdef BLOCXX_DEBUG     
00056 #include <iostream> // for cerr
00057 #endif
00058 
00059 namespace BLOCXX_NAMESPACE
00060 {
00061 
00062 BLOCXX_DEFINE_EXCEPTION(ThreadPool);
00063 
00064 // logger can be null
00065 #define BLOCXX_POOL_LOG_DEBUG(logger, arg) do { BLOCXX_LOG_DEBUG(logger, m_poolName + ": " + arg); } while (0)
00066 #define BLOCXX_POOL_LOG_DEBUG2(logger, arg) do { BLOCXX_LOG_DEBUG2(logger, m_poolName + ": " + arg); } while (0)
00067 #define BLOCXX_POOL_LOG_DEBUG3(logger, arg) do { BLOCXX_LOG_DEBUG3(logger, m_poolName + ": " + arg); } while (0)
00068 #define BLOCXX_POOL_LOG_ERROR(logger, arg) do { BLOCXX_LOG_ERROR(logger, m_poolName + ": " + arg); } while (0)
00069 #define BLOCXX_POOL_LOG_FATAL_ERROR(logger, arg) do { BLOCXX_LOG_FATAL_ERROR(logger, m_poolName + ": " + arg); } while (0)
00070 
00072 class ThreadPoolImpl : public IntrusiveCountableBase
00073 {
00074 public:
00075    // returns true if work is placed in the queue to be run and false if not.
00076    virtual bool addWork(const RunnableRef& work, const Timeout& timeout) = 0;
00077    virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout) = 0;
00078    virtual void waitForEmptyQueue() = 0;
00079    virtual ~ThreadPoolImpl()
00080    {
00081    }
00082 };
00083 namespace {
00084 
00085 GlobalString COMPONENT_NAME = BLOCXX_GLOBAL_STRING_INIT("blocxx.ThreadPool");
00086 
00087 class FixedSizePoolImpl;
00089 class FixedSizePoolWorkerThread : public Thread
00090 {
00091 public:
00092    FixedSizePoolWorkerThread(FixedSizePoolImpl* thePool)
00093       : Thread()
00094       , m_thePool(thePool)
00095    {
00096    }
00097    virtual Int32 run();
00098 private:
00099    virtual void doShutdown()
00100    {
00101       MutexLock lock(m_guard);
00102       if (m_currentRunnable)
00103       {
00104          m_currentRunnable->doShutdown();
00105       }
00106    }
00107    virtual void doCooperativeCancel()
00108    {
00109       MutexLock lock(m_guard);
00110       if (m_currentRunnable)
00111       {
00112          m_currentRunnable->doCooperativeCancel();
00113       }
00114    }
00115    virtual void doDefinitiveCancel()
00116    {
00117       MutexLock lock(m_guard);
00118       if (m_currentRunnable)
00119       {
00120          m_currentRunnable->doDefinitiveCancel();
00121       }
00122    }
00123 
00124    FixedSizePoolImpl* m_thePool;
00125 
00126    Mutex m_guard;
00127    RunnableRef m_currentRunnable;
00128 
00129    // non-copyable
00130    FixedSizePoolWorkerThread(const FixedSizePoolWorkerThread&);
00131    FixedSizePoolWorkerThread& operator=(const FixedSizePoolWorkerThread&);
00132 };
00134 class CommonPoolImpl : public ThreadPoolImpl
00135 {
00136 protected:
00137    CommonPoolImpl(UInt32 maxQueueSize, const Logger& logger, const String& poolName)
00138       : m_maxQueueSize(maxQueueSize)
00139       , m_queueClosed(false)
00140       , m_shutdown(false)
00141       , m_logger(logger)
00142       , m_poolName(poolName)
00143    {
00144    }
00145 
00146    virtual ~CommonPoolImpl()
00147    {
00148    }
00149    
00150    // assumes that m_queueLock is locked. DynamicSizeNoQueuePoolImpl overrides this.
00151    virtual bool queueIsFull() const
00152    {
00153       return ((m_maxQueueSize > 0) && (m_queue.size() == m_maxQueueSize));
00154    }
00155    
00156    // assumes that m_queueLock is locked
00157    bool queueClosed() const
00158    {
00159       return m_shutdown || m_queueClosed;
00160    }
00161    
00162    bool finishOffWorkInQueue(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& timeout)
00163    {
00164       NonRecursiveMutexLock l(m_queueLock);
00165       // the pool is in the process of being destroyed
00166       if (queueClosed())
00167       {
00168          BLOCXX_POOL_LOG_DEBUG2(m_logger, "Queue is already closed.  Why are you trying to shutdown again?");
00169          return false;
00170       }
00171       m_queueClosed = true;
00172       BLOCXX_POOL_LOG_DEBUG2(m_logger, "Queue closed");
00173       
00174       if (finishWorkInQueue)
00175       {
00176          TimeoutTimer timer(timeout);
00177          while (m_queue.size() != 0)
00178          {
00179             if (timer.infinite())
00180             {
00181                BLOCXX_POOL_LOG_DEBUG2(m_logger, "Waiting forever for queue to empty");
00182                m_queueEmpty.wait(l);
00183             }
00184             else
00185             {
00186                BLOCXX_POOL_LOG_DEBUG2(m_logger, "Waiting w/timout for queue to empty");
00187                if (!m_queueEmpty.timedWait(l, timer.asAbsoluteTimeout()))
00188                {
00189                   BLOCXX_POOL_LOG_DEBUG2(m_logger, "Wait timed out. Work in queue will be discarded.");
00190                   break; // timed out
00191                }
00192             }
00193          }
00194       }
00195       m_shutdown = true;
00196       return true;
00197    }
00198 
00199    virtual void waitForEmptyQueue()
00200    {
00201       NonRecursiveMutexLock l(m_queueLock);
00202       while (m_queue.size() != 0)
00203       {
00204          BLOCXX_POOL_LOG_DEBUG2(m_logger, "Waiting for empty queue");
00205          m_queueEmpty.wait(l);
00206       }
00207       BLOCXX_POOL_LOG_DEBUG2(m_logger, "Queue empty: the wait is over");
00208    }
00209    
00210    void shutdownThreads(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout)
00211    {
00212       TimeoutTimer shutdownTimer(shutdownTimeout);
00213       TimeoutTimer dTimer(definitiveCancelTimeout);
00214       if (!finishOffWorkInQueue(finishWorkInQueue, shutdownTimer.asAbsoluteTimeout()))
00215       {
00216          return;
00217       }
00218 
00219       // Wake up any workers so they recheck shutdown flag
00220       m_queueNotEmpty.notifyAll();
00221       m_queueNotFull.notifyAll();
00222 
00223       if (!shutdownTimer.infinite())
00224       {
00225          // Tell all the threads to shutdown
00226          for (UInt32 i = 0; i < m_threads.size(); ++i)
00227          {
00228             BLOCXX_POOL_LOG_DEBUG(m_logger, Format("Calling shutdown on thread %1", i));
00229             m_threads[i]->shutdown();
00230          }
00231 
00232          // Wait until shutdownTimeout for the threads to finish
00233          Timeout absoluteShutdownTimeout(shutdownTimer.asAbsoluteTimeout());
00234          for (UInt32 i = 0; i < m_threads.size(); ++i)
00235          {
00236             BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("Waiting for thread %1 to exit.", i));
00237             m_threads[i]->timedWait(absoluteShutdownTimeout);
00238          }
00239 
00240          // Tell all the threads to cooperative cancel
00241          for (UInt32 i = 0; i < m_threads.size(); ++i)
00242          {
00243             BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("Calling cooperativeCancel on thread %1", i));
00244             m_threads[i]->cooperativeCancel();
00245          }
00246 
00247          if (!dTimer.infinite())
00248          {
00249             // If any still haven't shut down, definitiveCancel will kill them.
00250             Timeout absoluteDefinitiveTimeout(dTimer.asAbsoluteTimeout());
00251             for (UInt32 i = 0; i < m_threads.size(); ++i)
00252             {
00253                BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("Calling definitiveCancel on thread %1", i));
00254                try
00255                {
00256                   if (!m_threads[i]->definitiveCancel(absoluteDefinitiveTimeout))
00257                   {
00258                      BLOCXX_POOL_LOG_FATAL_ERROR(m_logger, Format("Thread %1 was forcibly cancelled.", i));
00259                   }
00260                }
00261                catch (CancellationDeniedException& e)
00262                {
00263                   BLOCXX_POOL_LOG_ERROR(m_logger, Format("Caught CanacellationDeniedException: %1 for thread %2. Pool shutdown may hang.", e, i));
00264                }
00265             }
00266          }
00267 
00268       }
00269 
00270       // Clean up after the threads and/or wait for them to exit.
00271       for (UInt32 i = 0; i < m_threads.size(); ++i)
00272       {
00273          BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("calling join() on thread %1", i));
00274          m_threads[i]->join();
00275          BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("join() finished for thread %1", i));
00276       }
00277    }
00278 
00279    RunnableRef getWorkFromQueue(bool waitForWork)
00280    {
00281       NonRecursiveMutexLock l(m_queueLock);
00282       while ((m_queue.size() == 0) && (!m_shutdown))
00283       {
00284          if (waitForWork)
00285          {
00286             BLOCXX_POOL_LOG_DEBUG3(m_logger, "Waiting for work");
00287             m_queueNotEmpty.wait(l);
00288          }
00289          else
00290          {
00291             // wait 1 sec for work, to more efficiently handle a stream
00292             // of single requests.
00293             if (!m_queueNotEmpty.timedWait(l,Timeout::relative(1)))
00294             {
00295                BLOCXX_POOL_LOG_DEBUG3(m_logger, "No work after 1 sec. I'm not waiting any longer");
00296                return RunnableRef();
00297             }
00298          }
00299       }
00300       // check to see if a shutdown started while the thread was sleeping
00301       if (m_shutdown)
00302       {
00303          BLOCXX_POOL_LOG_DEBUG(m_logger, "The pool is shutdown, not getting any more work");
00304          return RunnableRef();
00305       }
00306 
00307       RunnableRef work = m_queue.front();
00308       m_queue.pop_front();
00309 
00310       // This needs to happen before the call to queueIsFull() because the worker count can affect the result of queueIsFull()
00311       incrementWorkerCount();
00312       // handle threads waiting in addWork().
00313       if (!queueIsFull())
00314       {
00315          m_queueNotFull.notifyAll();
00316       }
00317 
00318       // handle waiting shutdown thread or callers of waitForEmptyQueue()
00319       if (m_queue.size() == 0)
00320       {
00321          m_queueEmpty.notifyAll();
00322       }
00323       BLOCXX_POOL_LOG_DEBUG3(m_logger, "A thread got some work to do");
00324       return work;
00325    }
00326 
00327    // hooks for DynamicSizeNoQueuePoolImpl subclass. Yes this is a horrible design, it just saves code duplication.
00328    virtual void incrementWorkerCount()
00329    {
00330    }
00331 
00332    virtual void decrementWorkerCount()
00333    {
00334    }
00335 
00336    // pool characteristics
00337    UInt32 m_maxQueueSize;
00338    // pool state
00339    Array<ThreadRef> m_threads;
00340    std::deque<RunnableRef> m_queue;
00341    bool m_queueClosed;
00342    bool m_shutdown;
00343    // pool synchronization
00344    NonRecursiveMutex m_queueLock;
00345    Condition m_queueNotFull;
00346    Condition m_queueEmpty;
00347    Condition m_queueNotEmpty;
00348    Logger m_logger;
00349    String m_poolName;
00350 };
00351 class FixedSizePoolImpl : public CommonPoolImpl
00352 {
00353 public:
00354    FixedSizePoolImpl(UInt32 numThreads, UInt32 maxQueueSize, const Logger& logger, const String& poolName)
00355       : CommonPoolImpl(maxQueueSize, logger, poolName)
00356    {
00357       // create the threads and start them up.
00358       m_threads.reserve(numThreads);
00359       for (UInt32 i = 0; i < numThreads; ++i)
00360       {
00361          m_threads.push_back(ThreadRef(new FixedSizePoolWorkerThread(this)));
00362       }
00363       for (UInt32 i = 0; i < numThreads; ++i)
00364       {
00365          try
00366          {
00367             m_threads[i]->start();
00368          }
00369          catch (ThreadException& e)
00370          {
00371             BLOCXX_POOL_LOG_ERROR(m_logger, Format("Failed to start thread #%1: %2", i, e));
00372             m_threads.resize(i); // remove non-started threads
00373             // shutdown the rest
00374             this->FixedSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, Timeout::relative(0.5), Timeout::relative(0.5));
00375             throw;
00376          }
00377       }
00378       BLOCXX_POOL_LOG_DEBUG(m_logger, "Threads are started and ready to go");
00379    }
00380    // returns true if work is placed in the queue to be run and false if not.
00381    virtual bool addWork(const RunnableRef& work, const Timeout& timeout)
00382    {
00383       // check precondition: work != NULL
00384       if (!work)
00385       {
00386          BLOCXX_POOL_LOG_DEBUG(m_logger, "Trying to add NULL work! Shame on you.");
00387          return false;
00388       }
00389       
00390       NonRecursiveMutexLock l(m_queueLock);
00391       TimeoutTimer timer(timeout);
00392       while ( queueIsFull() && !queueClosed() )
00393       {
00394          BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue is full. Waiting until a spot opens up so we can add some work");
00395          if (!m_queueNotFull.timedWait(l, timer.asAbsoluteTimeout()))
00396          {
00397             // timed out
00398             BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue is full and timeout expired. Not adding work and returning false");
00399             return false;
00400          }
00401       }
00402       
00403       // the pool is in the process of being destroyed
00404       if (queueClosed())
00405       {
00406          BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue was closed out from underneath us. Not adding work and returning false");
00407          return false;
00408       }
00409 
00410       m_queue.push_back(work);
00411       
00412       // if the queue was empty, there may be workers just sitting around, so we need to wake them up!
00413       if (m_queue.size() == 1)
00414       {
00415          BLOCXX_POOL_LOG_DEBUG3(m_logger, "Waking up sleepy workers");
00416          m_queueNotEmpty.notifyAll();
00417       }
00418 
00419       BLOCXX_POOL_LOG_DEBUG(m_logger, "Work has been added to the queue");
00420       return true;
00421    }
00422 
00423    // we keep this around so it can be called in the destructor
00424    virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout)
00425    {
00426       shutdownThreads(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
00427    }
00428    virtual ~FixedSizePoolImpl()
00429    {
00430       // can't let exception escape the destructor
00431       try
00432       {
00433          // don't need a lock here, because we're the only thread left.
00434          if (!queueClosed())
00435          {
00436             // Make sure the pool is shutdown.
00437             // Specify which shutdown() we want so we don't get undefined behavior calling a virtual function from the destructor.
00438             this->FixedSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, Timeout::relative(0.5), Timeout::relative(0.5));
00439          }
00440       }
00441       catch (...)
00442       {
00443       }
00444    }
00445 private:
00446    friend class FixedSizePoolWorkerThread;
00447 };
00448 void runRunnable(const RunnableRef& work)
00449 {
00450    // don't let exceptions escape, we need to keep going, except for ThreadCancelledException, in which case we need to stop.
00451    try
00452    {
00453       work->run();
00454    }
00455    catch (ThreadCancelledException&)
00456    {
00457       throw;
00458    }
00459    catch (Exception& ex)
00460    {
00461 #ifdef BLOCXX_DEBUG     
00462       std::clog << "!!! Exception: " << ex.type() << " caught in ThreadPool worker: " << ex << std::endl;
00463 #endif
00464       Logger logger(COMPONENT_NAME);
00465       BLOCXX_LOG_ERROR(logger, Format("!!! Exception caught in ThreadPool worker: %1", ex));
00466    }
00467    catch(std::exception& ex)
00468    {
00469 #ifdef BLOCXX_DEBUG
00470       std::clog << "!!! std::exception what = \"" << ex.what() << "\" caught in ThreadPool worker" << std::endl;
00471 #endif
00472       Logger logger(COMPONENT_NAME);
00473       BLOCXX_LOG_ERROR(logger, Format("!!! std::exception caught in ThreadPool worker: %1", ex.what()));
00474    }
00475    catch (...)
00476    {
00477 #ifdef BLOCXX_DEBUG     
00478       std::clog << "!!! Unknown Exception caught in ThreadPool worker" << std::endl;
00479 #endif
00480       Logger logger(COMPONENT_NAME);
00481       BLOCXX_LOG_ERROR(logger, "!!! Unknown Exception caught in ThreadPool worker.");
00482    }
00483 }
00484 Int32 FixedSizePoolWorkerThread::run()
00485 {
00486    while (true)
00487    {
00488       // check queue for work
00489       RunnableRef work = m_thePool->getWorkFromQueue(true);
00490       if (!work)
00491       {
00492          return 0;
00493       }
00494       // save this off so it can be cancelled by another thread.
00495       {
00496          MutexLock lock(m_guard);
00497          m_currentRunnable = work;
00498       }
00499       runRunnable(work);
00500       {
00501          MutexLock lock(m_guard);
00502          m_currentRunnable = 0;
00503       }
00504    }
00505    return 0;
00506 }
00507 class DynamicSizePoolImpl;
00509 class DynamicSizePoolWorkerThread : public Thread
00510 {
00511 public:
00512    DynamicSizePoolWorkerThread(DynamicSizePoolImpl* thePool)
00513       : Thread()
00514       , m_thePool(thePool)
00515    {
00516    }
00517    virtual Int32 run();
00518 private:
00519    virtual void doShutdown()
00520    {
00521       MutexLock lock(m_guard);
00522       if (m_currentRunnable)
00523       {
00524          m_currentRunnable->doShutdown();
00525       }
00526    }
00527    virtual void doCooperativeCancel()
00528    {
00529       MutexLock lock(m_guard);
00530       if (m_currentRunnable)
00531       {
00532          m_currentRunnable->doCooperativeCancel();
00533       }
00534    }
00535    virtual void doDefinitiveCancel()
00536    {
00537       MutexLock lock(m_guard);
00538       if (m_currentRunnable)
00539       {
00540          m_currentRunnable->doDefinitiveCancel();
00541       }
00542    }
00543 
00544    DynamicSizePoolImpl* m_thePool;
00545 
00546    Mutex m_guard;
00547    RunnableRef m_currentRunnable;
00548 
00549    // non-copyable
00550    DynamicSizePoolWorkerThread(const DynamicSizePoolWorkerThread&);
00551    DynamicSizePoolWorkerThread& operator=(const DynamicSizePoolWorkerThread&);
00552 };
00554 class DynamicSizePoolImpl : public CommonPoolImpl
00555 {
00556 public:
00557    DynamicSizePoolImpl(UInt32 maxThreads, UInt32 maxQueueSize, const Logger& logger, const String& poolName)
00558       : CommonPoolImpl(maxQueueSize, logger, poolName)
00559       , m_maxThreads(maxThreads)
00560    {
00561    }
00562    // returns true if work is placed in the queue to be run and false if not.
00563    virtual bool addWork(const RunnableRef& work, const Timeout& timeout)
00564    {
00565       // check precondition: work != NULL
00566       if (!work)
00567       {
00568          BLOCXX_POOL_LOG_DEBUG(m_logger, "Trying to add NULL work! Shame on you.");
00569          return false;
00570       }
00571       NonRecursiveMutexLock l(m_queueLock);
00572 
00573       // the pool is in the process of being destroyed
00574       if (queueClosed())
00575       {
00576          BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue was closed out from underneath us. Not adding work and returning false");
00577          return false;
00578       }
00579 
00580       // Can't touch m_threads until *after* we check for the queue being closed, shutdown 
00581       // requires that m_threads not change after the queue is closed.
00582       // Now clean up dead threads (before we add the new one, so we don't need to check it)
00583       size_t i = 0;
00584       while (i < m_threads.size())
00585       {
00586          if (!m_threads[i]->isRunning())
00587          {
00588             BLOCXX_POOL_LOG_DEBUG3(m_logger, Format("Thread %1 is finished. Cleaning up it's remains.", i));
00589             m_threads[i]->join();
00590             m_threads.remove(i);
00591          }
00592          else
00593          {
00594             ++i;
00595          }
00596       }
00597 
00598       TimeoutTimer timer(timeout);
00599       while ( queueIsFull() && !queueClosed() )
00600       {
00601          BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue is full. Waiting until a spot opens up so we can add some work");
00602          if (!m_queueNotFull.timedWait(l, timer.asAbsoluteTimeout()))
00603          {
00604             // timed out
00605             BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue is full and timeout expired. Not adding work and returning false");
00606             return false;
00607          }
00608       }
00609       
00610       // The previous loop could have ended because a spot opened in the queue or it was closed.  Check for the close.
00611       if (queueClosed())
00612       {
00613          BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue was closed out from underneath us. Not adding work and returning false");
00614          return false;
00615       }
00616 
00617       m_queue.push_back(work);
00618       
00619       BLOCXX_POOL_LOG_DEBUG(m_logger, "Work has been added to the queue");
00620 
00621       // Release the lock and wake up a thread waiting for work in the queue
00622       // This bit of code is a race condition with the thread,
00623       // but if we acquire the lock again before it does, then we
00624       // properly handle that case.  The only disadvantage if we win
00625       // the "race" is that we'll unnecessarily start a new thread.
00626       // In practice it works all the time.
00627       l.release();
00628       m_queueNotEmpty.notifyOne();
00629       Thread::yield(); // give the thread a chance to run
00630       l.lock();
00631 
00632       // Start up a new thread to handle the work in the queue.
00633       if (!m_queue.empty() && m_threads.size() < m_maxThreads)
00634       {
00635          ThreadRef theThread(new DynamicSizePoolWorkerThread(this));
00636          m_threads.push_back(theThread);
00637          BLOCXX_POOL_LOG_DEBUG3(m_logger, "About to start a new thread");
00638          try
00639          {
00640             theThread->start();
00641          }
00642          catch (ThreadException& e)
00643          {
00644             BLOCXX_POOL_LOG_ERROR(m_logger, Format("Failed to start thread: %1", e));
00645             m_threads.pop_back();
00646             throw;
00647          }
00648          BLOCXX_POOL_LOG_DEBUG2(m_logger, "New thread started");
00649       }
00650       return true;
00651    }
00652 
00653    // we keep this around so it can be called in the destructor
00654    virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout)
00655    {
00656       shutdownThreads(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
00657    }
00658    virtual ~DynamicSizePoolImpl()
00659    {
00660       // can't let exception escape the destructor
00661       try
00662       {
00663          // don't need a lock here, because we're the only thread left.
00664          if (!queueClosed())
00665          {
00666             // Make sure the pool is shutdown.
00667             // Specify which shutdown() we want so we don't get undefined behavior calling a virtual function from the destructor.
00668             this->DynamicSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, Timeout::relative(0.5), Timeout::relative(0.5));
00669          }
00670       }
00671       catch (...)
00672       {
00673       }
00674    }
00675 
00676 protected:
00677    UInt32 getMaxThreads() const
00678    {
00679       return m_maxThreads;
00680    }
00681 
00682 private:
00683    // pool characteristics
00684    UInt32 m_maxThreads;
00685    friend class DynamicSizePoolWorkerThread;
00686 };
00687 Int32 DynamicSizePoolWorkerThread::run()
00688 {
00689    while (true)
00690    {
00691       // check queue for work
00692       RunnableRef work = m_thePool->getWorkFromQueue(false);
00693       if (!work)
00694       {
00695          return 0;
00696       }
00697       // save this off so it can be cancelled by another thread.
00698       {
00699          MutexLock lock(m_guard);
00700          m_currentRunnable = work;
00701       }
00702       runRunnable(work);
00703       m_thePool->decrementWorkerCount();
00704       {
00705          MutexLock lock(m_guard);
00706          m_currentRunnable = 0;
00707       }
00708    }
00709    return 0;
00710 }
00711 
00713 class DynamicSizeNoQueuePoolImpl : public DynamicSizePoolImpl
00714 {
00715 public:
00716    DynamicSizeNoQueuePoolImpl(UInt32 maxThreads, const Logger& logger, const String& poolName)
00717       : DynamicSizePoolImpl(maxThreads, maxThreads, logger, poolName) // allow queue in superclass, but prevent it from having any backlog
00718       , m_workingThreads(0)
00719    {
00720    }
00721 
00722    virtual ~DynamicSizeNoQueuePoolImpl()
00723    {
00724    }
00725 
00726    virtual void incrementWorkerCount()
00727    {
00728       ++m_workingThreads;
00729    }
00730 
00731    virtual void decrementWorkerCount()
00732    {
00733       NonRecursiveMutexLock lock(m_queueLock);
00734       --m_workingThreads;
00735       // wake up any threads waiting to start some work
00736       m_queueNotFull.notifyAll();
00737    }
00738 
00739    // One difference between this class and DynamicSizePoolImpl is that we change the definition of queueIsFull()
00740    virtual bool queueIsFull() const
00741    {
00742       // don't let the queue get bigger than the number of free threads. This effectively prevents work from being 
00743       // queued up which can't be immediately serviced.
00744       size_t freeThreads = getMaxThreads() -  AtomicGet(m_workingThreads); 
00745       return (freeThreads <= m_queue.size());
00746    }
00747 
00748 private:
00749    // Keep track of the number of threads doing work. Protected by m_guard
00750    size_t m_workingThreads;
00751    
00752 };
00753 
00754 } // end anonymous namespace
00756 ThreadPool::ThreadPool(PoolType poolType, UInt32 numThreads, UInt32 maxQueueSize, const String& poolName)
00757 {
00758    NullLogger logger;
00759    switch (poolType)
00760    {
00761       case FIXED_SIZE:
00762          m_impl = new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
00763          break;
00764       case DYNAMIC_SIZE:
00765          m_impl = new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
00766          break;
00767       case DYNAMIC_SIZE_NO_QUEUE:
00768          m_impl = new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName);
00769          break;
00770    }
00771 }
00773 ThreadPool::ThreadPool(PoolType poolType, UInt32 numThreads, UInt32 maxQueueSize, const Logger& logger, const String& poolName)
00774 {
00775    switch (poolType)
00776    {
00777       case FIXED_SIZE:
00778          m_impl = new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
00779          break;
00780       case DYNAMIC_SIZE:
00781          m_impl = new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
00782          break;
00783       case DYNAMIC_SIZE_NO_QUEUE:
00784          m_impl = new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName);
00785          break;
00786    }
00787 }
00789 bool ThreadPool::addWork(const RunnableRef& work)
00790 {
00791    return m_impl->addWork(work, Timeout::infinite);
00792 }
00794 bool ThreadPool::tryAddWork(const RunnableRef& work)
00795 {
00796    return m_impl->addWork(work, Timeout::relative(0));
00797 }
00799 bool ThreadPool::tryAddWork(const RunnableRef& work, const Timeout& timeout)
00800 {
00801    return m_impl->addWork(work, timeout);
00802 }
00804 void ThreadPool::shutdown(EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
00805 {
00806    m_impl->shutdown(finishWorkInQueue, Timeout::relative(shutdownSecs), Timeout::relative(shutdownSecs));
00807 }
00809 void ThreadPool::shutdown(EShutdownQueueFlag finishWorkInQueue, const Timeout& timeout)
00810 {
00811    m_impl->shutdown(finishWorkInQueue, timeout, timeout);
00812 }
00814 void ThreadPool::shutdown(EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout)
00815 {
00816    m_impl->shutdown(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
00817 }
00819 void ThreadPool::waitForEmptyQueue()
00820 {
00821    m_impl->waitForEmptyQueue();
00822 }
00824 ThreadPool::~ThreadPool()
00825 {
00826 }
00828 ThreadPool::ThreadPool(const ThreadPool& x)
00829    : IntrusiveCountableBase(x)
00830    , m_impl(x.m_impl)
00831 {
00832 }
00834 ThreadPool& ThreadPool::operator=(const ThreadPool& x)
00835 {
00836    m_impl = x.m_impl;
00837    return *this;
00838 }
00839 
00840 } // end namespace BLOCXX_NAMESPACE
00841