blocxx
|
00001 /******************************************************************************* 00002 * Copyright (C) 2005, Vintela, Inc. All rights reserved. 00003 * Copyright (C) 2006, Novell, Inc. All rights reserved. 00004 * 00005 * Redistribution and use in source and binary forms, with or without 00006 * modification, are permitted provided that the following conditions are met: 00007 * 00008 * * Redistributions of source code must retain the above copyright notice, 00009 * this list of conditions and the following disclaimer. 00010 * * Redistributions in binary form must reproduce the above copyright 00011 * notice, this list of conditions and the following disclaimer in the 00012 * documentation and/or other materials provided with the distribution. 00013 * * Neither the name of 00014 * Vintela, Inc., 00015 * nor Novell, Inc., 00016 * nor the names of its contributors or employees may be used to 00017 * endorse or promote products derived from this software without 00018 * specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00021 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00022 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00023 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00024 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00025 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00026 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00027 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00028 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00029 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00030 * POSSIBILITY OF SUCH DAMAGE. 00031 *******************************************************************************/ 00032 00033 00037 #include "blocxx/BLOCXX_config.h" 00038 #include "blocxx/Condition.hpp" 00039 #include "blocxx/NonRecursiveMutexLock.hpp" 00040 #include "blocxx/ExceptionIds.hpp" 00041 #include "blocxx/Timeout.hpp" 00042 #include "blocxx/TimeoutTimer.hpp" 00043 #include "blocxx/ThreadImpl.hpp" 00044 00045 #include <cassert> 00046 #include <cerrno> 00047 #include <limits> 00048 #ifdef BLOCXX_HAVE_SYS_TIME_H 00049 #include <sys/time.h> 00050 #endif 00051 00052 namespace BLOCXX_NAMESPACE 00053 { 00054 00055 BLOCXX_DEFINE_EXCEPTION_WITH_ID(ConditionLock); 00056 BLOCXX_DEFINE_EXCEPTION_WITH_ID(ConditionResource); 00057 #if defined(BLOCXX_USE_PTHREAD) 00058 00059 Condition::Condition() 00060 { 00061 int res = pthread_cond_init(&m_condition, PTHREAD_COND_ATTR_DEFAULT); 00062 if (res != 0) 00063 { 00064 BLOCXX_THROW(ConditionResourceException, "Failed initializing condition variable"); 00065 } 00066 } 00068 Condition::~Condition() 00069 { 00070 int res = pthread_cond_destroy(&m_condition); 00071 assert(res == 0); 00072 } 00074 void 00075 Condition::notifyOne() 00076 { 00077 int res = pthread_cond_signal(&m_condition); 00078 assert(res == 0); 00079 } 00081 void 00082 Condition::notifyAll() 00083 { 00084 int res = pthread_cond_broadcast(&m_condition); 00085 assert(res == 0); 00086 } 00088 void 00089 Condition::doWait(NonRecursiveMutex& mutex) 00090 { 00091 ThreadImpl::testCancel(); 00092 int res; 00093 NonRecursiveMutexLockState state; 00094 mutex.conditionPreWait(state); 00095 res = pthread_cond_wait(&m_condition, state.pmutex); 00096 mutex.conditionPostWait(state); 00097 assert(res == 0 || res == EINTR); 00098 if (res == EINTR) 00099 { 00100 ThreadImpl::testCancel(); 00101 } 00102 } 00104 namespace 00105 { 00106 inline 00107 bool timespec_less(struct timespec const & x, struct timespec const & y) 00108 { 00109 return x.tv_sec < y.tv_sec || 00110 x.tv_sec == y.tv_sec && x.tv_nsec < y.tv_nsec; 00111 } 00112 00113 int check_timedwait( 00114 int rc, pthread_cond_t * cond, pthread_mutex_t * mtx, 00115 struct timespec const * abstime 00116 ) 00117 { 00118 #ifdef BLOCXX_NCR 00119 if (rc == -1 && errno == EAGAIN) 00120 { 00121 return ETIMEDOUT; 00122 } 00123 #endif 00124 if (rc != EINVAL) 00125 { 00126 return rc; 00127 } 00128 // Solaris won't let you wait more than 10 ** 8 seconds. 00129 time_t const max_future = 99999999; 00130 time_t const max_time = std::numeric_limits<time_t>::max(); 00131 time_t now_sec = DateTime::getCurrent().get(); 00132 struct timespec new_abstime; 00133 new_abstime.tv_sec = ( 00134 now_sec <= max_time - max_future 00135 ? now_sec + max_future 00136 : max_time 00137 ); 00138 new_abstime.tv_nsec = 0; 00139 bool early = timespec_less(new_abstime, *abstime); 00140 if (!early) 00141 { 00142 new_abstime = *abstime; 00143 } 00144 int newrc = pthread_cond_timedwait(cond, mtx, &new_abstime); 00145 return (newrc == ETIMEDOUT && early ? EINTR : newrc); 00146 } 00147 } 00148 00149 bool 00150 Condition::doTimedWait(NonRecursiveMutex& mutex, const Timeout& timeout) 00151 { 00152 ThreadImpl::testCancel(); 00153 int res; 00154 NonRecursiveMutexLockState state; 00155 mutex.conditionPreWait(state); 00156 bool ret = false; 00157 00158 timespec ts; 00159 TimeoutTimer timer(timeout); 00160 00161 res = pthread_cond_timedwait(&m_condition, state.pmutex, timer.asTimespec(ts)); 00162 res = check_timedwait(res, &m_condition, state.pmutex, &ts); 00163 mutex.conditionPostWait(state); 00164 assert(res == 0 || res == ETIMEDOUT || res == EINTR); 00165 if (res == EINTR) 00166 { 00167 ThreadImpl::testCancel(); 00168 } 00169 ret = res != ETIMEDOUT; 00170 return ret; 00171 } 00172 #elif defined (BLOCXX_WIN32) 00173 00174 Condition::Condition() 00175 : m_condition(new ConditionInfo_t) 00176 { 00177 m_condition->waitersCount = 0; 00178 m_condition->wasBroadcast = false; 00179 m_condition->queue = ::CreateSemaphore( 00180 NULL, // No security 00181 0, // initially 0 00182 0x7fffffff, // max count 00183 NULL); // Unnamed 00184 ::InitializeCriticalSection(&m_condition->waitersCountLock); 00185 m_condition->waitersDone = ::CreateEvent( 00186 NULL, // No security 00187 false, // auto-reset 00188 false, // non-signaled initially 00189 NULL); // Unnamed 00190 } 00192 Condition::~Condition() 00193 { 00194 ::CloseHandle(m_condition->queue); 00195 ::DeleteCriticalSection(&m_condition->waitersCountLock); 00196 ::CloseHandle(m_condition->waitersDone); 00197 delete m_condition; 00198 } 00200 void 00201 Condition::notifyOne() 00202 { 00203 ::EnterCriticalSection(&m_condition->waitersCountLock); 00204 bool haveWaiters = m_condition->waitersCount > 0; 00205 ::LeaveCriticalSection(&m_condition->waitersCountLock); 00206 00207 // If no threads waiting, then this is a no-op 00208 if (haveWaiters) 00209 { 00210 ::ReleaseSemaphore(m_condition->queue, 1, 0); 00211 } 00212 } 00214 void 00215 Condition::notifyAll() 00216 { 00217 ::EnterCriticalSection(&m_condition->waitersCountLock); 00218 bool haveWaiters = false; 00219 if (m_condition->waitersCount > 0) 00220 { 00221 // It's gonna be a broadcast, even if there's only one waiting thread. 00222 haveWaiters = m_condition->wasBroadcast = true; 00223 } 00224 00225 if (haveWaiters) 00226 { 00227 // Wake up all the waiting threads atomically 00228 ::ReleaseSemaphore(m_condition->queue, m_condition->waitersCount, 0); 00229 ::LeaveCriticalSection(&m_condition->waitersCountLock); 00230 00231 // Wait for all the threads to acquire the counting semaphore 00232 ::WaitForSingleObject(m_condition->waitersDone, INFINITE); 00233 m_condition->wasBroadcast = false; 00234 } 00235 else 00236 { 00237 ::LeaveCriticalSection(&m_condition->waitersCountLock); 00238 } 00239 } 00241 void 00242 Condition::doWait(NonRecursiveMutex& mutex) 00243 { 00244 doTimedWait(mutex, Timeout::infinite); 00245 } 00247 bool 00248 //Condition::doTimedWait(NonRecursiveMutex& mutex, UInt32 sTimeout, UInt32 usTimeout) 00249 Condition::doTimedWait(NonRecursiveMutex& mutex, const Timeout& timeout) 00250 { 00251 ThreadImpl::testCancel(); 00252 bool cc = true; 00253 NonRecursiveMutexLockState state; 00254 mutex.conditionPreWait(state); 00255 00256 ::EnterCriticalSection(&m_condition->waitersCountLock); 00257 m_condition->waitersCount++; 00258 ::LeaveCriticalSection(&m_condition->waitersCountLock); 00259 00260 TimeoutTimer timer(timeout); 00261 // Atomically release the mutex and wait on the 00262 // queue until signal/broadcast. 00263 if (::SignalObjectAndWait(mutex.m_mutex, m_condition->queue, timer.asDWORDMs(), 00264 false) == WAIT_TIMEOUT) 00265 { 00266 cc = false; 00267 } 00268 00269 ::EnterCriticalSection(&m_condition->waitersCountLock); 00270 m_condition->waitersCount--; 00271 00272 // Check to see if we're the last waiter after the broadcast 00273 bool isLastWaiter = (m_condition->wasBroadcast && m_condition->waitersCount == 0 00274 && cc == true); 00275 00276 ::LeaveCriticalSection(&m_condition->waitersCountLock); 00277 00278 // If this is the last thread waiting for this broadcast, then let all the 00279 // other threads proceed. 00280 if (isLastWaiter) 00281 { 00282 // Atomically signal the waitersDone event and wait to acquire 00283 // the external mutex. Enusres fairness 00284 ::SignalObjectAndWait(m_condition->waitersDone, mutex.m_mutex, 00285 INFINITE, false); 00286 } 00287 else 00288 { 00289 // Re-gain ownership of the external mutex 00290 ::WaitForSingleObject(mutex.m_mutex, INFINITE); 00291 } 00292 mutex.conditionPostWait(state); 00293 return cc; 00294 } 00295 #else 00296 #error "port me!" 00297 #endif 00298 00299 void 00300 Condition::wait(NonRecursiveMutexLock& lock) 00301 { 00302 if (!lock.isLocked()) 00303 { 00304 BLOCXX_THROW(ConditionLockException, "Lock must be locked"); 00305 } 00306 doWait(*(lock.m_mutex)); 00307 } 00309 bool 00310 Condition::timedWait(NonRecursiveMutexLock& lock, UInt32 sTimeout, UInt32 usTimeout) 00311 { 00312 return timedWait(lock, Timeout::relative(sTimeout + static_cast<float>(usTimeout) / 1000000.0)); 00313 } 00314 00316 bool 00317 Condition::timedWait(NonRecursiveMutexLock& lock, const Timeout& timeout) 00318 { 00319 if (!lock.isLocked()) 00320 { 00321 BLOCXX_THROW(ConditionLockException, "Lock must be locked"); 00322 } 00323 return doTimedWait(*(lock.m_mutex), timeout); 00324 } 00325 00326 } // end namespace BLOCXX_NAMESPACE 00327