blocxx
|
00001 /******************************************************************************* 00002 * Copyright (C) 2005, Quest Software, Inc. All rights reserved. 00003 * 00004 * Redistribution and use in source and binary forms, with or without 00005 * modification, are permitted provided that the following conditions are met: 00006 * 00007 * * Redistributions of source code must retain the above copyright notice, 00008 * this list of conditions and the following disclaimer. 00009 * * Redistributions in binary form must reproduce the above copyright 00010 * notice, this list of conditions and the following disclaimer in the 00011 * documentation and/or other materials provided with the distribution. 00012 * * Neither the name of the Network Associates, nor Quest Software, Inc., nor the 00013 * names of its contributors or employees may be used to endorse or promote 00014 * products derived from this software without specific prior written 00015 * permission. 00016 * 00017 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00018 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00019 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00020 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00021 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00022 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00023 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00024 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00025 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00026 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00027 * POSSIBILITY OF SUCH DAMAGE. 00028 *******************************************************************************/ 00029 00034 #include "blocxx/Thread.hpp" 00035 #include "blocxx/WaitpidThreadFix.hpp" 00036 #include "blocxx/Exec.hpp" 00037 #include "blocxx/WaitpidThreadFixFwd.hpp" 00038 #include "blocxx/ThreadOnce.hpp" 00039 #include "blocxx/NonRecursiveMutex.hpp" 00040 #include "blocxx/NonRecursiveMutexLock.hpp" 00041 #include "blocxx/Condition.hpp" 00042 #include "blocxx/Reference.hpp" 00043 #include "blocxx/IntrusiveReference.hpp" 00044 #include <queue> 00045 #include <sys/types.h> 00046 #ifndef BLOCXX_WIN32 00047 #include <sys/wait.h> 00048 #endif 00049 00050 using namespace blocxx; 00051 00052 namespace BLOCXX_NAMESPACE 00053 { 00054 00055 namespace 00056 { 00057 bool g_useWaitpidThreadFix = 00058 #ifdef BLOCXX_WAITPID_THREADING_PROBLEM 00059 true; 00060 #else 00061 false; 00062 #endif 00063 00064 class ProcessThread; 00065 00066 OnceFlag g_initThreadGuard = BLOCXX_ONCE_INIT; 00067 ProcessThread* g_processThread = 0; 00068 00069 void initThread(); 00070 00071 Thread_t getWorkerThreadId(); 00072 00073 } 00074 00075 bool WaitpidThreadFix::setWaitpidThreadFixEnabled(bool enabled) 00076 { 00077 bool rv = g_useWaitpidThreadFix; 00078 g_useWaitpidThreadFix = enabled; 00079 return rv; 00080 } 00081 00082 bool WaitpidThreadFix::shouldUseWaitpidThreadFix() 00083 { 00084 if (!g_useWaitpidThreadFix) 00085 { 00086 return false; 00087 } 00088 Thread_t currThread = ThreadImpl::currentThread(); 00089 Thread_t workerThread = getWorkerThreadId(); 00090 00091 // If we are already in the WaitpidThreadFix worker thread 00092 // then we dont want to cause an infinite loop 00093 if (ThreadImpl::sameThreads(currThread, workerThread)) 00094 { 00095 return false; 00096 } 00097 return true; 00098 } 00099 00100 namespace 00101 { 00102 typedef Reference<Exception> ExceptionPtr; 00103 00104 00105 class WorkSignal 00106 { 00107 public: 00108 WorkSignal() 00109 : m_signal(false) 00110 { 00111 } 00112 00113 ~WorkSignal() 00114 { 00115 } 00116 00117 void signal() 00118 { 00119 NonRecursiveMutexLock lock(m_mutex); 00120 m_signal = true; 00121 m_cond.notifyAll(); 00122 } 00123 00124 void waitForSignal() 00125 { 00126 NonRecursiveMutexLock lock(m_mutex); 00127 00128 while(!m_signal) 00129 { 00130 m_cond.wait(lock); 00131 } 00132 } 00133 00134 private: 00135 bool m_signal; 00136 Condition m_cond; 00137 NonRecursiveMutex m_mutex; 00138 }; 00139 00140 //*************************************************************************** 00141 // - This base class represents the work to be performed by ControlledAccessThread 00142 // - This class and all derived classes must be thread safe 00143 class WorkItem : public IntrusiveCountableBase 00144 { 00145 public: 00146 virtual ~WorkItem() 00147 { 00148 } 00149 00150 virtual void doWork() = 0; 00151 00152 void signalDone() 00153 { 00154 m_doneSig.signal(); 00155 } 00156 00157 void saveException(Exception* err) 00158 { 00159 NonRecursiveMutexLock lock(m_errMutex); 00160 m_err = err; 00161 } 00162 00163 Exception* getException() 00164 { 00165 NonRecursiveMutexLock lock(m_errMutex); 00166 return m_err.getPtr(); 00167 } 00168 00169 protected: 00170 ExceptionPtr m_err; 00171 NonRecursiveMutex m_errMutex; 00172 WorkSignal m_doneSig; 00173 }; 00174 00175 00176 //*************************************************************************** 00177 class SpawnWorkItem : public WorkItem 00178 { 00179 public: 00180 SpawnWorkItem(char const * execPath, char const * const argv[], 00181 char const * const envp[], Exec::PreExec & preExec) 00182 : m_execPath(execPath) 00183 , m_argv(argv) 00184 , m_envp(envp) 00185 , m_preExec(preExec) 00186 { 00187 } 00188 00189 virtual ~SpawnWorkItem() 00190 { 00191 } 00192 00193 virtual void doWork() 00194 { 00195 NonRecursiveMutexLock lock(m_resultMutex); 00196 m_result = Exec::spawnImpl(m_execPath, m_argv, m_envp, m_preExec); 00197 } 00198 00199 ProcessRef waitTillDone() 00200 { 00201 m_doneSig.waitForSignal(); 00202 00203 NonRecursiveMutexLock lock(m_resultMutex); 00204 return m_result; 00205 } 00206 00207 protected: 00208 ProcessRef m_result; 00209 NonRecursiveMutex m_resultMutex; 00210 00211 const char * m_execPath; 00212 const char * const * m_argv; 00213 const char * const * m_envp; 00214 Exec::PreExec& m_preExec; 00215 }; 00216 00217 00218 //*************************************************************************** 00219 class WaitpidWorkItem : public WorkItem 00220 { 00221 public: 00222 WaitpidWorkItem(const ::pid_t& pid) 00223 : m_pid(pid) 00224 { 00225 } 00226 00227 virtual ~WaitpidWorkItem() 00228 { 00229 } 00230 00231 virtual void doWork() 00232 { 00233 NonRecursiveMutexLock lock(m_resultMutex); 00234 m_result = pollStatusImpl(m_pid); 00235 } 00236 00237 Process::Status waitTillDone() 00238 { 00239 m_doneSig.waitForSignal(); 00240 00241 NonRecursiveMutexLock lock(m_resultMutex); 00242 return m_result; 00243 } 00244 00245 00246 protected: 00247 Process::Status m_result; 00248 NonRecursiveMutex m_resultMutex; 00249 00250 const ::pid_t& m_pid; 00251 }; 00252 00253 typedef IntrusiveReference<SpawnWorkItem> SpawnWorkItemPtr; 00254 typedef IntrusiveReference<WaitpidWorkItem> WaitpidWorkItemPtr; 00255 00256 class WorkQueue 00257 { 00258 public: 00259 WorkQueue() {} 00260 virtual ~WorkQueue() {} 00261 00262 WorkItem* getWork() 00263 { 00264 NonRecursiveMutexLock lock(m_workMutex); 00265 00266 // Wait for some work to show up 00267 // by checking the predicate in a loop 00268 while(m_work.empty()) 00269 { 00270 m_workNotEmpty.wait(lock); 00271 } 00272 00273 WorkItem* newWork = m_work.front(); 00274 m_work.pop(); 00275 00276 return newWork; 00277 } 00278 00279 void addWork(WorkItem* newWork) 00280 { 00281 NonRecursiveMutexLock lock(m_workMutex); 00282 m_work.push(newWork); 00283 m_workNotEmpty.notifyAll(); 00284 } 00285 00286 private: 00287 std::queue<WorkItem*> m_work; 00288 Condition m_workNotEmpty; 00289 NonRecursiveMutex m_workMutex; 00290 }; 00291 00292 //*************************************************************************** 00293 // This is the worker thread that launches processes and/or calls 00294 // waitpid on them when BLOCXX_WAITPID_THREADING_PROBLEM is defined 00295 //*************************************************************************** 00296 class ProcessThread : public Thread 00297 { 00298 public: 00299 ProcessThread(); 00300 virtual ~ProcessThread(); 00301 00302 virtual Int32 run(); 00303 00304 ProcessRef spawn( 00305 char const * exec_path, 00306 char const * const argv[], 00307 char const * const envp[], 00308 Exec::PreExec & pre_exec 00309 ); 00310 00311 Process::Status waitPid(const ProcId& pid); 00312 00313 protected: 00314 WorkQueue m_workQueue; 00315 00316 NonRecursiveMutex m_idMutex; 00317 }; 00318 00319 ProcessThread::ProcessThread() 00320 { 00321 } 00322 00323 ProcessThread::~ProcessThread() 00324 { 00325 } 00326 00327 // This function will never exit until the process terminates itself. 00328 Int32 ProcessThread::run() 00329 { 00330 // Infinite loop. 00331 while(true) 00332 { 00333 WorkItem* newWork; 00334 newWork = m_workQueue.getWork(); 00335 00336 try 00337 { 00338 newWork->doWork(); 00339 } 00340 catch(Exception& e) 00341 { 00342 newWork->saveException(e.clone()); 00343 } 00344 newWork->signalDone(); 00345 } 00346 00347 // A return (never reached) to make various compilers happy. 00348 return 0; 00349 } 00350 00351 ProcessRef ProcessThread::spawn(char const * exec_path, char const * const argv[], 00352 char const * const envp[], Exec::PreExec & pre_exec) 00353 { 00354 SpawnWorkItemPtr newWork(new SpawnWorkItem(exec_path, argv, envp, pre_exec)); 00355 m_workQueue.addWork(newWork.getPtr()); 00356 00357 ProcessRef result = newWork->waitTillDone(); 00358 00359 Exception* err = newWork->getException(); 00360 if(err != 0) 00361 { 00362 err->rethrow(); 00363 } 00364 00365 return result; 00366 } 00367 00368 Process::Status ProcessThread::waitPid(const ProcId& pid) 00369 { 00370 WaitpidWorkItemPtr newWork(new WaitpidWorkItem(pid)); 00371 m_workQueue.addWork(newWork.getPtr()); 00372 00373 Process::Status result = newWork->waitTillDone(); 00374 00375 Exception* err = newWork->getException(); 00376 if(err != 0) 00377 { 00378 err->rethrow(); 00379 } 00380 00381 return result; 00382 } 00383 00384 00385 void initThread() 00386 { 00387 // create the worker thread 00388 g_processThread = new ProcessThread(); 00389 g_processThread->start(); 00390 } 00391 00392 Thread_t getWorkerThreadId() 00393 { 00394 callOnce(g_initThreadGuard, initThread); 00395 return g_processThread->getId(); 00396 } 00397 00398 } // namespace (anon) 00399 00400 00401 ProcessRef WaitpidThreadFix::spawnProcess(char const * exec_path, 00402 char const * const argv[], char const * const envp[], Exec::PreExec & pre_exec) 00403 { 00404 callOnce(g_initThreadGuard, initThread); 00405 return g_processThread->spawn(exec_path, argv, envp, pre_exec); 00406 } 00407 00408 Process::Status WaitpidThreadFix::waitPid(const ProcId& pid) 00409 { 00410 callOnce(g_initThreadGuard, initThread); 00411 return g_processThread->waitPid(pid); 00412 } 00413 00414 } //namespace BLOCXX_NAMESPACE 00415