blocxx

Select.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002 * Copyright (C) 2005, Vintela, Inc. All rights reserved.
00003 * Copyright (C) 2006, Novell, Inc. All rights reserved.
00004 * 
00005 * Redistribution and use in source and binary forms, with or without
00006 * modification, are permitted provided that the following conditions are met:
00007 * 
00008 *     * Redistributions of source code must retain the above copyright notice,
00009 *       this list of conditions and the following disclaimer.
00010 *     * Redistributions in binary form must reproduce the above copyright
00011 *       notice, this list of conditions and the following disclaimer in the
00012 *       documentation and/or other materials provided with the distribution.
00013 *     * Neither the name of 
00014 *       Vintela, Inc., 
00015 *       nor Novell, Inc., 
00016 *       nor the names of its contributors or employees may be used to 
00017 *       endorse or promote products derived from this software without 
00018 *       specific prior written permission.
00019 * 
00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00021 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00022 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00023 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00024 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00025 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00026 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00027 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00028 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00029 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00030 * POSSIBILITY OF SUCH DAMAGE.
00031 *******************************************************************************/
00032 
00033 
00039 #include "blocxx/BLOCXX_config.h"
00040 #include "blocxx/Select.hpp"
00041 #include "blocxx/AutoPtr.hpp"
00042 #include "blocxx/Assertion.hpp"
00043 #include "blocxx/Thread.hpp" // for testCancel()
00044 #include "blocxx/TimeoutTimer.hpp"
00045 #include "blocxx/AutoDescriptor.hpp"
00046 
00047 #if defined(BLOCXX_WIN32)
00048 #include <cassert>
00049 #endif
00050 
00051 extern "C"
00052 {
00053 
00054 #ifndef BLOCXX_WIN32
00055  #ifdef BLOCXX_HAVE_SYS_EPOLL_H
00056   #include <sys/epoll.h>
00057  #endif
00058  #if defined (BLOCXX_HAVE_SYS_POLL_H)
00059   #include <sys/poll.h>
00060  #endif
00061  #if defined (BLOCXX_HAVE_SYS_SELECT_H)
00062   #include <sys/select.h>
00063  #endif
00064 #endif
00065 
00066 #ifdef BLOCXX_HAVE_SYS_TIME_H
00067  #include <sys/time.h>
00068 #endif
00069 
00070 #include <sys/types.h>
00071 
00072 #ifdef BLOCXX_HAVE_UNISTD_H
00073  #include <unistd.h>
00074 #endif
00075 
00076 #include <errno.h>
00077 }
00078 
00079 namespace BLOCXX_NAMESPACE
00080 {
00081 
00082 namespace Select
00083 {
00084 
00085 namespace
00086 {
00087    const float LOOP_TIMEOUT = 10.0;
00088 }
00090 // deprecated in 4.0.0
00091 int
00092 selectRW(SelectObjectArray& selarray, UInt32 ms)
00093 {
00094    return selectRW(selarray, Timeout::relative(static_cast<float>(ms) * 1000));
00095 }
00096 
00097 #if defined(BLOCXX_WIN32)
00098 
00099 int
00100 selectRW(SelectObjectArray& selarray, const Timeout& timeout)
00101 {
00102    int rc;
00103    size_t hcount = static_cast<DWORD>(selarray.size());
00104    AutoPtrVec<HANDLE> hdls(new HANDLE[hcount]);
00105 
00106    size_t handleidx = 0;
00107    for (size_t i = 0; i < selarray.size(); i++, handleidx++)
00108    {
00109       if(selarray[i].s.isSocket && selarray[i].s.networkevents)
00110       {
00111          ::WSAEventSelect(selarray[i].s.sockfd, 
00112             selarray[i].s.event, selarray[i].s.networkevents);
00113       }
00114             
00115       hdls[handleidx] = selarray[i].s.event;
00116    }
00117 
00118    TimeoutTimer timer(timeout);
00119    timer.start();
00120    DWORD cc = ::WaitForMultipleObjects(hcount, hdls.get(), FALSE, timer.asDWORDMs());
00121 
00122    assert(cc != WAIT_ABANDONED);
00123 
00124    switch (cc)
00125    {
00126       case WAIT_FAILED:
00127          rc = Select::SELECT_ERROR;
00128          break;
00129       case WAIT_TIMEOUT:
00130          rc = Select::SELECT_TIMEOUT;
00131          break;
00132       default:
00133          rc = cc - WAIT_OBJECT_0;
00134          
00135          // If this is a socket, set it back to 
00136          // blocking mode
00137          if(selarray[rc].s.isSocket)
00138          {
00139             if(selarray[rc].s.networkevents
00140                && selarray[rc].s.doreset == false)
00141             {
00142                ::WSAEventSelect(selarray[rc].s.sockfd, 
00143                   selarray[rc].s.event, selarray[rc].s.networkevents);
00144             }
00145             else
00146             {
00147                // Set socket back to blocking
00148                ::WSAEventSelect(selarray[rc].s.sockfd, 
00149                   selarray[rc].s.event, 0);
00150                u_long ioctlarg = 0;
00151                ::ioctlsocket(selarray[rc].s.sockfd, FIONBIO, &ioctlarg);
00152             }
00153          }
00154          break;
00155    }
00156 
00157    if( rc < 0 )
00158       return rc;
00159 
00160    int availableCount = 0;
00161    for (size_t i = 0; i < selarray.size(); i++)
00162    {
00163       if( WaitForSingleObject(selarray[i].s.event, 0) == WAIT_OBJECT_0 )
00164       {
00165          if( selarray[i].waitForRead )
00166             selarray[i].readAvailable = true;
00167          if( selarray[i].waitForWrite )
00168             selarray[i].writeAvailable = true;
00169          ++availableCount;
00170       }
00171       else
00172       {
00173          selarray[i].readAvailable = false;
00174          selarray[i].writeAvailable = false;
00175       }
00176    }
00177    return availableCount;
00178 }
00179 
00180 
00181 #else
00182 
00184 // epoll version
00185 int
00186 selectRWEpoll(SelectObjectArray& selarray, const Timeout& timeout)
00187 {
00188 #ifdef BLOCXX_HAVE_SYS_EPOLL_H
00189    int ecc = 0;
00190    AutoPtrVec<epoll_event> events(new epoll_event[selarray.size()]);
00191    AutoDescriptor epfd(epoll_create(selarray.size()));
00192    if(epfd.get() == -1)
00193    {
00194       if (errno == ENOSYS) // kernel doesn't support it
00195       {
00196          return SELECT_NOT_IMPLEMENTED;
00197       }
00198       // Need to return something else?
00199       return Select::SELECT_ERROR;
00200    }
00201 
00202    UInt32 const read_events = EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP;
00203    UInt32 const write_events = EPOLLOUT | EPOLLERR | EPOLLHUP;
00204    for (size_t i = 0; i < selarray.size(); i++)
00205    {
00206       BLOCXX_ASSERT(selarray[i].s >= 0);
00207       selarray[i].readAvailable = false;
00208       selarray[i].writeAvailable = false;
00209       selarray[i].wasError = false;
00210       events[i].data = epoll_data_t(); // zero-init to make valgrind happy
00211       events[i].data.u32 = i;
00212       events[i].events = 0;
00213       if(selarray[i].waitForRead)
00214       {
00215          events[i].events |= read_events;
00216       }
00217       if(selarray[i].waitForWrite)
00218       {
00219          events[i].events |= write_events;
00220       }
00221 
00222       if(epoll_ctl(epfd.get(), EPOLL_CTL_ADD, selarray[i].s, &events[i]) != 0)
00223       {
00224          return errno == EPERM ? SELECT_NOT_IMPLEMENTED : SELECT_ERROR;
00225       }
00226    }
00227 
00228    // here we spin checking for thread cancellation every so often.
00229 
00230    TimeoutTimer timer(timeout);
00231    timer.start();
00232    int savedErrno;
00233    do
00234    {
00235       Thread::testCancel();
00236       const float maxWaitSec = LOOP_TIMEOUT;
00237       ecc = epoll_wait(epfd.get(), events.get(), selarray.size(), timer.asIntMs(maxWaitSec));
00238       savedErrno = errno;
00239       if (ecc < 0 && errno == EINTR)
00240       {
00241          ecc = 0;
00242          errno = 0;
00243          Thread::testCancel();
00244       }
00245       timer.loop();
00246    } while ((ecc == 0) && !timer.expired());
00247 
00248    if (ecc < 0)
00249    {
00250       errno = savedErrno;
00251       return Select::SELECT_ERROR;
00252    }
00253    if (ecc == 0)
00254    {
00255       return Select::SELECT_TIMEOUT;
00256    }
00257 
00258    for(int i = 0; i < ecc; i++)
00259    {
00260       SelectObject & so = selarray[events[i].data.u32];
00261       so.readAvailable = so.waitForRead && (events[i].events & read_events);
00262       so.writeAvailable = so.waitForWrite && (events[i].events & write_events);
00263    }
00264 
00265    return ecc;
00266 #else
00267    return SELECT_NOT_IMPLEMENTED;
00268 #endif
00269 }
00270 
00272 // poll() version
00273 int
00274 selectRWPoll(SelectObjectArray& selarray, const Timeout& timeout)
00275 {
00276 #if defined (BLOCXX_HAVE_SYS_POLL_H)
00277    int rc = 0;
00278 
00279    AutoPtrVec<pollfd> pfds(new pollfd[selarray.size()]);
00280 
00281    // here we spin checking for thread cancellation every so often.
00282    TimeoutTimer timer(timeout);
00283    timer.start();
00284 
00285    int savedErrno;
00286    do
00287    {
00288       for (size_t i = 0; i < selarray.size(); i++)
00289       {
00290          BLOCXX_ASSERT(selarray[i].s >= 0);
00291          selarray[i].readAvailable = false;
00292          selarray[i].writeAvailable = false;
00293          selarray[i].wasError = false;
00294          pfds[i].revents = 0;
00295          pfds[i].fd = selarray[i].s;
00296          pfds[i].events = selarray[i].waitForRead ? (POLLIN | POLLPRI) : 0;
00297          if(selarray[i].waitForWrite)
00298             pfds[i].events |= POLLOUT;
00299       }
00300 
00301       Thread::testCancel();
00302       const float maxWaitSec = LOOP_TIMEOUT;
00303       rc = ::poll(pfds.get(), selarray.size(), timer.asIntMs(maxWaitSec));
00304       savedErrno = errno;
00305       if (rc < 0 && errno == EINTR)
00306       {
00307          rc = 0;
00308          errno = 0;
00309          Thread::testCancel();
00310 #ifdef  BLOCXX_NETWARE
00311          //  When the NetWare server is shutting down, select will
00312          //  set errno to EINTR on return. If this thread does not
00313          //  yield control (cooperative multitasking) then we end
00314          //  up in a very tight loop and get a CPUHog server abbend.
00315          pthread_yield();
00316 #endif
00317       }
00318 
00319       timer.loop();
00320    } while ((rc == 0) && !timer.expired());
00321    
00322    if (rc < 0)
00323    {
00324       errno = savedErrno;
00325       return Select::SELECT_ERROR;
00326    }
00327    if (rc == 0)
00328    {
00329       return Select::SELECT_TIMEOUT;
00330    }
00331    for (size_t i = 0; i < selarray.size(); i++)
00332    {
00333       if (pfds[i].revents & (POLLERR | POLLNVAL))
00334       {
00335          selarray[i].wasError = true;
00336       }
00337 
00338       if(selarray[i].waitForRead)
00339       {
00340          selarray[i].readAvailable = (pfds[i].revents & 
00341             (POLLIN | POLLPRI | POLLHUP));
00342       }
00343 
00344       if(selarray[i].waitForWrite)
00345       {
00346          selarray[i].writeAvailable = (pfds[i].revents &
00347             (POLLOUT | POLLHUP));
00348       }
00349    }
00350 
00351    return rc;
00352 #else
00353    return SELECT_NOT_IMPLEMENTED;
00354 #endif
00355 }
00357 // ::select() version
00358 int
00359 selectRWSelect(SelectObjectArray& selarray, const Timeout& timeout)
00360 {
00361 #if defined (BLOCXX_HAVE_SYS_SELECT_H)
00362    int rc = 0;
00363    fd_set ifds;
00364    fd_set ofds;
00365 
00366    // here we spin checking for thread cancellation every so often.
00367    TimeoutTimer timer(timeout);
00368    timer.start();
00369 
00370    int savedErrno;
00371    do
00372    {
00373       int maxfd = 0;
00374       FD_ZERO(&ifds);
00375       FD_ZERO(&ofds);
00376       for (size_t i = 0; i < selarray.size(); ++i)
00377       {
00378          int fd = selarray[i].s;
00379          BLOCXX_ASSERT(fd >= 0);
00380          if (maxfd < fd)
00381          {
00382             maxfd = fd;
00383          }
00384          if (fd < 0 || fd >= FD_SETSIZE)
00385          {
00386             errno = EINVAL;
00387             return Select::SELECT_ERROR;
00388          }
00389          if (selarray[i].waitForRead)
00390          {
00391             FD_SET(fd, &ifds);
00392          }
00393          if (selarray[i].waitForWrite)
00394          {
00395             FD_SET(fd, &ofds);
00396          }
00397       }
00398 
00399       Thread::testCancel();
00400       struct timeval tv;
00401       const float maxWaitSec = LOOP_TIMEOUT;
00402       rc = ::select(maxfd+1, &ifds, &ofds, NULL, timer.asTimeval(tv, maxWaitSec));
00403       savedErrno = errno;
00404       if (rc < 0 && errno == EINTR)
00405       {
00406          rc = 0;
00407          errno = 0;
00408          Thread::testCancel();
00409 #ifdef  BLOCXX_NETWARE
00410          //  When the NetWare server is shutting down, select will
00411          //  set errno to EINTR on return. If this thread does not
00412          //  yield control (cooperative multitasking) then we end
00413          //  up in a very tight loop and get a CPUHog server abbend.
00414          pthread_yield();
00415 #endif
00416       }
00417 
00418       timer.loop();
00419    } while ((rc == 0) && !timer.expired());
00420    
00421    if (rc < 0)
00422    {
00423       errno = savedErrno;
00424       return Select::SELECT_ERROR;
00425    }
00426    if (rc == 0)
00427    {
00428       return Select::SELECT_TIMEOUT;
00429    }
00430    int availableCount = 0;
00431    int cval;
00432    for (size_t i = 0; i < selarray.size(); i++)
00433    {
00434       selarray[i].wasError = false;
00435       cval = 0;
00436       if (FD_ISSET(selarray[i].s, &ifds))
00437       {
00438          selarray[i].readAvailable = true;
00439          cval = 1;
00440       }
00441       else
00442       {
00443          selarray[i].readAvailable = false;
00444       }
00445 
00446       if (FD_ISSET(selarray[i].s, &ofds))
00447       {
00448          selarray[i].writeAvailable = true;
00449          cval = 1;
00450       }
00451       else
00452       {
00453          selarray[i].writeAvailable = false;
00454       }
00455 
00456       availableCount += cval;
00457 
00458    }
00459       
00460    return availableCount;
00461 #else
00462    return SELECT_NOT_IMPLEMENTED;
00463 #endif
00464 }
00465 
00466 int
00467 selectRW(SelectObjectArray& selarray, const Timeout& timeout)
00468 {
00469    int rv = selectRWEpoll(selarray, timeout);
00470    if (rv != SELECT_NOT_IMPLEMENTED)
00471    {
00472       return rv;
00473    }
00474 
00475    rv = selectRWPoll(selarray, timeout);
00476    if (rv != SELECT_NOT_IMPLEMENTED)
00477    {
00478       return rv;
00479    }
00480 
00481    rv = selectRWSelect(selarray, timeout);
00482    BLOCXX_ASSERT(rv != SELECT_NOT_IMPLEMENTED);
00483    return rv;
00484 }
00485 
00487 #endif   // #else BLOCXX_WIN32
00488 
00489 int
00490 select(const SelectTypeArray& selarray, UInt32 ms)
00491 {
00492    return select(selarray, Timeout::relative(static_cast<float>(ms) * 1000.0));
00493 }
00494 
00496 int
00497 select(const SelectTypeArray& selarray, const Timeout& timeout)
00498 {
00499    SelectObjectArray soa;
00500    soa.reserve(selarray.size());
00501    for (size_t i = 0; i < selarray.size(); ++i)
00502    {
00503       SelectObject curObj(selarray[i]);
00504       curObj.waitForRead = true;
00505       soa.push_back(curObj);
00506    }
00507    int rv = selectRW(soa, timeout);
00508    if (rv < 0)
00509    {
00510       return rv;
00511    }
00512 
00513    // find the first selected object
00514    for (size_t i = 0; i < soa.size(); ++i)
00515    {
00516       if (soa[i].readAvailable)
00517       {
00518          return i;
00519       }
00520    }
00521    errno = 0;
00522    return SELECT_ERROR;
00523 }
00524 
00525 } // end namespace Select
00526 
00527 } // end namespace BLOCXX_NAMESPACE
00528