blocxx

WaitpidThreadFix.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002 * Copyright (C) 2005, Quest Software, Inc. All rights reserved.
00003 * 
00004 * Redistribution and use in source and binary forms, with or without
00005 * modification, are permitted provided that the following conditions are met:
00006 * 
00007 *     * Redistributions of source code must retain the above copyright notice,
00008 *       this list of conditions and the following disclaimer.
00009 *     * Redistributions in binary form must reproduce the above copyright
00010 *       notice, this list of conditions and the following disclaimer in the
00011 *       documentation and/or other materials provided with the distribution.
00012 *     * Neither the name of the Network Associates, nor Quest Software, Inc., nor the
00013 *       names of its contributors or employees may be used to endorse or promote
00014 *       products derived from this software without specific prior written
00015 *       permission.
00016 * 
00017 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00021 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00027 * POSSIBILITY OF SUCH DAMAGE.
00028 *******************************************************************************/
00029 
00034 #include "blocxx/Thread.hpp"
00035 #include "blocxx/WaitpidThreadFix.hpp"
00036 #include "blocxx/Exec.hpp"
00037 #include "blocxx/WaitpidThreadFixFwd.hpp"
00038 #include "blocxx/ThreadOnce.hpp"
00039 #include "blocxx/NonRecursiveMutex.hpp"
00040 #include "blocxx/NonRecursiveMutexLock.hpp"
00041 #include "blocxx/Condition.hpp"
00042 #include "blocxx/Reference.hpp"
00043 #include "blocxx/IntrusiveReference.hpp"
00044 #include <queue>
00045 #include <sys/types.h>
00046 #ifndef BLOCXX_WIN32
00047 #include <sys/wait.h>
00048 #endif
00049 
00050 using namespace blocxx;
00051 
00052 namespace BLOCXX_NAMESPACE
00053 {
00054 
00055 namespace 
00056 {
00057    bool g_useWaitpidThreadFix = 
00058 #ifdef BLOCXX_WAITPID_THREADING_PROBLEM
00059       true;
00060 #else
00061       false;
00062 #endif
00063 
00064    class ProcessThread;
00065 
00066    OnceFlag g_initThreadGuard = BLOCXX_ONCE_INIT;
00067    ProcessThread* g_processThread = 0;
00068 
00069    void initThread();
00070 
00071    Thread_t getWorkerThreadId();
00072 
00073 }
00074 
00075 bool WaitpidThreadFix::setWaitpidThreadFixEnabled(bool enabled)
00076 {
00077    bool rv = g_useWaitpidThreadFix;
00078    g_useWaitpidThreadFix = enabled;
00079    return rv;
00080 }
00081 
00082 bool WaitpidThreadFix::shouldUseWaitpidThreadFix()
00083 {
00084    if (!g_useWaitpidThreadFix)
00085    {
00086       return false;
00087    }
00088    Thread_t currThread = ThreadImpl::currentThread();
00089    Thread_t workerThread = getWorkerThreadId();
00090 
00091    // If we are already in the WaitpidThreadFix worker thread
00092    // then we dont want to cause an infinite loop
00093    if (ThreadImpl::sameThreads(currThread, workerThread))
00094    {
00095       return false;
00096    }
00097    return true;
00098 }
00099 
00100 namespace 
00101 {
00102    typedef Reference<Exception> ExceptionPtr;
00103 
00104 
00105    class WorkSignal
00106    {
00107    public:
00108       WorkSignal()
00109          : m_signal(false)
00110       {
00111       }
00112       
00113       ~WorkSignal()
00114       {
00115       }
00116 
00117       void signal()
00118       {
00119          NonRecursiveMutexLock lock(m_mutex);
00120          m_signal = true;
00121          m_cond.notifyAll();
00122       }
00123 
00124       void waitForSignal()
00125       {
00126          NonRecursiveMutexLock lock(m_mutex);
00127 
00128          while(!m_signal)
00129          {
00130             m_cond.wait(lock);
00131          }
00132       }
00133 
00134    private:
00135       bool m_signal;
00136       Condition m_cond;
00137       NonRecursiveMutex m_mutex;
00138    };
00139 
00140    //***************************************************************************
00141    // - This base class represents the work to be performed by ControlledAccessThread
00142    // - This class and all derived classes must be thread safe
00143    class WorkItem : public IntrusiveCountableBase
00144    {
00145    public:
00146       virtual ~WorkItem() 
00147       {
00148       }
00149 
00150       virtual void doWork() = 0;
00151 
00152       void signalDone()
00153       {
00154          m_doneSig.signal();
00155       }
00156 
00157       void saveException(Exception* err)
00158       {
00159          NonRecursiveMutexLock lock(m_errMutex);
00160          m_err = err;
00161       }
00162 
00163       Exception* getException() 
00164       {
00165          NonRecursiveMutexLock lock(m_errMutex);
00166          return m_err.getPtr();
00167       }
00168 
00169    protected:
00170       ExceptionPtr m_err;
00171       NonRecursiveMutex m_errMutex;
00172       WorkSignal m_doneSig;
00173    };
00174 
00175 
00176    //***************************************************************************
00177    class SpawnWorkItem : public WorkItem
00178    {
00179    public:
00180       SpawnWorkItem(char const * execPath, char const * const argv[], 
00181                  char const * const envp[], Exec::PreExec & preExec)
00182          : m_execPath(execPath)
00183          , m_argv(argv)
00184          , m_envp(envp)
00185          , m_preExec(preExec)
00186       {
00187       }
00188 
00189       virtual ~SpawnWorkItem() 
00190       {
00191       }
00192          
00193       virtual void doWork()
00194       {
00195          NonRecursiveMutexLock lock(m_resultMutex);
00196          m_result = Exec::spawnImpl(m_execPath, m_argv, m_envp, m_preExec);
00197       }
00198 
00199       ProcessRef waitTillDone()
00200       {
00201          m_doneSig.waitForSignal();
00202 
00203          NonRecursiveMutexLock lock(m_resultMutex);
00204          return m_result;
00205       }
00206 
00207    protected:
00208       ProcessRef m_result;
00209       NonRecursiveMutex m_resultMutex;
00210 
00211       const char * m_execPath;
00212       const char * const * m_argv; 
00213       const char * const * m_envp;
00214       Exec::PreExec& m_preExec;
00215    };
00216 
00217 
00218    //***************************************************************************
00219    class WaitpidWorkItem : public WorkItem
00220    {
00221    public:
00222       WaitpidWorkItem(const ::pid_t& pid) 
00223          : m_pid(pid)
00224       {
00225       }
00226 
00227       virtual ~WaitpidWorkItem() 
00228       {
00229       }
00230 
00231       virtual void doWork()
00232       {
00233          NonRecursiveMutexLock lock(m_resultMutex);
00234          m_result = pollStatusImpl(m_pid);
00235       }
00236 
00237       Process::Status waitTillDone()
00238       {
00239          m_doneSig.waitForSignal();
00240 
00241          NonRecursiveMutexLock lock(m_resultMutex);
00242          return m_result;
00243       }
00244 
00245 
00246    protected:
00247       Process::Status m_result;
00248       NonRecursiveMutex m_resultMutex;
00249       
00250       const ::pid_t& m_pid; 
00251    };
00252 
00253    typedef IntrusiveReference<SpawnWorkItem> SpawnWorkItemPtr;
00254    typedef IntrusiveReference<WaitpidWorkItem> WaitpidWorkItemPtr;
00255 
00256    class WorkQueue
00257    {
00258    public:
00259       WorkQueue() {}
00260       virtual ~WorkQueue() {}
00261 
00262       WorkItem* getWork()
00263       {
00264          NonRecursiveMutexLock lock(m_workMutex);
00265 
00266          // Wait for some work to show up 
00267          // by checking the predicate in a loop
00268          while(m_work.empty())
00269          {
00270             m_workNotEmpty.wait(lock);
00271          }
00272 
00273          WorkItem* newWork = m_work.front();
00274          m_work.pop();
00275 
00276          return newWork;
00277       }
00278 
00279       void addWork(WorkItem* newWork)
00280       {
00281          NonRecursiveMutexLock lock(m_workMutex);
00282          m_work.push(newWork);
00283          m_workNotEmpty.notifyAll();
00284       }
00285 
00286    private:
00287       std::queue<WorkItem*> m_work;
00288       Condition m_workNotEmpty;
00289       NonRecursiveMutex m_workMutex;
00290    };
00291 
00292    //***************************************************************************
00293    // This is the worker thread that launches processes and/or calls 
00294    // waitpid on them when BLOCXX_WAITPID_THREADING_PROBLEM is defined
00295    //***************************************************************************
00296    class ProcessThread : public Thread
00297    {
00298    public:
00299       ProcessThread();
00300       virtual ~ProcessThread(); 
00301 
00302       virtual Int32 run();
00303 
00304       ProcessRef spawn(
00305          char const * exec_path, 
00306          char const * const argv[], 
00307          char const * const envp[], 
00308          Exec::PreExec & pre_exec
00309          );
00310 
00311       Process::Status waitPid(const ProcId& pid);
00312 
00313    protected:
00314       WorkQueue m_workQueue;
00315 
00316       NonRecursiveMutex m_idMutex;
00317    };
00318 
00319    ProcessThread::ProcessThread()
00320    {
00321    }
00322 
00323    ProcessThread::~ProcessThread() 
00324    {
00325    }
00326 
00327    // This function will never exit until the process terminates itself.
00328    Int32 ProcessThread::run()
00329    {
00330       // Infinite loop.
00331       while(true)
00332       {
00333          WorkItem* newWork; 
00334          newWork = m_workQueue.getWork();
00335 
00336          try
00337          {
00338             newWork->doWork();
00339          }
00340          catch(Exception& e)
00341          {
00342             newWork->saveException(e.clone());
00343          }
00344          newWork->signalDone();
00345       }
00346 
00347       // A return (never reached) to make various compilers happy.
00348       return 0;
00349    }
00350 
00351    ProcessRef ProcessThread::spawn(char const * exec_path, char const * const argv[], 
00352          char const * const envp[], Exec::PreExec & pre_exec)
00353    {
00354       SpawnWorkItemPtr newWork(new SpawnWorkItem(exec_path, argv, envp, pre_exec));
00355       m_workQueue.addWork(newWork.getPtr());
00356 
00357       ProcessRef result = newWork->waitTillDone();
00358 
00359       Exception* err = newWork->getException();
00360       if(err != 0)
00361       {
00362          err->rethrow();
00363       }
00364 
00365       return result;
00366    }
00367 
00368    Process::Status ProcessThread::waitPid(const ProcId& pid)
00369    {
00370       WaitpidWorkItemPtr newWork(new WaitpidWorkItem(pid));
00371       m_workQueue.addWork(newWork.getPtr());
00372 
00373       Process::Status result = newWork->waitTillDone();
00374 
00375       Exception* err = newWork->getException();
00376       if(err != 0)
00377       {
00378          err->rethrow();
00379       }
00380 
00381       return result;
00382    }
00383 
00384 
00385    void initThread()
00386    {
00387       // create the worker thread
00388       g_processThread = new ProcessThread(); 
00389       g_processThread->start(); 
00390    }
00391 
00392    Thread_t getWorkerThreadId()
00393    {
00394       callOnce(g_initThreadGuard, initThread);
00395       return g_processThread->getId();
00396    }
00397 
00398 } // namespace (anon)
00399 
00400 
00401 ProcessRef WaitpidThreadFix::spawnProcess(char const * exec_path, 
00402    char const * const argv[], char const * const envp[], Exec::PreExec & pre_exec)
00403 {
00404    callOnce(g_initThreadGuard, initThread);
00405    return g_processThread->spawn(exec_path, argv, envp, pre_exec);
00406 }
00407 
00408 Process::Status WaitpidThreadFix::waitPid(const ProcId& pid)
00409 {
00410    callOnce(g_initThreadGuard, initThread);
00411    return g_processThread->waitPid(pid);
00412 }
00413 
00414 } //namespace BLOCXX_NAMESPACE
00415