blocxx
|
00001 /******************************************************************************* 00002 * Copyright (C) 2005, Vintela, Inc. All rights reserved. 00003 * Copyright (C) 2006, Novell, Inc. All rights reserved. 00004 * 00005 * Redistribution and use in source and binary forms, with or without 00006 * modification, are permitted provided that the following conditions are met: 00007 * 00008 * * Redistributions of source code must retain the above copyright notice, 00009 * this list of conditions and the following disclaimer. 00010 * * Redistributions in binary form must reproduce the above copyright 00011 * notice, this list of conditions and the following disclaimer in the 00012 * documentation and/or other materials provided with the distribution. 00013 * * Neither the name of 00014 * Vintela, Inc., 00015 * nor Novell, Inc., 00016 * nor the names of its contributors or employees may be used to 00017 * endorse or promote products derived from this software without 00018 * specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00021 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00022 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00023 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00024 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00025 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00026 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00027 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00028 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00029 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00030 * POSSIBILITY OF SUCH DAMAGE. 00031 *******************************************************************************/ 00032 00033 00039 #include "blocxx/BLOCXX_config.h" 00040 #include "blocxx/Select.hpp" 00041 #include "blocxx/AutoPtr.hpp" 00042 #include "blocxx/Assertion.hpp" 00043 #include "blocxx/Thread.hpp" // for testCancel() 00044 #include "blocxx/TimeoutTimer.hpp" 00045 #include "blocxx/AutoDescriptor.hpp" 00046 00047 #if defined(BLOCXX_WIN32) 00048 #include <cassert> 00049 #endif 00050 00051 extern "C" 00052 { 00053 00054 #ifndef BLOCXX_WIN32 00055 #ifdef BLOCXX_HAVE_SYS_EPOLL_H 00056 #include <sys/epoll.h> 00057 #endif 00058 #if defined (BLOCXX_HAVE_SYS_POLL_H) 00059 #include <sys/poll.h> 00060 #endif 00061 #if defined (BLOCXX_HAVE_SYS_SELECT_H) 00062 #include <sys/select.h> 00063 #endif 00064 #endif 00065 00066 #ifdef BLOCXX_HAVE_SYS_TIME_H 00067 #include <sys/time.h> 00068 #endif 00069 00070 #include <sys/types.h> 00071 00072 #ifdef BLOCXX_HAVE_UNISTD_H 00073 #include <unistd.h> 00074 #endif 00075 00076 #include <errno.h> 00077 } 00078 00079 namespace BLOCXX_NAMESPACE 00080 { 00081 00082 namespace Select 00083 { 00084 00085 namespace 00086 { 00087 const float LOOP_TIMEOUT = 10.0; 00088 } 00090 // deprecated in 4.0.0 00091 int 00092 selectRW(SelectObjectArray& selarray, UInt32 ms) 00093 { 00094 return selectRW(selarray, Timeout::relative(static_cast<float>(ms) * 1000)); 00095 } 00096 00097 #if defined(BLOCXX_WIN32) 00098 00099 int 00100 selectRW(SelectObjectArray& selarray, const Timeout& timeout) 00101 { 00102 int rc; 00103 size_t hcount = static_cast<DWORD>(selarray.size()); 00104 AutoPtrVec<HANDLE> hdls(new HANDLE[hcount]); 00105 00106 size_t handleidx = 0; 00107 for (size_t i = 0; i < selarray.size(); i++, handleidx++) 00108 { 00109 if(selarray[i].s.isSocket && selarray[i].s.networkevents) 00110 { 00111 ::WSAEventSelect(selarray[i].s.sockfd, 00112 selarray[i].s.event, selarray[i].s.networkevents); 00113 } 00114 00115 hdls[handleidx] = selarray[i].s.event; 00116 } 00117 00118 TimeoutTimer timer(timeout); 00119 timer.start(); 00120 DWORD cc = ::WaitForMultipleObjects(hcount, hdls.get(), FALSE, timer.asDWORDMs()); 00121 00122 assert(cc != WAIT_ABANDONED); 00123 00124 switch (cc) 00125 { 00126 case WAIT_FAILED: 00127 rc = Select::SELECT_ERROR; 00128 break; 00129 case WAIT_TIMEOUT: 00130 rc = Select::SELECT_TIMEOUT; 00131 break; 00132 default: 00133 rc = cc - WAIT_OBJECT_0; 00134 00135 // If this is a socket, set it back to 00136 // blocking mode 00137 if(selarray[rc].s.isSocket) 00138 { 00139 if(selarray[rc].s.networkevents 00140 && selarray[rc].s.doreset == false) 00141 { 00142 ::WSAEventSelect(selarray[rc].s.sockfd, 00143 selarray[rc].s.event, selarray[rc].s.networkevents); 00144 } 00145 else 00146 { 00147 // Set socket back to blocking 00148 ::WSAEventSelect(selarray[rc].s.sockfd, 00149 selarray[rc].s.event, 0); 00150 u_long ioctlarg = 0; 00151 ::ioctlsocket(selarray[rc].s.sockfd, FIONBIO, &ioctlarg); 00152 } 00153 } 00154 break; 00155 } 00156 00157 if( rc < 0 ) 00158 return rc; 00159 00160 int availableCount = 0; 00161 for (size_t i = 0; i < selarray.size(); i++) 00162 { 00163 if( WaitForSingleObject(selarray[i].s.event, 0) == WAIT_OBJECT_0 ) 00164 { 00165 if( selarray[i].waitForRead ) 00166 selarray[i].readAvailable = true; 00167 if( selarray[i].waitForWrite ) 00168 selarray[i].writeAvailable = true; 00169 ++availableCount; 00170 } 00171 else 00172 { 00173 selarray[i].readAvailable = false; 00174 selarray[i].writeAvailable = false; 00175 } 00176 } 00177 return availableCount; 00178 } 00179 00180 00181 #else 00182 00184 // epoll version 00185 int 00186 selectRWEpoll(SelectObjectArray& selarray, const Timeout& timeout) 00187 { 00188 #ifdef BLOCXX_HAVE_SYS_EPOLL_H 00189 int ecc = 0; 00190 AutoPtrVec<epoll_event> events(new epoll_event[selarray.size()]); 00191 AutoDescriptor epfd(epoll_create(selarray.size())); 00192 if(epfd.get() == -1) 00193 { 00194 if (errno == ENOSYS) // kernel doesn't support it 00195 { 00196 return SELECT_NOT_IMPLEMENTED; 00197 } 00198 // Need to return something else? 00199 return Select::SELECT_ERROR; 00200 } 00201 00202 UInt32 const read_events = EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP; 00203 UInt32 const write_events = EPOLLOUT | EPOLLERR | EPOLLHUP; 00204 for (size_t i = 0; i < selarray.size(); i++) 00205 { 00206 BLOCXX_ASSERT(selarray[i].s >= 0); 00207 selarray[i].readAvailable = false; 00208 selarray[i].writeAvailable = false; 00209 selarray[i].wasError = false; 00210 events[i].data = epoll_data_t(); // zero-init to make valgrind happy 00211 events[i].data.u32 = i; 00212 events[i].events = 0; 00213 if(selarray[i].waitForRead) 00214 { 00215 events[i].events |= read_events; 00216 } 00217 if(selarray[i].waitForWrite) 00218 { 00219 events[i].events |= write_events; 00220 } 00221 00222 if(epoll_ctl(epfd.get(), EPOLL_CTL_ADD, selarray[i].s, &events[i]) != 0) 00223 { 00224 return errno == EPERM ? SELECT_NOT_IMPLEMENTED : SELECT_ERROR; 00225 } 00226 } 00227 00228 // here we spin checking for thread cancellation every so often. 00229 00230 TimeoutTimer timer(timeout); 00231 timer.start(); 00232 int savedErrno; 00233 do 00234 { 00235 Thread::testCancel(); 00236 const float maxWaitSec = LOOP_TIMEOUT; 00237 ecc = epoll_wait(epfd.get(), events.get(), selarray.size(), timer.asIntMs(maxWaitSec)); 00238 savedErrno = errno; 00239 if (ecc < 0 && errno == EINTR) 00240 { 00241 ecc = 0; 00242 errno = 0; 00243 Thread::testCancel(); 00244 } 00245 timer.loop(); 00246 } while ((ecc == 0) && !timer.expired()); 00247 00248 if (ecc < 0) 00249 { 00250 errno = savedErrno; 00251 return Select::SELECT_ERROR; 00252 } 00253 if (ecc == 0) 00254 { 00255 return Select::SELECT_TIMEOUT; 00256 } 00257 00258 for(int i = 0; i < ecc; i++) 00259 { 00260 SelectObject & so = selarray[events[i].data.u32]; 00261 so.readAvailable = so.waitForRead && (events[i].events & read_events); 00262 so.writeAvailable = so.waitForWrite && (events[i].events & write_events); 00263 } 00264 00265 return ecc; 00266 #else 00267 return SELECT_NOT_IMPLEMENTED; 00268 #endif 00269 } 00270 00272 // poll() version 00273 int 00274 selectRWPoll(SelectObjectArray& selarray, const Timeout& timeout) 00275 { 00276 #if defined (BLOCXX_HAVE_SYS_POLL_H) 00277 int rc = 0; 00278 00279 AutoPtrVec<pollfd> pfds(new pollfd[selarray.size()]); 00280 00281 // here we spin checking for thread cancellation every so often. 00282 TimeoutTimer timer(timeout); 00283 timer.start(); 00284 00285 int savedErrno; 00286 do 00287 { 00288 for (size_t i = 0; i < selarray.size(); i++) 00289 { 00290 BLOCXX_ASSERT(selarray[i].s >= 0); 00291 selarray[i].readAvailable = false; 00292 selarray[i].writeAvailable = false; 00293 selarray[i].wasError = false; 00294 pfds[i].revents = 0; 00295 pfds[i].fd = selarray[i].s; 00296 pfds[i].events = selarray[i].waitForRead ? (POLLIN | POLLPRI) : 0; 00297 if(selarray[i].waitForWrite) 00298 pfds[i].events |= POLLOUT; 00299 } 00300 00301 Thread::testCancel(); 00302 const float maxWaitSec = LOOP_TIMEOUT; 00303 rc = ::poll(pfds.get(), selarray.size(), timer.asIntMs(maxWaitSec)); 00304 savedErrno = errno; 00305 if (rc < 0 && errno == EINTR) 00306 { 00307 rc = 0; 00308 errno = 0; 00309 Thread::testCancel(); 00310 #ifdef BLOCXX_NETWARE 00311 // When the NetWare server is shutting down, select will 00312 // set errno to EINTR on return. If this thread does not 00313 // yield control (cooperative multitasking) then we end 00314 // up in a very tight loop and get a CPUHog server abbend. 00315 pthread_yield(); 00316 #endif 00317 } 00318 00319 timer.loop(); 00320 } while ((rc == 0) && !timer.expired()); 00321 00322 if (rc < 0) 00323 { 00324 errno = savedErrno; 00325 return Select::SELECT_ERROR; 00326 } 00327 if (rc == 0) 00328 { 00329 return Select::SELECT_TIMEOUT; 00330 } 00331 for (size_t i = 0; i < selarray.size(); i++) 00332 { 00333 if (pfds[i].revents & (POLLERR | POLLNVAL)) 00334 { 00335 selarray[i].wasError = true; 00336 } 00337 00338 if(selarray[i].waitForRead) 00339 { 00340 selarray[i].readAvailable = (pfds[i].revents & 00341 (POLLIN | POLLPRI | POLLHUP)); 00342 } 00343 00344 if(selarray[i].waitForWrite) 00345 { 00346 selarray[i].writeAvailable = (pfds[i].revents & 00347 (POLLOUT | POLLHUP)); 00348 } 00349 } 00350 00351 return rc; 00352 #else 00353 return SELECT_NOT_IMPLEMENTED; 00354 #endif 00355 } 00357 // ::select() version 00358 int 00359 selectRWSelect(SelectObjectArray& selarray, const Timeout& timeout) 00360 { 00361 #if defined (BLOCXX_HAVE_SYS_SELECT_H) 00362 int rc = 0; 00363 fd_set ifds; 00364 fd_set ofds; 00365 00366 // here we spin checking for thread cancellation every so often. 00367 TimeoutTimer timer(timeout); 00368 timer.start(); 00369 00370 int savedErrno; 00371 do 00372 { 00373 int maxfd = 0; 00374 FD_ZERO(&ifds); 00375 FD_ZERO(&ofds); 00376 for (size_t i = 0; i < selarray.size(); ++i) 00377 { 00378 int fd = selarray[i].s; 00379 BLOCXX_ASSERT(fd >= 0); 00380 if (maxfd < fd) 00381 { 00382 maxfd = fd; 00383 } 00384 if (fd < 0 || fd >= FD_SETSIZE) 00385 { 00386 errno = EINVAL; 00387 return Select::SELECT_ERROR; 00388 } 00389 if (selarray[i].waitForRead) 00390 { 00391 FD_SET(fd, &ifds); 00392 } 00393 if (selarray[i].waitForWrite) 00394 { 00395 FD_SET(fd, &ofds); 00396 } 00397 } 00398 00399 Thread::testCancel(); 00400 struct timeval tv; 00401 const float maxWaitSec = LOOP_TIMEOUT; 00402 rc = ::select(maxfd+1, &ifds, &ofds, NULL, timer.asTimeval(tv, maxWaitSec)); 00403 savedErrno = errno; 00404 if (rc < 0 && errno == EINTR) 00405 { 00406 rc = 0; 00407 errno = 0; 00408 Thread::testCancel(); 00409 #ifdef BLOCXX_NETWARE 00410 // When the NetWare server is shutting down, select will 00411 // set errno to EINTR on return. If this thread does not 00412 // yield control (cooperative multitasking) then we end 00413 // up in a very tight loop and get a CPUHog server abbend. 00414 pthread_yield(); 00415 #endif 00416 } 00417 00418 timer.loop(); 00419 } while ((rc == 0) && !timer.expired()); 00420 00421 if (rc < 0) 00422 { 00423 errno = savedErrno; 00424 return Select::SELECT_ERROR; 00425 } 00426 if (rc == 0) 00427 { 00428 return Select::SELECT_TIMEOUT; 00429 } 00430 int availableCount = 0; 00431 int cval; 00432 for (size_t i = 0; i < selarray.size(); i++) 00433 { 00434 selarray[i].wasError = false; 00435 cval = 0; 00436 if (FD_ISSET(selarray[i].s, &ifds)) 00437 { 00438 selarray[i].readAvailable = true; 00439 cval = 1; 00440 } 00441 else 00442 { 00443 selarray[i].readAvailable = false; 00444 } 00445 00446 if (FD_ISSET(selarray[i].s, &ofds)) 00447 { 00448 selarray[i].writeAvailable = true; 00449 cval = 1; 00450 } 00451 else 00452 { 00453 selarray[i].writeAvailable = false; 00454 } 00455 00456 availableCount += cval; 00457 00458 } 00459 00460 return availableCount; 00461 #else 00462 return SELECT_NOT_IMPLEMENTED; 00463 #endif 00464 } 00465 00466 int 00467 selectRW(SelectObjectArray& selarray, const Timeout& timeout) 00468 { 00469 int rv = selectRWEpoll(selarray, timeout); 00470 if (rv != SELECT_NOT_IMPLEMENTED) 00471 { 00472 return rv; 00473 } 00474 00475 rv = selectRWPoll(selarray, timeout); 00476 if (rv != SELECT_NOT_IMPLEMENTED) 00477 { 00478 return rv; 00479 } 00480 00481 rv = selectRWSelect(selarray, timeout); 00482 BLOCXX_ASSERT(rv != SELECT_NOT_IMPLEMENTED); 00483 return rv; 00484 } 00485 00487 #endif // #else BLOCXX_WIN32 00488 00489 int 00490 select(const SelectTypeArray& selarray, UInt32 ms) 00491 { 00492 return select(selarray, Timeout::relative(static_cast<float>(ms) * 1000.0)); 00493 } 00494 00496 int 00497 select(const SelectTypeArray& selarray, const Timeout& timeout) 00498 { 00499 SelectObjectArray soa; 00500 soa.reserve(selarray.size()); 00501 for (size_t i = 0; i < selarray.size(); ++i) 00502 { 00503 SelectObject curObj(selarray[i]); 00504 curObj.waitForRead = true; 00505 soa.push_back(curObj); 00506 } 00507 int rv = selectRW(soa, timeout); 00508 if (rv < 0) 00509 { 00510 return rv; 00511 } 00512 00513 // find the first selected object 00514 for (size_t i = 0; i < soa.size(); ++i) 00515 { 00516 if (soa[i].readAvailable) 00517 { 00518 return i; 00519 } 00520 } 00521 errno = 0; 00522 return SELECT_ERROR; 00523 } 00524 00525 } // end namespace Select 00526 00527 } // end namespace BLOCXX_NAMESPACE 00528