blocxx
|
00001 /******************************************************************************* 00002 * Copyright (C) 2005, Vintela, Inc. All rights reserved. 00003 * Copyright (C) 2006, Novell, Inc. All rights reserved. 00004 * 00005 * Redistribution and use in source and binary forms, with or without 00006 * modification, are permitted provided that the following conditions are met: 00007 * 00008 * * Redistributions of source code must retain the above copyright notice, 00009 * this list of conditions and the following disclaimer. 00010 * * Redistributions in binary form must reproduce the above copyright 00011 * notice, this list of conditions and the following disclaimer in the 00012 * documentation and/or other materials provided with the distribution. 00013 * * Neither the name of 00014 * Vintela, Inc., 00015 * nor Novell, Inc., 00016 * nor the names of its contributors or employees may be used to 00017 * endorse or promote products derived from this software without 00018 * specific prior written permission. 00019 * 00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00021 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00022 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00023 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00024 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00025 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00026 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00027 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00028 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00029 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00030 * POSSIBILITY OF SUCH DAMAGE. 00031 *******************************************************************************/ 00032 00033 00039 #include "blocxx/BLOCXX_config.h" 00040 00041 #if !defined(BLOCXX_WIN32) 00042 00043 #include "blocxx/PosixUnnamedPipe.hpp" 00044 #include "blocxx/AutoPtr.hpp" 00045 #include "blocxx/IOException.hpp" 00046 #include "blocxx/Format.hpp" 00047 #include "blocxx/SocketUtils.hpp" 00048 #include "blocxx/Assertion.hpp" 00049 #include "blocxx/DescriptorUtils.hpp" 00050 #include "blocxx/SignalScope.hpp" 00051 #include "blocxx/Logger.hpp" 00052 #include "blocxx/GlobalString.hpp" 00053 00054 00055 #include "blocxx/Thread.hpp" 00056 #ifdef BLOCXX_HAVE_UNISTD_H 00057 #include <unistd.h> 00058 #endif 00059 #include <sys/socket.h> 00060 #include <sys/types.h> 00061 00062 #include <fcntl.h> 00063 #include <errno.h> 00064 #include <cstring> 00065 00066 #if defined(BLOCXX_DARWIN) 00067 // Necessary for detecting the kernel version in order to activate the descriptor passing workaround. 00068 #include "blocxx/ThreadOnce.hpp" 00069 #include "blocxx/PosixRegEx.hpp" 00070 #include <sys/utsname.h> 00071 #endif 00072 00073 00074 namespace BLOCXX_NAMESPACE 00075 { 00076 00077 namespace 00078 { 00079 int upclose(int fd) 00080 { 00081 int rc; 00082 do 00083 { 00084 rc = ::close(fd); 00085 } while (rc < 0 && errno == EINTR); 00086 if (rc == -1) 00087 { 00088 int lerrno = errno; 00089 Logger lgr("blocxx"); 00090 BLOCXX_LOG_ERROR(lgr, Format("Closing pipe handle %1 failed: %2", fd, lerrno)); 00091 } 00092 return rc; 00093 } 00094 00095 ::ssize_t upread(int fd, void * buf, std::size_t count) 00096 { 00097 ::ssize_t rv; 00098 do 00099 { 00100 Thread::testCancel(); 00101 rv = ::read(fd, buf, count); 00102 } while (rv < 0 && errno == EINTR); 00103 return rv; 00104 } 00105 00106 ::ssize_t upwrite(int fd, void const * buf, std::size_t count) 00107 { 00108 ::ssize_t rv; 00109 // block SIGPIPE so we don't kill the process if the pipe is closed. 00110 SignalScope ss(SIGPIPE, SIG_IGN); 00111 do 00112 { 00113 Thread::testCancel(); 00114 rv = ::write(fd, buf, count); 00115 } while (rv < 0 && errno == EINTR); 00116 return rv; 00117 } 00118 00119 int upaccept(int s, struct sockaddr * addr, socklen_t * addrlen) 00120 { 00121 int rv; 00122 do 00123 { 00124 rv = ::accept(s, addr, addrlen); 00125 } while (rv < 0 && errno == EINTR); 00126 return rv; 00127 } 00128 enum EDirection 00129 { 00130 E_WRITE_PIPE, E_READ_PIPE 00131 }; 00132 00133 // bufsz MUST be an int, and not some other integral type (address taken) 00134 // 00135 void setKernelBufferSize(Descriptor sockfd, int bufsz, EDirection edir) 00136 { 00137 if (sockfd == BLOCXX_INVALID_HANDLE) 00138 { 00139 return; 00140 } 00141 00142 int optname = (edir == E_WRITE_PIPE ? SO_SNDBUF : SO_RCVBUF); 00143 00144 int getbufsz; 00145 socklen_t getbufsz_len = sizeof(getbufsz); 00146 00147 #ifdef BLOCXX_NCR 00148 int errc = ::getsockopt(sockfd, SOL_SOCKET, optname, (char*)&getbufsz, &getbufsz_len); 00149 #else 00150 int errc = ::getsockopt(sockfd, SOL_SOCKET, optname, &getbufsz, &getbufsz_len); 00151 #endif 00152 if (errc == 0 && getbufsz < bufsz) 00153 { 00154 #ifdef BLOCXX_NCR 00155 ::setsockopt(sockfd, SOL_SOCKET, optname, (char*)&bufsz, sizeof(bufsz)); 00156 #else 00157 ::setsockopt(sockfd, SOL_SOCKET, optname, &bufsz, sizeof(bufsz)); 00158 #endif 00159 } 00160 } 00161 00162 void setDefaultKernelBufsz(Descriptor sockfd_read, Descriptor sockfd_write) 00163 { 00164 int const BUFSZ = 64 * 1024; 00165 setKernelBufferSize(sockfd_read, BUFSZ, E_READ_PIPE); 00166 setKernelBufferSize(sockfd_write, BUFSZ, E_WRITE_PIPE); 00167 } 00168 00169 GlobalString COMPONENT_NAME = BLOCXX_GLOBAL_STRING_INIT("blocxx.PosixUnnamedPipe"); 00170 00171 #if defined(BLOCXX_DARWIN) 00172 // Mac OS X < 10.5 has a kernel bug related to passing descriptors. As a workaround, descriptors are passed synchronously. 00173 // This variable determines whether the workaround will be used. It will be set to false by detectDescriptorPassingBug() 00174 bool needDescriptorPassingWorkaround = true; 00175 00176 // This is the control to ensure that detectDescriptorPassingBug() is only called once. 00177 OnceFlag detectDescriptorPassingBugFlag = BLOCXX_ONCE_INIT; 00178 00179 // This function can not have logging statements or they will be sent over the pipe before sending the ACK. 00180 void detectDescriptorPassingBug() 00181 { 00182 // until OS X 10.5 is actually released, assume it will be broken (even though Apple said it is fixed) 00183 needDescriptorPassingWorkaround = true; 00184 return; 00185 #if 0 00186 // if uname() reports the version as < 9.0.0 then we'll need the workaround. 00187 struct utsname unamerv; 00188 if (::uname(&unamerv) == -1) 00189 { 00190 needDescriptorPassingWorkaround = true; // unknown, so just assume it's necessary. 00191 return; 00192 } 00193 String release(unamerv.release); 00194 PosixRegEx re("([^.]*)\\..*"); 00195 StringArray releaseCapture = re.capture(release); 00196 if (releaseCapture.size() < 2) 00197 { 00198 needDescriptorPassingWorkaround = true; // unknown, so just assume it's necessary. 00199 return; 00200 } 00201 String majorRelease = releaseCapture[1]; 00202 try 00203 { 00204 needDescriptorPassingWorkaround = (majorRelease.toInt32() < 9); 00205 } 00206 catch (StringConversionException& e) 00207 { 00208 needDescriptorPassingWorkaround = true; // unknown, so just assume it's necessary. 00209 return; 00210 } 00211 #endif 00212 } 00213 #endif 00214 00215 } 00216 00217 #ifdef BLOCXX_NETWARE 00218 namespace 00219 { 00220 class AcceptThread 00221 { 00222 public: 00223 AcceptThread(int serversock) 00224 : m_serversock(serversock) 00225 , m_serverconn(-1) 00226 { 00227 } 00228 00229 void acceptConnection(); 00230 int getConnectFD() { return m_serverconn; } 00231 private: 00232 int m_serversock; 00233 int m_serverconn; 00234 }; 00235 00236 void 00237 AcceptThread::acceptConnection() 00238 { 00239 struct sockaddr_in sin; 00240 size_t val; 00241 int tmp = 1; 00242 00243 tmp = 1; 00244 ::setsockopt(m_serversock, IPPROTO_TCP, 1, // #define TCP_NODELAY 1 00245 (char*) &tmp, sizeof(int)); 00246 00247 val = sizeof(struct sockaddr_in); 00248 if ((m_serverconn = upaccept(m_serversock, (struct sockaddr*)&sin, &val)) 00249 == -1) 00250 { 00251 return; 00252 } 00253 tmp = 1; 00254 ::setsockopt(m_serverconn, IPPROTO_TCP, 1, // #define TCP_NODELAY 1 00255 (char *) &tmp, sizeof(int)); 00256 tmp = 0; 00257 ::setsockopt(m_serverconn, SOL_SOCKET, SO_KEEPALIVE, 00258 (char*) &tmp, sizeof(int)); 00259 } 00260 00261 void* 00262 runConnClass(void* arg) 00263 { 00264 AcceptThread* acceptThread = (AcceptThread*)(arg); 00265 acceptThread->acceptConnection(); 00266 ::pthread_exit(NULL); 00267 return 0; 00268 } 00269 00270 int 00271 _pipe(int *fds) 00272 { 00273 int svrfd, lerrno, connectfd; 00274 size_t val; 00275 struct sockaddr_in sin; 00276 00277 svrfd = socket( AF_INET, SOCK_STREAM, 0 ); 00278 sin.sin_family = AF_INET; 00279 sin.sin_addr.s_addr = htonl( 0x7f000001 ); // loopback 00280 sin.sin_port = 0; 00281 memset(sin.sin_zero, 0, 8 ); 00282 if (bind(svrfd, (struct sockaddr * )&sin, sizeof( struct sockaddr_in ) ) == -1) 00283 { 00284 int lerrno = errno; 00285 upclose(svrfd); 00286 fprintf(stderr, "CreateSocket(): Failed to bind on socket" ); 00287 return -1; 00288 } 00289 if (listen(svrfd, 1) == -1) 00290 { 00291 int lerrno = errno; 00292 upclose(svrfd); 00293 return -1; 00294 } 00295 val = sizeof(struct sockaddr_in); 00296 if (getsockname(svrfd, ( struct sockaddr * )&sin, &val ) == -1) 00297 { 00298 int lerrno = errno; 00299 fprintf(stderr, "CreateSocket(): Failed to obtain socket name" ); 00300 upclose(svrfd); 00301 return -1; 00302 } 00303 00304 AcceptThread* pat = new AcceptThread(svrfd); 00305 pthread_t athread; 00306 // Start thread that will accept connection on svrfd. 00307 // Once a connection is made the thread will exit. 00308 pthread_create(&athread, NULL, runConnClass, pat); 00309 00310 int clientfd = socket(AF_INET, SOCK_STREAM, 0); 00311 if (clientfd == -1) 00312 { 00313 delete pat; 00314 return -1; 00315 } 00316 00317 // Connect to server 00318 struct sockaddr_in csin; 00319 csin.sin_family = AF_INET; 00320 csin.sin_addr.s_addr = htonl(0x7f000001); // loopback 00321 csin.sin_port = sin.sin_port; 00322 if (::connect(clientfd, (struct sockaddr*)&csin, sizeof(csin)) == -1) 00323 { 00324 delete pat; 00325 return -1; 00326 } 00327 00328 #define TCP_NODELAY 1 00329 int tmp = 1; 00330 // 00331 // Set for Non-blocking writes and disable keepalive 00332 // 00333 ::setsockopt(clientfd, IPPROTO_TCP, TCP_NODELAY, (char*)&tmp, sizeof(int)); 00334 tmp = 0; 00335 ::setsockopt(clientfd, SOL_SOCKET, SO_KEEPALIVE, (char*)&tmp, sizeof(int)); 00336 00337 void* threadResult; 00338 // Wait for accept thread to terminate 00339 ::pthread_join(athread, &threadResult); 00340 00341 upclose(svrfd); 00342 fds[0] = pat->getConnectFD(); 00343 fds[1] = clientfd; 00344 delete pat; 00345 return 0; 00346 } 00347 } 00348 #endif // BLOCXX_NETWARE 00349 00351 PosixUnnamedPipe::PosixUnnamedPipe(EOpen doOpen) 00352 { 00353 m_fds[0] = m_fds[1] = BLOCXX_INVALID_HANDLE; 00354 if (doOpen) 00355 { 00356 open(); 00357 } 00358 setTimeouts(Timeout::relative(60 * 10)); // 10 minutes. This helps break deadlocks when using safePopen() 00359 setBlocking(E_BLOCKING); // necessary to set the pipes up right. 00360 } 00361 00363 PosixUnnamedPipe::PosixUnnamedPipe(AutoDescriptor inputfd, AutoDescriptor outputfd) 00364 { 00365 m_fds[0] = inputfd.get(); 00366 m_fds[1] = outputfd.get(); 00367 setTimeouts(Timeout::relative(60 * 10)); // 10 minutes. This helps break deadlocks when using safePopen() 00368 setBlocking(E_BLOCKING); 00369 setDefaultKernelBufsz(m_fds[0], m_fds[1]); 00370 inputfd.release(); 00371 outputfd.release(); 00372 } 00373 00375 PosixUnnamedPipe::~PosixUnnamedPipe() 00376 { 00377 close(); 00378 } 00380 namespace 00381 { 00382 typedef UnnamedPipe::EBlockingMode EBlockingMode; 00383 00384 void set_desc_blocking( 00385 int d, EBlockingMode & bmflag, EBlockingMode blocking_mode) 00386 { 00387 BLOCXX_ASSERT(d != BLOCXX_INVALID_HANDLE); 00388 int fdflags = fcntl(d, F_GETFL, 0); 00389 if (fdflags == -1) 00390 { 00391 BLOCXX_THROW_ERRNO_MSG(IOException, "Failed to set pipe blocking mode"); 00392 } 00393 if (blocking_mode == UnnamedPipe::E_BLOCKING) 00394 { 00395 fdflags &= ~O_NONBLOCK; 00396 } 00397 else 00398 { 00399 fdflags |= O_NONBLOCK; 00400 } 00401 if (fcntl(d, F_SETFL, fdflags) == -1) 00402 { 00403 BLOCXX_THROW_ERRNO_MSG(IOException, "Failed to set pipe blocking mode"); 00404 } 00405 bmflag = blocking_mode; 00406 } 00407 } 00409 void 00410 PosixUnnamedPipe::setBlocking(EBlockingMode blocking_mode) 00411 { 00412 BLOCXX_ASSERT(m_fds[0] != BLOCXX_INVALID_HANDLE || m_fds[1] != BLOCXX_INVALID_HANDLE); 00413 00414 for (size_t i = 0; i < 2; ++i) 00415 { 00416 if (m_fds[i] != -1) 00417 { 00418 set_desc_blocking(m_fds[i], m_blocking[i], blocking_mode); 00419 } 00420 } 00421 } 00423 void 00424 PosixUnnamedPipe::setWriteBlocking(EBlockingMode blocking_mode) 00425 { 00426 set_desc_blocking(m_fds[1], m_blocking[1], blocking_mode); 00427 } 00429 void 00430 PosixUnnamedPipe::setReadBlocking(EBlockingMode blocking_mode) 00431 { 00432 set_desc_blocking(m_fds[0], m_blocking[0], blocking_mode); 00433 } 00435 void 00436 PosixUnnamedPipe::open() 00437 { 00438 if (m_fds[0] != BLOCXX_INVALID_HANDLE) 00439 { 00440 close(); 00441 } 00442 #if defined(BLOCXX_NETWARE) 00443 if (_pipe(m_fds) == BLOCXX_INVALID_HANDLE) 00444 { 00445 m_fds[0] = m_fds[1] = BLOCXX_INVALID_HANDLE; 00446 BLOCXX_THROW_ERRNO_MSG(UnnamedPipeException, "PosixUnamedPipe::open(): soketpair()"); 00447 } 00448 00449 #else 00450 if (::socketpair(AF_UNIX, SOCK_STREAM, 0, m_fds) == -1) 00451 { 00452 m_fds[0] = m_fds[1] = -1; 00453 BLOCXX_THROW_ERRNO_MSG(UnnamedPipeException, "PosixUnamedPipe::open(): soketpair()"); 00454 } 00455 ::shutdown(m_fds[0], SHUT_WR); 00456 ::shutdown(m_fds[1], SHUT_RD); 00457 setDefaultKernelBufsz(m_fds[0], m_fds[1]); 00458 #endif 00459 } 00461 int 00462 PosixUnnamedPipe::close() 00463 { 00464 int rc = -1; 00465 00466 // handle the case where both input and output are the same descriptor. It can't be closed twice. 00467 if (m_fds[0] == m_fds[1]) 00468 { 00469 m_fds[1] = BLOCXX_INVALID_HANDLE; 00470 } 00471 00472 if (m_fds[0] != BLOCXX_INVALID_HANDLE) 00473 { 00474 00475 rc = upclose(m_fds[0]); 00476 m_fds[0] = BLOCXX_INVALID_HANDLE; 00477 } 00478 00479 if (m_fds[1] != BLOCXX_INVALID_HANDLE) 00480 { 00481 rc = upclose(m_fds[1]); 00482 m_fds[1] = BLOCXX_INVALID_HANDLE; 00483 } 00484 00485 return rc; 00486 } 00488 bool 00489 PosixUnnamedPipe::isOpen() const 00490 { 00491 return (m_fds[0] != BLOCXX_INVALID_HANDLE) || (m_fds[1] != BLOCXX_INVALID_HANDLE); 00492 } 00493 00495 int 00496 PosixUnnamedPipe::closeInputHandle() 00497 { 00498 int rc = -1; 00499 if (m_fds[0] != BLOCXX_INVALID_HANDLE) 00500 { 00501 if (m_fds[0] != m_fds[1]) 00502 { 00503 rc = upclose(m_fds[0]); 00504 } 00505 m_fds[0] = BLOCXX_INVALID_HANDLE; 00506 } 00507 return rc; 00508 } 00510 int 00511 PosixUnnamedPipe::closeOutputHandle() 00512 { 00513 int rc = -1; 00514 if (m_fds[1] != BLOCXX_INVALID_HANDLE) 00515 { 00516 if (m_fds[0] != m_fds[1]) 00517 { 00518 rc = upclose(m_fds[1]); 00519 } 00520 m_fds[1] = BLOCXX_INVALID_HANDLE; 00521 } 00522 return rc; 00523 } 00525 int 00526 PosixUnnamedPipe::write(const void* data, int dataLen, ErrorAction errorAsException) 00527 { 00528 int rc = -1; 00529 if (m_fds[1] != BLOCXX_INVALID_HANDLE) 00530 { 00531 if (m_blocking[1] == E_BLOCKING) 00532 { 00533 rc = SocketUtils::waitForIO(m_fds[1], getWriteTimeout(), SocketFlags::E_WAIT_FOR_OUTPUT); 00534 if (rc != 0) 00535 { 00536 if (rc == ETIMEDOUT) 00537 { 00538 errno = ETIMEDOUT; 00539 } 00540 if (errorAsException == E_THROW_ON_ERROR) 00541 { 00542 BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed."); 00543 } 00544 else 00545 { 00546 return -1; 00547 } 00548 } 00549 } 00550 rc = upwrite(m_fds[1], data, dataLen); 00551 } 00552 if (errorAsException == E_THROW_ON_ERROR && rc == -1) 00553 { 00554 if (m_fds[1] == BLOCXX_INVALID_HANDLE) 00555 { 00556 BLOCXX_THROW(IOException, "pipe write failed because pipe is closed"); 00557 } 00558 else 00559 { 00560 BLOCXX_THROW_ERRNO_MSG(IOException, "pipe write failed"); 00561 } 00562 } 00563 return rc; 00564 } 00566 int 00567 PosixUnnamedPipe::read(void* buffer, int bufferLen, ErrorAction errorAsException) 00568 { 00569 int rc = -1; 00570 if (m_fds[0] != BLOCXX_INVALID_HANDLE) 00571 { 00572 if (m_blocking[0] == E_BLOCKING) 00573 { 00574 rc = SocketUtils::waitForIO(m_fds[0], getReadTimeout(), SocketFlags::E_WAIT_FOR_INPUT); 00575 if (rc != 0) 00576 { 00577 if (rc == ETIMEDOUT) 00578 { 00579 errno = ETIMEDOUT; 00580 } 00581 if (errorAsException == E_THROW_ON_ERROR) 00582 { 00583 BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed."); 00584 } 00585 else 00586 { 00587 return -1; 00588 } 00589 } 00590 } 00591 rc = upread(m_fds[0], buffer, bufferLen); 00592 } 00593 00594 if (rc == 0) 00595 { 00596 closeInputHandle(); 00597 } 00598 00599 if (errorAsException == E_THROW_ON_ERROR && rc == -1) 00600 { 00601 if (m_fds[0] == BLOCXX_INVALID_HANDLE) 00602 { 00603 BLOCXX_THROW(IOException, "pipe read failed because pipe is closed"); 00604 } 00605 else 00606 { 00607 BLOCXX_THROW_ERRNO_MSG(IOException, "pipe read failed"); 00608 } 00609 } 00610 return rc; 00611 } 00613 Select_t 00614 PosixUnnamedPipe::getReadSelectObj() const 00615 { 00616 return m_fds[0]; 00617 } 00618 00620 Select_t 00621 PosixUnnamedPipe::getWriteSelectObj() const 00622 { 00623 return m_fds[1]; 00624 } 00625 00627 void 00628 PosixUnnamedPipe::passDescriptor(Descriptor descriptor, const UnnamedPipeRef& ackPipe, const ProcessRef& targetProcess) 00629 { 00630 int rc = -1; 00631 if (m_fds[1] != BLOCXX_INVALID_HANDLE) 00632 { 00633 if (m_blocking[1] == E_BLOCKING) 00634 { 00635 rc = SocketUtils::waitForIO(m_fds[1], getWriteTimeout(), SocketFlags::E_WAIT_FOR_OUTPUT); 00636 00637 if (rc != 0) 00638 { 00639 if (rc == ETIMEDOUT) 00640 { 00641 errno = ETIMEDOUT; 00642 } 00643 BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed."); 00644 } 00645 } 00646 00647 rc = blocxx::passDescriptor(m_fds[1], descriptor); 00648 if (rc == -1) 00649 { 00650 BLOCXX_THROW_ERRNO_MSG(IOException, "sendDescriptor() failed: passDescriptor()"); 00651 } 00652 00653 #if defined(BLOCXX_DARWIN) 00654 callOnce(detectDescriptorPassingBugFlag, detectDescriptorPassingBug); 00655 if (rc != -1 && needDescriptorPassingWorkaround) 00656 { 00657 // This ignores the blocking and timeouts, because this ACK shouldn't timeout. 00658 rc = SocketUtils::waitForIO(ackPipe->getInputDescriptor(), Timeout::infinite, SocketFlags::E_WAIT_FOR_INPUT); 00659 if (rc != -1) 00660 { 00661 char ack = 'Z'; 00662 rc = ackPipe->read(&ack, sizeof(ack), E_RETURN_ON_ERROR); 00663 if (rc == -1) 00664 { 00665 BLOCXX_THROW_ERRNO_MSG(IOException, "sendDescriptor() failed: ackPipe->read()"); 00666 } 00667 if (ack != 'A') 00668 { 00669 BLOCXX_THROW(IOException, Format("sendDescriptor() failed: ackPipe->read() didn't get 'A', got %1", static_cast<int>(ack)).c_str()); 00670 } 00671 } 00672 else 00673 { 00674 BLOCXX_THROW_ERRNO_MSG(IOException, "sendDescriptor() failed: waitForIO()"); 00675 } 00676 } 00677 #endif 00678 } 00679 if (rc == -1) 00680 { 00681 if (m_fds[1] == BLOCXX_INVALID_HANDLE) 00682 { 00683 BLOCXX_THROW(IOException, "sendDescriptor() failed because pipe is closed"); 00684 } 00685 else 00686 { 00687 BLOCXX_THROW_ERRNO_MSG(IOException, "sendDescriptor() failed"); 00688 } 00689 } 00690 } 00691 00693 AutoDescriptor 00694 PosixUnnamedPipe::receiveDescriptor(const UnnamedPipeRef& ackPipe) 00695 { 00696 int rc = -1; 00697 AutoDescriptor descriptor; 00698 if (m_fds[0] != BLOCXX_INVALID_HANDLE) 00699 { 00700 if (m_blocking[0] == E_BLOCKING) 00701 { 00702 rc = SocketUtils::waitForIO(m_fds[0], getReadTimeout(), SocketFlags::E_WAIT_FOR_INPUT); 00703 00704 if (rc != 0) 00705 { 00706 if (rc == ETIMEDOUT) 00707 { 00708 errno = ETIMEDOUT; 00709 } 00710 BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed."); 00711 } 00712 } 00713 descriptor = blocxx::receiveDescriptor(m_fds[0]); 00714 00715 #if defined(BLOCXX_DARWIN) 00716 callOnce(detectDescriptorPassingBugFlag, detectDescriptorPassingBug); 00717 if (needDescriptorPassingWorkaround) 00718 { 00719 // This ignores the blocking and timeouts, because this ACK shouldn't timeout. 00720 rc = SocketUtils::waitForIO(ackPipe->getOutputDescriptor(), Timeout::infinite, SocketFlags::E_WAIT_FOR_OUTPUT); 00721 if (rc != -1) 00722 { 00723 char ack = 'A'; 00724 ackPipe->write(&ack, sizeof(ack), E_THROW_ON_ERROR); 00725 } 00726 } 00727 #endif 00728 } 00729 else 00730 { 00731 BLOCXX_THROW(IOException, "receiveDescriptor() failed because pipe is closed"); 00732 } 00733 return descriptor; 00734 } 00735 00737 Descriptor 00738 PosixUnnamedPipe::getInputDescriptor() const 00739 { 00740 return m_fds[0]; 00741 } 00742 00744 Descriptor 00745 PosixUnnamedPipe::getOutputDescriptor() const 00746 { 00747 return m_fds[1]; 00748 } 00749 00751 EBlockingMode 00752 PosixUnnamedPipe::getReadBlocking() const 00753 { 00754 return m_blocking[0]; 00755 } 00756 00758 EBlockingMode 00759 PosixUnnamedPipe::getWriteBlocking() const 00760 { 00761 return m_blocking[1]; 00762 } 00763 00764 } // end namespace BLOCXX_NAMESPACE 00765 00766 #endif