blocxx

PosixUnnamedPipe.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002 * Copyright (C) 2005, Vintela, Inc. All rights reserved.
00003 * Copyright (C) 2006, Novell, Inc. All rights reserved.
00004 * 
00005 * Redistribution and use in source and binary forms, with or without
00006 * modification, are permitted provided that the following conditions are met:
00007 * 
00008 *     * Redistributions of source code must retain the above copyright notice,
00009 *       this list of conditions and the following disclaimer.
00010 *     * Redistributions in binary form must reproduce the above copyright
00011 *       notice, this list of conditions and the following disclaimer in the
00012 *       documentation and/or other materials provided with the distribution.
00013 *     * Neither the name of 
00014 *       Vintela, Inc., 
00015 *       nor Novell, Inc., 
00016 *       nor the names of its contributors or employees may be used to 
00017 *       endorse or promote products derived from this software without 
00018 *       specific prior written permission.
00019 * 
00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00021 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00022 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00023 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00024 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00025 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00026 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00027 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00028 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00029 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00030 * POSSIBILITY OF SUCH DAMAGE.
00031 *******************************************************************************/
00032 
00033 
00039 #include "blocxx/BLOCXX_config.h"
00040 
00041 #if !defined(BLOCXX_WIN32) 
00042 
00043 #include "blocxx/PosixUnnamedPipe.hpp"
00044 #include "blocxx/AutoPtr.hpp"
00045 #include "blocxx/IOException.hpp"
00046 #include "blocxx/Format.hpp"
00047 #include "blocxx/SocketUtils.hpp"
00048 #include "blocxx/Assertion.hpp"
00049 #include "blocxx/DescriptorUtils.hpp"
00050 #include "blocxx/SignalScope.hpp"
00051 #include "blocxx/Logger.hpp"
00052 #include "blocxx/GlobalString.hpp"
00053 
00054 
00055 #include "blocxx/Thread.hpp"
00056 #ifdef BLOCXX_HAVE_UNISTD_H
00057    #include <unistd.h>
00058 #endif
00059 #include <sys/socket.h>
00060 #include <sys/types.h>
00061 
00062 #include <fcntl.h>
00063 #include <errno.h>
00064 #include <cstring>
00065 
00066 #if defined(BLOCXX_DARWIN)
00067 // Necessary for detecting the kernel version in order to activate the descriptor passing workaround.
00068 #include "blocxx/ThreadOnce.hpp"
00069 #include "blocxx/PosixRegEx.hpp"
00070 #include <sys/utsname.h>
00071 #endif
00072 
00073 
00074 namespace BLOCXX_NAMESPACE
00075 {
00076 
00077 namespace
00078 {
00079    int upclose(int fd)
00080    {
00081       int rc;
00082       do
00083       {
00084          rc = ::close(fd);
00085       } while (rc < 0 && errno == EINTR);
00086       if (rc == -1)
00087       {
00088          int lerrno = errno;
00089          Logger lgr("blocxx");
00090          BLOCXX_LOG_ERROR(lgr, Format("Closing pipe handle %1 failed: %2", fd, lerrno));
00091       }
00092       return rc;
00093    }
00094 
00095    ::ssize_t upread(int fd, void * buf, std::size_t count)
00096    {
00097       ::ssize_t rv;
00098       do
00099       {
00100          Thread::testCancel();
00101          rv = ::read(fd, buf, count);
00102       } while (rv < 0 && errno == EINTR);
00103       return rv;
00104    }
00105 
00106    ::ssize_t upwrite(int fd, void const * buf, std::size_t count)
00107    {
00108       ::ssize_t rv;
00109       // block SIGPIPE so we don't kill the process if the pipe is closed.
00110       SignalScope ss(SIGPIPE, SIG_IGN);
00111       do
00112       {
00113          Thread::testCancel();
00114          rv = ::write(fd, buf, count);
00115       } while (rv < 0 && errno == EINTR);
00116       return rv;
00117    }
00118 
00119    int upaccept(int s, struct sockaddr * addr, socklen_t * addrlen)
00120    {
00121       int rv;
00122       do
00123       {
00124          rv = ::accept(s, addr, addrlen);
00125       } while (rv < 0 && errno == EINTR);
00126       return rv;
00127    }
00128    enum EDirection
00129    {
00130       E_WRITE_PIPE, E_READ_PIPE
00131    };
00132 
00133    // bufsz MUST be an int, and not some other integral type (address taken)
00134    //
00135    void setKernelBufferSize(Descriptor sockfd, int bufsz, EDirection edir)
00136    {
00137       if (sockfd == BLOCXX_INVALID_HANDLE)
00138       {
00139          return;
00140       }
00141 
00142       int optname = (edir == E_WRITE_PIPE ? SO_SNDBUF : SO_RCVBUF);
00143 
00144       int getbufsz;
00145       socklen_t getbufsz_len = sizeof(getbufsz);
00146 
00147 #ifdef BLOCXX_NCR
00148       int errc = ::getsockopt(sockfd, SOL_SOCKET, optname, (char*)&getbufsz, &getbufsz_len);
00149 #else
00150       int errc = ::getsockopt(sockfd, SOL_SOCKET, optname, &getbufsz, &getbufsz_len);
00151 #endif
00152       if (errc == 0 && getbufsz < bufsz)
00153       {
00154 #ifdef BLOCXX_NCR
00155          ::setsockopt(sockfd, SOL_SOCKET, optname, (char*)&bufsz, sizeof(bufsz));
00156 #else
00157          ::setsockopt(sockfd, SOL_SOCKET, optname, &bufsz, sizeof(bufsz));
00158 #endif
00159       }
00160    }
00161 
00162    void setDefaultKernelBufsz(Descriptor sockfd_read, Descriptor sockfd_write)
00163    {
00164       int const BUFSZ = 64 * 1024;
00165       setKernelBufferSize(sockfd_read, BUFSZ, E_READ_PIPE);
00166       setKernelBufferSize(sockfd_write, BUFSZ, E_WRITE_PIPE);
00167    }
00168 
00169    GlobalString COMPONENT_NAME = BLOCXX_GLOBAL_STRING_INIT("blocxx.PosixUnnamedPipe");
00170 
00171 #if defined(BLOCXX_DARWIN)
00172    // Mac OS X < 10.5 has a kernel bug related to passing descriptors. As a workaround, descriptors are passed synchronously.
00173    // This variable determines whether the workaround will be used. It will be set to false by detectDescriptorPassingBug()
00174    bool needDescriptorPassingWorkaround = true;
00175 
00176    // This is the control to ensure that detectDescriptorPassingBug() is only called once.
00177    OnceFlag detectDescriptorPassingBugFlag = BLOCXX_ONCE_INIT;
00178 
00179    // This function can not have logging statements or they will be sent over the pipe before sending the ACK.
00180    void detectDescriptorPassingBug()
00181    {
00182       // until OS X 10.5 is actually released, assume it will be broken (even though Apple said it is fixed)
00183       needDescriptorPassingWorkaround = true;
00184       return;
00185 #if 0
00186       // if uname() reports the version as < 9.0.0 then we'll need the workaround.
00187       struct utsname unamerv;
00188       if (::uname(&unamerv) == -1)
00189       {
00190          needDescriptorPassingWorkaround = true; // unknown, so just assume it's necessary.
00191          return;
00192       }
00193       String release(unamerv.release);
00194       PosixRegEx re("([^.]*)\\..*");
00195       StringArray releaseCapture = re.capture(release);
00196       if (releaseCapture.size() < 2)
00197       {
00198          needDescriptorPassingWorkaround = true; // unknown, so just assume it's necessary.
00199          return;
00200       }
00201       String majorRelease = releaseCapture[1];
00202       try
00203       {
00204          needDescriptorPassingWorkaround = (majorRelease.toInt32() < 9);
00205       } 
00206       catch (StringConversionException& e)
00207       {
00208          needDescriptorPassingWorkaround = true; // unknown, so just assume it's necessary.
00209          return;
00210       }
00211 #endif
00212    }
00213 #endif
00214 
00215 }
00216 
00217 #ifdef BLOCXX_NETWARE
00218 namespace
00219 {
// Holds the state for accepting one connection on a listening socket.
// acceptConnection() stores the accepted descriptor, which is then available
// through getConnectFD(); it stays -1 until a connection has been accepted.
class AcceptThread
{
public:
   // serversock: the listening socket to accept on.
   // explicit, so that a plain int is never converted silently.
   explicit AcceptThread(int serversock)
      : m_serversock(serversock)
      , m_serverconn(-1)
   {
   }

   void acceptConnection();

   // Descriptor of the accepted connection, or -1 if there is none.
   int getConnectFD() const { return m_serverconn; }
private:
   int m_serversock;
   int m_serverconn;
};
00235 
00236 void
00237 AcceptThread::acceptConnection()
00238 {
00239    struct sockaddr_in sin;
00240    size_t val;
00241    int tmp = 1;
00242 
00243    tmp = 1;
00244    ::setsockopt(m_serversock, IPPROTO_TCP, 1,      // #define TCP_NODELAY 1
00245       (char*) &tmp, sizeof(int));
00246 
00247    val = sizeof(struct sockaddr_in);
00248    if ((m_serverconn = upaccept(m_serversock, (struct sockaddr*)&sin, &val))
00249       == -1)
00250    {
00251       return;
00252    }
00253    tmp = 1;
00254    ::setsockopt(m_serverconn, IPPROTO_TCP, 1, // #define TCP_NODELAY 1
00255       (char *) &tmp, sizeof(int));
00256    tmp = 0;
00257    ::setsockopt(m_serverconn, SOL_SOCKET, SO_KEEPALIVE,
00258              (char*) &tmp, sizeof(int));
00259 }
00260 
00261 void*
00262 runConnClass(void* arg)
00263 {
00264    AcceptThread* acceptThread = (AcceptThread*)(arg);
00265    acceptThread->acceptConnection();
00266    ::pthread_exit(NULL);
00267    return 0;
00268 }
00269 
00270 int
00271 _pipe(int *fds)
00272 {
00273    int svrfd, lerrno, connectfd;
00274    size_t val;
00275    struct sockaddr_in sin;
00276 
00277    svrfd = socket( AF_INET, SOCK_STREAM, 0 );
00278    sin.sin_family = AF_INET;
00279    sin.sin_addr.s_addr = htonl( 0x7f000001 ); // loopback
00280    sin.sin_port = 0;
00281    memset(sin.sin_zero, 0, 8 );
00282    if (bind(svrfd, (struct sockaddr * )&sin, sizeof( struct sockaddr_in ) ) == -1)
00283    {
00284       int lerrno = errno;
00285       upclose(svrfd);
00286       fprintf(stderr, "CreateSocket(): Failed to bind on socket" );
00287       return -1;
00288    }
00289    if (listen(svrfd, 1) == -1)
00290    {
00291       int lerrno = errno;
00292       upclose(svrfd);
00293       return -1;
00294    }
00295    val = sizeof(struct sockaddr_in);
00296    if (getsockname(svrfd, ( struct sockaddr * )&sin, &val ) == -1)
00297    {
00298       int lerrno = errno;
00299       fprintf(stderr, "CreateSocket(): Failed to obtain socket name" );
00300       upclose(svrfd);
00301       return -1;
00302    }
00303 
00304    AcceptThread* pat = new AcceptThread(svrfd);
00305    pthread_t athread;
00306    // Start thread that will accept connection on svrfd.
00307    // Once a connection is made the thread will exit.
00308    pthread_create(&athread, NULL, runConnClass, pat);
00309 
00310    int clientfd = socket(AF_INET, SOCK_STREAM, 0);
00311    if (clientfd == -1)
00312    {
00313       delete pat;
00314       return -1;
00315    }
00316 
00317    // Connect to server
00318    struct sockaddr_in csin;
00319    csin.sin_family = AF_INET;
00320    csin.sin_addr.s_addr = htonl(0x7f000001); // loopback
00321    csin.sin_port = sin.sin_port;
00322    if (::connect(clientfd, (struct sockaddr*)&csin, sizeof(csin)) == -1)
00323    {
00324       delete pat;
00325       return -1;
00326    }
00327 
00328 #define TCP_NODELAY 1
00329    int tmp = 1;
00330    //
00331    // Set for Non-blocking writes and disable keepalive
00332    //
00333    ::setsockopt(clientfd, IPPROTO_TCP, TCP_NODELAY, (char*)&tmp, sizeof(int));
00334    tmp = 0;
00335    ::setsockopt(clientfd, SOL_SOCKET, SO_KEEPALIVE, (char*)&tmp, sizeof(int));
00336 
00337    void* threadResult;
00338    // Wait for accept thread to terminate
00339    ::pthread_join(athread, &threadResult);
00340 
00341    upclose(svrfd);
00342    fds[0] = pat->getConnectFD();
00343    fds[1] = clientfd;
00344    delete pat;
00345    return 0;
00346 }
00347 }
00348 #endif // BLOCXX_NETWARE
00349 
00351 PosixUnnamedPipe::PosixUnnamedPipe(EOpen doOpen)
00352 {
00353    m_fds[0] = m_fds[1] = BLOCXX_INVALID_HANDLE;
00354    if (doOpen)
00355    {
00356       open();
00357    }
00358    setTimeouts(Timeout::relative(60 * 10)); // 10 minutes. This helps break deadlocks when using safePopen()
00359    setBlocking(E_BLOCKING); // necessary to set the pipes up right.
00360 }
00361 
00363 PosixUnnamedPipe::PosixUnnamedPipe(AutoDescriptor inputfd, AutoDescriptor outputfd)
00364 {
00365    m_fds[0] = inputfd.get();
00366    m_fds[1] = outputfd.get();
00367    setTimeouts(Timeout::relative(60 * 10)); // 10 minutes. This helps break deadlocks when using safePopen()
00368    setBlocking(E_BLOCKING);
00369    setDefaultKernelBufsz(m_fds[0], m_fds[1]);
00370    inputfd.release();
00371    outputfd.release();
00372 }
00373 
00375 PosixUnnamedPipe::~PosixUnnamedPipe()
00376 {
00377    close();
00378 }
00380 namespace
00381 {
00382    typedef UnnamedPipe::EBlockingMode EBlockingMode;
00383 
00384    void set_desc_blocking(
00385       int d, EBlockingMode & bmflag, EBlockingMode blocking_mode)
00386    {
00387       BLOCXX_ASSERT(d != BLOCXX_INVALID_HANDLE);
00388       int fdflags = fcntl(d, F_GETFL, 0);
00389       if (fdflags == -1)
00390       {
00391          BLOCXX_THROW_ERRNO_MSG(IOException, "Failed to set pipe blocking mode");
00392       }
00393       if (blocking_mode == UnnamedPipe::E_BLOCKING)
00394       {
00395          fdflags &= ~O_NONBLOCK;
00396       }
00397       else
00398       {
00399          fdflags |= O_NONBLOCK;
00400       }
00401       if (fcntl(d, F_SETFL, fdflags) == -1)
00402       {
00403          BLOCXX_THROW_ERRNO_MSG(IOException, "Failed to set pipe blocking mode");
00404       }
00405       bmflag = blocking_mode;
00406    }
00407 }
00409 void
00410 PosixUnnamedPipe::setBlocking(EBlockingMode blocking_mode)
00411 {
00412    BLOCXX_ASSERT(m_fds[0] != BLOCXX_INVALID_HANDLE || m_fds[1] != BLOCXX_INVALID_HANDLE);
00413 
00414    for (size_t i = 0; i < 2; ++i)
00415    {
00416       if (m_fds[i] != -1)
00417       {
00418          set_desc_blocking(m_fds[i], m_blocking[i], blocking_mode);
00419       }
00420    }
00421 }
00423 void
00424 PosixUnnamedPipe::setWriteBlocking(EBlockingMode blocking_mode)
00425 {
00426    set_desc_blocking(m_fds[1], m_blocking[1], blocking_mode);
00427 }
00429 void
00430 PosixUnnamedPipe::setReadBlocking(EBlockingMode blocking_mode)
00431 {
00432    set_desc_blocking(m_fds[0], m_blocking[0], blocking_mode);
00433 }
00435 void
00436 PosixUnnamedPipe::open()
00437 {
00438    if (m_fds[0] != BLOCXX_INVALID_HANDLE)
00439    {
00440       close();
00441    }
00442 #if defined(BLOCXX_NETWARE)
00443    if (_pipe(m_fds) == BLOCXX_INVALID_HANDLE)
00444    {
00445       m_fds[0] = m_fds[1] = BLOCXX_INVALID_HANDLE;
00446       BLOCXX_THROW_ERRNO_MSG(UnnamedPipeException, "PosixUnamedPipe::open(): soketpair()");
00447    }
00448 
00449 #else
00450    if (::socketpair(AF_UNIX, SOCK_STREAM, 0, m_fds) == -1)
00451    {
00452       m_fds[0] = m_fds[1] = -1;
00453       BLOCXX_THROW_ERRNO_MSG(UnnamedPipeException, "PosixUnamedPipe::open(): soketpair()");
00454    }
00455    ::shutdown(m_fds[0], SHUT_WR);
00456    ::shutdown(m_fds[1], SHUT_RD);
00457    setDefaultKernelBufsz(m_fds[0], m_fds[1]);
00458 #endif
00459 }
00461 int
00462 PosixUnnamedPipe::close()
00463 {
00464    int rc = -1;
00465 
00466    // handle the case where both input and output are the same descriptor.  It can't be closed twice.
00467    if (m_fds[0] == m_fds[1])
00468    {
00469       m_fds[1] = BLOCXX_INVALID_HANDLE;
00470    }
00471 
00472    if (m_fds[0] != BLOCXX_INVALID_HANDLE)
00473    {
00474 
00475       rc = upclose(m_fds[0]);
00476       m_fds[0] = BLOCXX_INVALID_HANDLE;
00477    }
00478 
00479    if (m_fds[1] != BLOCXX_INVALID_HANDLE)
00480    {
00481       rc = upclose(m_fds[1]);
00482       m_fds[1] = BLOCXX_INVALID_HANDLE;
00483    }
00484 
00485    return rc;
00486 }
00488 bool
00489 PosixUnnamedPipe::isOpen() const
00490 {
00491    return (m_fds[0] != BLOCXX_INVALID_HANDLE) || (m_fds[1] != BLOCXX_INVALID_HANDLE);
00492 }
00493 
00495 int
00496 PosixUnnamedPipe::closeInputHandle()
00497 {
00498    int rc = -1;
00499    if (m_fds[0] != BLOCXX_INVALID_HANDLE)
00500    {
00501       if (m_fds[0] != m_fds[1])
00502       {
00503          rc = upclose(m_fds[0]);
00504       }
00505       m_fds[0] = BLOCXX_INVALID_HANDLE;
00506    }
00507    return rc;
00508 }
00510 int
00511 PosixUnnamedPipe::closeOutputHandle()
00512 {
00513    int rc = -1;
00514    if (m_fds[1] != BLOCXX_INVALID_HANDLE)
00515    {
00516       if (m_fds[0] != m_fds[1])
00517       {
00518          rc = upclose(m_fds[1]);
00519       }
00520       m_fds[1] = BLOCXX_INVALID_HANDLE;
00521    }
00522    return rc;
00523 }
00525 int
00526 PosixUnnamedPipe::write(const void* data, int dataLen, ErrorAction errorAsException)
00527 {
00528    int rc = -1;
00529    if (m_fds[1] != BLOCXX_INVALID_HANDLE)
00530    {
00531       if (m_blocking[1] == E_BLOCKING)
00532       {
00533          rc = SocketUtils::waitForIO(m_fds[1], getWriteTimeout(), SocketFlags::E_WAIT_FOR_OUTPUT);
00534          if (rc != 0)
00535          {
00536             if (rc == ETIMEDOUT)
00537             {
00538                errno = ETIMEDOUT;
00539             }
00540             if (errorAsException == E_THROW_ON_ERROR)
00541             {
00542                BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
00543             }
00544             else
00545             {
00546                return -1;
00547             }
00548          }
00549       }
00550       rc = upwrite(m_fds[1], data, dataLen);
00551    }
00552    if (errorAsException == E_THROW_ON_ERROR && rc == -1)
00553    {
00554       if (m_fds[1] == BLOCXX_INVALID_HANDLE)
00555       {
00556          BLOCXX_THROW(IOException, "pipe write failed because pipe is closed");
00557       }
00558       else
00559       {
00560          BLOCXX_THROW_ERRNO_MSG(IOException, "pipe write failed");
00561       }
00562    }
00563    return rc;
00564 }
00566 int
00567 PosixUnnamedPipe::read(void* buffer, int bufferLen, ErrorAction errorAsException)
00568 {
00569    int rc = -1;
00570    if (m_fds[0] != BLOCXX_INVALID_HANDLE)
00571    {
00572       if (m_blocking[0] == E_BLOCKING)
00573       {
00574          rc = SocketUtils::waitForIO(m_fds[0], getReadTimeout(), SocketFlags::E_WAIT_FOR_INPUT);
00575          if (rc != 0)
00576          {
00577             if (rc == ETIMEDOUT)
00578             {
00579                errno = ETIMEDOUT;
00580             }
00581             if (errorAsException == E_THROW_ON_ERROR)
00582             {
00583                BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
00584             }
00585             else
00586             {
00587                return -1;
00588             }
00589          }
00590       }
00591       rc = upread(m_fds[0], buffer, bufferLen);
00592    }
00593 
00594    if (rc == 0)
00595    {
00596       closeInputHandle();
00597    }
00598 
00599    if (errorAsException == E_THROW_ON_ERROR && rc == -1)
00600    {
00601       if (m_fds[0] == BLOCXX_INVALID_HANDLE)
00602       {
00603          BLOCXX_THROW(IOException, "pipe read failed because pipe is closed");
00604       }
00605       else
00606       {
00607          BLOCXX_THROW_ERRNO_MSG(IOException, "pipe read failed");
00608       }
00609    }
00610    return rc;
00611 }
00613 Select_t
00614 PosixUnnamedPipe::getReadSelectObj() const
00615 {
00616    return m_fds[0];
00617 }
00618 
00620 Select_t
00621 PosixUnnamedPipe::getWriteSelectObj() const
00622 {
00623    return m_fds[1];
00624 }
00625 
00627 void
00628 PosixUnnamedPipe::passDescriptor(Descriptor descriptor, const UnnamedPipeRef& ackPipe, const ProcessRef& targetProcess)
00629 {
00630    int rc = -1;
00631    if (m_fds[1] != BLOCXX_INVALID_HANDLE)
00632    {
00633       if (m_blocking[1] == E_BLOCKING)
00634       {
00635          rc = SocketUtils::waitForIO(m_fds[1], getWriteTimeout(), SocketFlags::E_WAIT_FOR_OUTPUT);
00636 
00637          if (rc != 0)
00638          {
00639             if (rc == ETIMEDOUT)
00640             {
00641                errno = ETIMEDOUT;
00642             }
00643             BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
00644          }
00645       }
00646  
00647       rc = blocxx::passDescriptor(m_fds[1], descriptor);
00648       if (rc == -1)
00649       {
00650          BLOCXX_THROW_ERRNO_MSG(IOException, "sendDescriptor() failed: passDescriptor()");
00651       }
00652 
00653 #if defined(BLOCXX_DARWIN)
00654       callOnce(detectDescriptorPassingBugFlag, detectDescriptorPassingBug);
00655       if (rc != -1 && needDescriptorPassingWorkaround)
00656       {
00657          // This ignores the blocking and timeouts, because this ACK shouldn't timeout.
00658          rc = SocketUtils::waitForIO(ackPipe->getInputDescriptor(), Timeout::infinite, SocketFlags::E_WAIT_FOR_INPUT);
00659          if (rc != -1)
00660          {
00661             char ack = 'Z';
00662             rc = ackPipe->read(&ack, sizeof(ack), E_RETURN_ON_ERROR);
00663             if (rc == -1)
00664             {
00665                BLOCXX_THROW_ERRNO_MSG(IOException, "sendDescriptor() failed: ackPipe->read()");
00666             }
00667             if (ack != 'A')
00668             {
00669                BLOCXX_THROW(IOException, Format("sendDescriptor() failed: ackPipe->read() didn't get 'A', got %1", static_cast<int>(ack)).c_str());
00670             }
00671          }
00672          else
00673          {
00674             BLOCXX_THROW_ERRNO_MSG(IOException, "sendDescriptor() failed: waitForIO()");
00675          }
00676       }
00677 #endif
00678    }
00679    if (rc == -1)
00680    {
00681       if (m_fds[1] == BLOCXX_INVALID_HANDLE)
00682       {
00683          BLOCXX_THROW(IOException, "sendDescriptor() failed because pipe is closed");
00684       }
00685       else
00686       {
00687          BLOCXX_THROW_ERRNO_MSG(IOException, "sendDescriptor() failed");
00688       }
00689    }
00690 }
00691 
00693 AutoDescriptor
00694 PosixUnnamedPipe::receiveDescriptor(const UnnamedPipeRef& ackPipe)
00695 {
00696    int rc = -1;
00697    AutoDescriptor descriptor;
00698    if (m_fds[0] != BLOCXX_INVALID_HANDLE)
00699    {
00700       if (m_blocking[0] == E_BLOCKING)
00701       {
00702          rc = SocketUtils::waitForIO(m_fds[0], getReadTimeout(), SocketFlags::E_WAIT_FOR_INPUT);
00703 
00704          if (rc != 0)
00705          {
00706             if (rc == ETIMEDOUT)
00707             {
00708                errno = ETIMEDOUT;
00709             }
00710             BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
00711          }
00712       }
00713       descriptor = blocxx::receiveDescriptor(m_fds[0]);
00714 
00715 #if defined(BLOCXX_DARWIN)
00716       callOnce(detectDescriptorPassingBugFlag, detectDescriptorPassingBug);
00717       if (needDescriptorPassingWorkaround)
00718       {
00719          // This ignores the blocking and timeouts, because this ACK shouldn't timeout.
00720          rc = SocketUtils::waitForIO(ackPipe->getOutputDescriptor(), Timeout::infinite, SocketFlags::E_WAIT_FOR_OUTPUT);
00721          if (rc != -1)
00722          {
00723             char ack = 'A';
00724             ackPipe->write(&ack, sizeof(ack), E_THROW_ON_ERROR);
00725          }
00726       }
00727 #endif
00728    }
00729    else
00730    {
00731       BLOCXX_THROW(IOException, "receiveDescriptor() failed because pipe is closed");
00732    }
00733    return descriptor;
00734 }
00735 
00737 Descriptor
00738 PosixUnnamedPipe::getInputDescriptor() const
00739 {
00740    return m_fds[0];
00741 }
00742 
00744 Descriptor
00745 PosixUnnamedPipe::getOutputDescriptor() const
00746 {
00747    return m_fds[1];
00748 }
00749 
00751 EBlockingMode
00752 PosixUnnamedPipe::getReadBlocking() const
00753 {
00754    return m_blocking[0];
00755 }
00756 
00758 EBlockingMode
00759 PosixUnnamedPipe::getWriteBlocking() const
00760 {
00761    return m_blocking[1];
00762 }
00763 
00764 } // end namespace BLOCXX_NAMESPACE
00765 
00766 #endif