blocxx

Condition.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002 * Copyright (C) 2005, Vintela, Inc. All rights reserved.
00003 * Copyright (C) 2006, Novell, Inc. All rights reserved.
00004 * 
00005 * Redistribution and use in source and binary forms, with or without
00006 * modification, are permitted provided that the following conditions are met:
00007 * 
00008 *     * Redistributions of source code must retain the above copyright notice,
00009 *       this list of conditions and the following disclaimer.
00010 *     * Redistributions in binary form must reproduce the above copyright
00011 *       notice, this list of conditions and the following disclaimer in the
00012 *       documentation and/or other materials provided with the distribution.
00013 *     * Neither the name of 
00014 *       Vintela, Inc., 
00015 *       nor Novell, Inc., 
00016 *       nor the names of its contributors or employees may be used to 
00017 *       endorse or promote products derived from this software without 
00018 *       specific prior written permission.
00019 * 
00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00021 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00022 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00023 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00024 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00025 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00026 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00027 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00028 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00029 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00030 * POSSIBILITY OF SUCH DAMAGE.
00031 *******************************************************************************/
00032 
00033 
00037 #include "blocxx/BLOCXX_config.h"
00038 #include "blocxx/Condition.hpp"
00039 #include "blocxx/NonRecursiveMutexLock.hpp"
00040 #include "blocxx/ExceptionIds.hpp"
00041 #include "blocxx/Timeout.hpp"
00042 #include "blocxx/TimeoutTimer.hpp"
00043 #include "blocxx/ThreadImpl.hpp"
00044 
00045 #include <cassert>
00046 #include <cerrno>
00047 #include <limits>
00048 #ifdef BLOCXX_HAVE_SYS_TIME_H
00049 #include <sys/time.h>
00050 #endif
00051 
00052 namespace BLOCXX_NAMESPACE
00053 {
00054 
00055 BLOCXX_DEFINE_EXCEPTION_WITH_ID(ConditionLock);
00056 BLOCXX_DEFINE_EXCEPTION_WITH_ID(ConditionResource);
00057 #if defined(BLOCXX_USE_PTHREAD)
00058 
00059 Condition::Condition()
00060 {
00061    int res = pthread_cond_init(&m_condition, PTHREAD_COND_ATTR_DEFAULT);
00062    if (res != 0)
00063    {
00064       BLOCXX_THROW(ConditionResourceException, "Failed initializing condition variable");
00065    }
00066 }
00068 Condition::~Condition()
00069 {
00070    int res = pthread_cond_destroy(&m_condition);
00071    assert(res == 0);
00072 }
00074 void
00075 Condition::notifyOne()
00076 {
00077    int res = pthread_cond_signal(&m_condition);
00078    assert(res == 0);
00079 }
00081 void
00082 Condition::notifyAll()
00083 {
00084    int res = pthread_cond_broadcast(&m_condition);
00085    assert(res == 0);
00086 }
00088 void
00089 Condition::doWait(NonRecursiveMutex& mutex)
00090 {
00091    ThreadImpl::testCancel();
00092    int res;
00093    NonRecursiveMutexLockState state;
00094    mutex.conditionPreWait(state);
00095    res = pthread_cond_wait(&m_condition, state.pmutex);
00096    mutex.conditionPostWait(state);
00097    assert(res == 0 || res == EINTR);
00098    if (res == EINTR)
00099    {
00100       ThreadImpl::testCancel();
00101    }
00102 }
00104 namespace
00105 {
00106    inline 
00107    bool timespec_less(struct timespec const & x, struct timespec const & y)
00108    {
00109       return x.tv_sec < y.tv_sec ||
00110          x.tv_sec == y.tv_sec && x.tv_nsec < y.tv_nsec;
00111    }
00112 
00113    int check_timedwait(
00114       int rc, pthread_cond_t * cond, pthread_mutex_t * mtx,
00115       struct timespec const * abstime
00116    )
00117    {
00118 #ifdef BLOCXX_NCR
00119       if (rc == -1 && errno == EAGAIN)
00120       {
00121          return ETIMEDOUT;
00122       }
00123 #endif
00124       if (rc != EINVAL)
00125       {
00126          return rc;
00127       }
00128       // Solaris won't let you wait more than 10 ** 8 seconds.
00129       time_t const max_future = 99999999;
00130       time_t const max_time = std::numeric_limits<time_t>::max();
00131       time_t now_sec = DateTime::getCurrent().get();
00132       struct timespec new_abstime;
00133       new_abstime.tv_sec = (
00134          now_sec <= max_time - max_future
00135          ? now_sec + max_future
00136          : max_time
00137       );
00138       new_abstime.tv_nsec = 0;
00139       bool early = timespec_less(new_abstime, *abstime);
00140       if (!early)
00141       {
00142          new_abstime = *abstime;
00143       }
00144       int newrc = pthread_cond_timedwait(cond, mtx, &new_abstime);
00145       return (newrc == ETIMEDOUT && early ? EINTR : newrc);
00146    }
00147 }
00148 
00149 bool
00150 Condition::doTimedWait(NonRecursiveMutex& mutex, const Timeout& timeout)
00151 {
00152    ThreadImpl::testCancel();
00153    int res;
00154    NonRecursiveMutexLockState state;
00155    mutex.conditionPreWait(state);
00156    bool ret = false;
00157 
00158    timespec ts;
00159    TimeoutTimer timer(timeout);
00160 
00161    res = pthread_cond_timedwait(&m_condition, state.pmutex, timer.asTimespec(ts));
00162    res = check_timedwait(res, &m_condition, state.pmutex, &ts);
00163    mutex.conditionPostWait(state);
00164    assert(res == 0 || res == ETIMEDOUT || res == EINTR);
00165    if (res == EINTR)
00166    {
00167       ThreadImpl::testCancel();
00168    }
00169    ret = res != ETIMEDOUT;
00170    return ret;
00171 }
00172 #elif defined (BLOCXX_WIN32)
00173 
00174 Condition::Condition()
00175    : m_condition(new ConditionInfo_t)
00176 {
00177    m_condition->waitersCount = 0;
00178    m_condition->wasBroadcast = false;
00179    m_condition->queue = ::CreateSemaphore(
00180       NULL,    // No security
00181       0,       // initially 0
00182       0x7fffffff, // max count
00183       NULL);      // Unnamed
00184    ::InitializeCriticalSection(&m_condition->waitersCountLock);
00185    m_condition->waitersDone = ::CreateEvent(
00186       NULL,    // No security
00187       false,      // auto-reset
00188       false,      // non-signaled initially
00189       NULL);      // Unnamed
00190 }
00192 Condition::~Condition()
00193 {
00194    ::CloseHandle(m_condition->queue);
00195    ::DeleteCriticalSection(&m_condition->waitersCountLock);
00196    ::CloseHandle(m_condition->waitersDone);
00197    delete m_condition;
00198 }
00200 void
00201 Condition::notifyOne()
00202 {
00203    ::EnterCriticalSection(&m_condition->waitersCountLock);
00204    bool haveWaiters = m_condition->waitersCount > 0;
00205    ::LeaveCriticalSection(&m_condition->waitersCountLock);
00206 
00207    // If no threads waiting, then this is a no-op
00208    if (haveWaiters)
00209    {
00210       ::ReleaseSemaphore(m_condition->queue, 1, 0);
00211    }
00212 }
00214 void
00215 Condition::notifyAll()
00216 {
00217    ::EnterCriticalSection(&m_condition->waitersCountLock);
00218    bool haveWaiters = false;
00219    if (m_condition->waitersCount > 0)
00220    {
00221       // It's gonna be a broadcast, even if there's only one waiting thread.
00222       haveWaiters = m_condition->wasBroadcast = true;
00223    }
00224 
00225    if (haveWaiters)
00226    {
00227       // Wake up all the waiting threads atomically
00228       ::ReleaseSemaphore(m_condition->queue, m_condition->waitersCount, 0);
00229       ::LeaveCriticalSection(&m_condition->waitersCountLock);
00230 
00231       // Wait for all the threads to acquire the counting semaphore
00232       ::WaitForSingleObject(m_condition->waitersDone, INFINITE);
00233       m_condition->wasBroadcast = false;
00234    }
00235    else
00236    {
00237       ::LeaveCriticalSection(&m_condition->waitersCountLock);
00238    }
00239 }
00241 void
00242 Condition::doWait(NonRecursiveMutex& mutex)
00243 {
00244   doTimedWait(mutex, Timeout::infinite);
00245 }
00247 bool
00248 //Condition::doTimedWait(NonRecursiveMutex& mutex, UInt32 sTimeout, UInt32 usTimeout)
00249 Condition::doTimedWait(NonRecursiveMutex& mutex, const Timeout& timeout)
00250 {
00251    ThreadImpl::testCancel();
00252    bool cc = true;
00253    NonRecursiveMutexLockState state;
00254    mutex.conditionPreWait(state);
00255 
00256    ::EnterCriticalSection(&m_condition->waitersCountLock);
00257    m_condition->waitersCount++;
00258    ::LeaveCriticalSection(&m_condition->waitersCountLock);
00259 
00260    TimeoutTimer timer(timeout);
00261    // Atomically release the mutex and wait on the
00262    // queue until signal/broadcast.
00263    if (::SignalObjectAndWait(mutex.m_mutex, m_condition->queue, timer.asDWORDMs(),
00264       false) == WAIT_TIMEOUT)
00265    {
00266       cc = false;
00267    }
00268 
00269    ::EnterCriticalSection(&m_condition->waitersCountLock);
00270    m_condition->waitersCount--;
00271 
00272    // Check to see if we're the last waiter after the broadcast
00273    bool isLastWaiter = (m_condition->wasBroadcast && m_condition->waitersCount == 0
00274       && cc == true);
00275 
00276    ::LeaveCriticalSection(&m_condition->waitersCountLock);
00277 
00278    // If this is the last thread waiting for this broadcast, then let all the
00279    // other threads proceed.
00280    if (isLastWaiter)
00281    {
00282       // Atomically signal the waitersDone event and wait to acquire
00283       // the external mutex. Enusres fairness
00284       ::SignalObjectAndWait(m_condition->waitersDone, mutex.m_mutex,
00285          INFINITE, false);
00286    }
00287    else
00288    {
00289       // Re-gain ownership of the external mutex
00290       ::WaitForSingleObject(mutex.m_mutex, INFINITE);
00291    }
00292    mutex.conditionPostWait(state);
00293    return cc;
00294 }
00295 #else
00296 #error "port me!"
00297 #endif
00298 
00299 void
00300 Condition::wait(NonRecursiveMutexLock& lock)
00301 {
00302    if (!lock.isLocked())
00303    {
00304       BLOCXX_THROW(ConditionLockException, "Lock must be locked");
00305    }
00306    doWait(*(lock.m_mutex));
00307 }
00309 bool
00310 Condition::timedWait(NonRecursiveMutexLock& lock, UInt32 sTimeout, UInt32 usTimeout)
00311 {
00312    return timedWait(lock, Timeout::relative(sTimeout + static_cast<float>(usTimeout) / 1000000.0));
00313 }
00314 
00316 bool
00317 Condition::timedWait(NonRecursiveMutexLock& lock, const Timeout& timeout)
00318 {
00319    if (!lock.isLocked())
00320    {
00321       BLOCXX_THROW(ConditionLockException, "Lock must be locked");
00322    }
00323    return doTimedWait(*(lock.m_mutex), timeout);
00324 }
00325 
00326 } // end namespace BLOCXX_NAMESPACE
00327