blocxx

SocketBaseImpl.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002 * Copyright (C) 2005, Vintela, Inc. All rights reserved.
00003 * Copyright (C) 2006, Novell, Inc. All rights reserved.
00004 * 
00005 * Redistribution and use in source and binary forms, with or without
00006 * modification, are permitted provided that the following conditions are met:
00007 * 
00008 *     * Redistributions of source code must retain the above copyright notice,
00009 *       this list of conditions and the following disclaimer.
00010 *     * Redistributions in binary form must reproduce the above copyright
00011 *       notice, this list of conditions and the following disclaimer in the
00012 *       documentation and/or other materials provided with the distribution.
00013 *     * Neither the name of 
00014 *       Vintela, Inc., 
00015 *       nor Novell, Inc., 
00016 *       nor the names of its contributors or employees may be used to 
00017 *       endorse or promote products derived from this software without 
00018 *       specific prior written permission.
00019 * 
00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00021 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00022 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00023 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00024 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00025 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00026 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00027 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00028 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00029 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00030 * POSSIBILITY OF SUCH DAMAGE.
00031 *******************************************************************************/
00032 
00033 
00038 #include "blocxx/BLOCXX_config.h"
00039 
00040 #if !defined(BLOCXX_WIN32)
00041 
00042 #include "blocxx/SocketBaseImpl.hpp"
00043 #include "blocxx/SocketUtils.hpp"
00044 #include "blocxx/Format.hpp"
00045 #include "blocxx/Assertion.hpp"
00046 #include "blocxx/IOException.hpp"
00047 #include "blocxx/Mutex.hpp"
00048 #include "blocxx/MutexLock.hpp"
00049 #include "blocxx/GlobalMutex.hpp"
00050 #include "blocxx/PosixUnnamedPipe.hpp"
00051 #include "blocxx/Socket.hpp"
00052 #include "blocxx/Thread.hpp"
00053 #include "blocxx/DateTime.hpp"
00054 #include "blocxx/TimeoutTimer.hpp"
00055 #include "blocxx/AutoDescriptor.hpp"
00056 #include "blocxx/Logger.hpp"
00057 #include "blocxx/Select.hpp"
00058 
00059 
00060 extern "C"
00061 {
00062 #include <sys/types.h>
00063 #include <sys/time.h>
00064 #include <sys/socket.h>
00065 #include <sys/stat.h>
00066 #include <netdb.h>
00067 #include <arpa/inet.h>
00068 #include <unistd.h>
00069 #include <fcntl.h>
00070 #include <netinet/in.h>
00071 }
00072 
00073 #include <fstream>
00074 #include <cerrno>
00075 #include <cstdio>
00076 
00077 namespace BLOCXX_NAMESPACE
00078 {
00079 
00080 using std::istream;
00081 using std::ostream;
00082 using std::iostream;
00083 using std::ifstream;
00084 using std::ofstream;
00085 using std::fstream;
00086 using std::ios;
00087 
00088 namespace
00089 {
00090 static GlobalMutex g_guard = BLOCXX_GLOBAL_MUTEX_INIT();
00091 }
00092 
00093 String SocketBaseImpl::m_traceFileOut;
00094 String SocketBaseImpl::m_traceFileIn;
00095 
00097 SocketBaseImpl::SocketBaseImpl()
00098    : SelectableIFC()
00099    , IOIFC()
00100    , m_isConnected(false)
00101    , m_sockfd(-1)
00102    , m_localAddress()
00103    , m_peerAddress()
00104    , m_recvTimeoutExprd(false)
00105    , m_streamBuf(this)
00106    , m_in(&m_streamBuf)
00107    , m_out(&m_streamBuf)
00108    , m_inout(&m_streamBuf)
00109    , m_recvTimeout(Timeout::infinite)
00110    , m_sendTimeout(Timeout::infinite)
00111    , m_connectTimeout(Timeout::infinite)
00112 {
00113    m_out.exceptions(std::ios::badbit);
00114    m_inout.exceptions(std::ios::badbit);
00115 }
00117 SocketBaseImpl::SocketBaseImpl(SocketHandle_t fd,
00118       SocketAddress::AddressType addrType)
00119    : SelectableIFC()
00120    , IOIFC()
00121    , m_isConnected(true)
00122    , m_sockfd(fd)
00123    , m_localAddress(SocketAddress::getAnyLocalHost())
00124    , m_peerAddress(SocketAddress::allocEmptyAddress(addrType))
00125    , m_recvTimeoutExprd(false)
00126    , m_streamBuf(this)
00127    , m_in(&m_streamBuf)
00128    , m_out(&m_streamBuf)
00129    , m_inout(&m_streamBuf)
00130    , m_recvTimeout(Timeout::infinite)
00131    , m_sendTimeout(Timeout::infinite)
00132    , m_connectTimeout(Timeout::infinite)
00133 {
00134    m_out.exceptions(std::ios::badbit);
00135    m_inout.exceptions(std::ios::badbit);
00136    if (addrType == SocketAddress::INET)
00137    {
00138       fillInetAddrParms();
00139    }
00140    else if (addrType == SocketAddress::UDS)
00141    {
00142       fillUnixAddrParms();
00143    }
00144    else
00145    {
00146       BLOCXX_ASSERT(0);
00147    }
00148 }
00150 SocketBaseImpl::SocketBaseImpl(const SocketAddress& addr)
00151    : SelectableIFC()
00152    , IOIFC()
00153    , m_isConnected(false)
00154    , m_sockfd(-1)
00155    , m_localAddress(SocketAddress::getAnyLocalHost())
00156    , m_peerAddress(addr)
00157    , m_recvTimeoutExprd(false)
00158    , m_streamBuf(this)
00159    , m_in(&m_streamBuf)
00160    , m_out(&m_streamBuf)
00161    , m_inout(&m_streamBuf)
00162    , m_recvTimeout(Timeout::infinite)
00163    , m_sendTimeout(Timeout::infinite)
00164    , m_connectTimeout(Timeout::infinite)
00165 {
00166    m_out.exceptions(std::ios::badbit);
00167    m_inout.exceptions(std::ios::badbit);
00168    connect(m_peerAddress);
00169 }
00171 SocketBaseImpl::~SocketBaseImpl()
00172 {
00173    try
00174    {
00175       disconnect();
00176    }
00177    catch (...)
00178    {
00179       // don't let exceptions escape
00180    }
00181 }
00183 Select_t
00184 SocketBaseImpl::getSelectObj() const
00185 {
00186    return m_sockfd;
00187 }
00189 void
00190 SocketBaseImpl::connect(const SocketAddress& addr)
00191 {
00192    if (m_isConnected)
00193    {
00194       disconnect();
00195    }
00196    m_streamBuf.reset();
00197    m_in.clear();
00198    m_out.clear();
00199    m_inout.clear();
00200    BLOCXX_ASSERT(m_sockfd == -1);
00201    BLOCXX_ASSERT(addr.getType() == SocketAddress::INET || addr.getType() == SocketAddress::UDS);
00202 
00203    int domain_type = PF_UNIX;
00204    if( addr.getType() == SocketAddress::INET )
00205    {
00206       domain_type = PF_INET;
00207 #ifdef BLOCXX_HAVE_IPV6
00208       // set PF_INET6 domain type for IPV6 protocol
00209       if( reinterpret_cast<const sockaddr*>(addr.getInetAddress())->sa_family == AF_INET6)
00210       {
00211          domain_type = PF_INET6;
00212       }
00213 #endif
00214    }
00215 
00216    AutoDescriptor sockfd(::socket(domain_type, SOCK_STREAM, 0));
00217    if (sockfd.get() == -1)
00218    {
00219       BLOCXX_THROW_ERRNO_MSG(SocketException,
00220          "Failed to create a socket");
00221    }
00222 
00223    // set the close on exec flag so child process can't keep the socket.
00224    if (::fcntl(sockfd.get(), F_SETFD, FD_CLOEXEC) == -1)
00225    {
00226       BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::connect() failed to set close-on-exec flag on socket");
00227    }
00228    int n;
00229    int flags = ::fcntl(sockfd.get(), F_GETFL, 0);
00230    ::fcntl(sockfd.get(), F_SETFL, flags | O_NONBLOCK);
00231 #if defined(BLOCXX_NCR)
00232    if ((n = ::connect(sockfd.get(), const_cast<SocketAddress_t *>(addr.getNativeForm()), addr.getNativeFormSize())) < 0)
00233 #else
00234    if ((n = ::connect(sockfd.get(), addr.getNativeForm(), addr.getNativeFormSize())) < 0)
00235 #endif
00236    {
00237       if (errno != EINPROGRESS)
00238       {
00239          BLOCXX_THROW_ERRNO_MSG(SocketException,
00240             Format("Failed to connect to: %1", addr.toString()).c_str());
00241       }
00242    }
00243    if (n == -1)
00244    {
00245       // because of the above check for EINPROGRESS
00246       // not connected yet, need to select and wait for connection to complete.
00247       PosixUnnamedPipeRef lUPipe;
00248       int pipefd = -1;
00249       if (Socket::getShutDownMechanism())
00250       {
00251          UnnamedPipeRef foo = Socket::getShutDownMechanism();
00252          lUPipe = foo.cast_to<PosixUnnamedPipe>();
00253          BLOCXX_ASSERT(lUPipe);
00254          pipefd = lUPipe->getInputHandle();
00255       }
00256       Select::SelectObjectArray selra; 
00257       Select::SelectObject sockSo(sockfd.get()); 
00258       sockSo.waitForRead = true; 
00259       sockSo.waitForWrite = true; 
00260       selra.push_back(sockSo); 
00261       if (pipefd != -1)
00262       {
00263          Select::SelectObject pipeSo(pipefd); 
00264          pipeSo.waitForRead = true; 
00265          selra.push_back(pipeSo); 
00266       }
00267       // here we spin checking for thread cancellation every so often.
00268       TimeoutTimer timer(m_connectTimeout);
00269       timer.start();
00270       do
00271       {
00272          Thread::testCancel(); 
00273          n = Select::selectRW(selra, timer.asRelativeTimeout(0.1)); 
00274          timer.loop();
00275       } while (n == Select::SELECT_TIMEOUT && !timer.expired());
00276 
00277       if (timer.expired())
00278       {
00279          BLOCXX_THROW(SocketException, "SocketBaseImpl::connect() select timedout");
00280       }
00281       else if (n == Select::SELECT_ERROR)
00282       {
00283          if (errno == EINTR)
00284          {
00285             Thread::testCancel();
00286          }
00287          BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::connect() select failed");
00288       }
00289 
00290       if (selra.size() == 2 && selra[1].readAvailable)
00291       {
00292          BLOCXX_THROW(SocketException, "Sockets have been shutdown");
00293       }
00294       else if (selra[0].readAvailable || selra[0].writeAvailable)
00295       {
00296          int error = 0;
00297          socklen_t len = sizeof(error);
00298 #if defined(BLOCXX_NCR)
00299          if (::getsockopt(sockfd.get(), SOL_SOCKET, SO_ERROR, (char*)&error, &len) < 0)
00300 #else
00301          if (::getsockopt(sockfd.get(), SOL_SOCKET, SO_ERROR, &error, &len) < 0)
00302 #endif
00303          {
00304             BLOCXX_THROW_ERRNO_MSG(SocketException,
00305                   "SocketBaseImpl::connect() getsockopt() failed");
00306          }
00307          if (error != 0)
00308          {
00309             errno = error;
00310             BLOCXX_THROW_ERRNO_MSG(SocketException,
00311                   "SocketBaseImpl::connect() failed");
00312          }
00313       }
00314       else
00315       {
00316          BLOCXX_THROW(SocketException, "SocketBaseImpl::connect(). Logic error, sockfd not in FD set.");
00317       }
00318    }
00319    ::fcntl(sockfd.get(), F_SETFL, flags);
00320    m_sockfd = sockfd.release();
00321    m_isConnected = true;
00322    m_peerAddress = addr; // To get the hostname from addr
00323    if (addr.getType() == SocketAddress::INET)
00324    {
00325       fillInetAddrParms();
00326    }
00327    else if (addr.getType() == SocketAddress::UDS)
00328    {
00329       fillUnixAddrParms();
00330    }
00331    else
00332    {
00333       BLOCXX_ASSERT(0);
00334    }
00335 
00336    if (!m_traceFileOut.empty())
00337    {
00338       MutexLock ml(g_guard);
00339 
00340       String combofilename = m_traceFileOut + "Combo";
00341       ofstream comboTraceFile(combofilename.c_str(), std::ios::app);
00342       if (!comboTraceFile)
00343       {
00344          BLOCXX_THROW_ERRNO_MSG(IOException, Format("Failed opening socket dump file \"%1\"", combofilename));
00345       }
00346       DateTime curDateTime;
00347       curDateTime.setToCurrent();
00348       comboTraceFile << Format("\n--->fd: %1 opened to \"%2\" at %3.%4 <---\n", getfd(),
00349          addr.toString(),
00350          curDateTime.toString("%X"), curDateTime.getMicrosecond());
00351    }
00352 }
00354 void
00355 SocketBaseImpl::disconnect()
00356 {
00357    if (m_in)
00358    {
00359       m_in.clear(ios::eofbit);
00360    }
00361    if (m_out)
00362    {
00363       m_out.clear(ios::eofbit);
00364    }
00365    if (m_inout)
00366    {
00367       m_inout.clear(ios::eofbit);
00368    }
00369    if (m_sockfd != -1 && m_isConnected)
00370    {
00371       if (::close(m_sockfd) == -1)
00372       {
00373          int lerrno = errno;
00374          Logger lgr("blocxx");
00375          BLOCXX_LOG_ERROR(lgr, Format("Closing socket handle %1 failed: %2", m_sockfd, lerrno));
00376       }
00377       m_isConnected = false;
00378 
00379       if (!m_traceFileOut.empty())
00380       {
00381          MutexLock ml(g_guard);
00382 
00383          String combofilename = m_traceFileOut + "Combo";
00384          ofstream comboTraceFile(combofilename.c_str(), std::ios::app);
00385          if (!comboTraceFile)
00386          {
00387             BLOCXX_THROW_ERRNO_MSG(IOException, Format("Failed opening socket dump file \"%1\"", combofilename));
00388          }
00389          DateTime curDateTime;
00390          curDateTime.setToCurrent();
00391          comboTraceFile << "\n--->fd: " << getfd() << " closed at " << curDateTime.toString("%X") <<
00392             '.' << curDateTime.getMicrosecond() << "<---\n";
00393       }
00394 
00395       m_sockfd = -1;
00396    }
00397 }
00399 // JBW this needs reworked.
00400 void
00401 SocketBaseImpl::fillInetAddrParms()
00402 {
00403    // create LocalAddress and PeerAddress structures for IPV6 protocol
00404    socklen_t len;
00405    struct sockaddr *p_addr;
00406    InetSocketAddress_t ss;
00407    memset(&ss, 0, sizeof(ss));
00408    len = sizeof(ss);
00409    p_addr = reinterpret_cast<struct sockaddr*>(&ss);
00410    if (getsockname(m_sockfd, p_addr, &len) != -1)
00411    {
00412        m_localAddress.assignFromNativeForm(&ss, len);
00413    }
00414    memset(&ss, 0, sizeof(ss));
00415    len = sizeof(ss);
00416    if (getpeername(m_sockfd, p_addr, &len) != -1)
00417    {
00418        m_peerAddress.assignFromNativeForm(&ss, len);
00419    }
00420 }
00422 void
00423 SocketBaseImpl::fillUnixAddrParms()
00424 {
00425    socklen_t len;
00426    UnixSocketAddress_t addr;
00427    memset(&addr, 0, sizeof(addr));
00428    len = sizeof(addr);
00429    if (getsockname(m_sockfd, reinterpret_cast<struct sockaddr*>(&addr), &len) == -1)
00430    {
00431       BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::fillUnixAddrParms: getsockname");
00432    }
00433    m_localAddress.assignFromNativeForm(&addr, len);
00434    m_peerAddress.assignFromNativeForm(&addr, len);
00435 }
00437 int
00438 SocketBaseImpl::write(const void* dataOut, int dataOutLen, ErrorAction errorAsException)
00439 {
00440    int rc = 0;
00441    bool isError = false;
00442    if (m_isConnected)
00443    {
00444       isError = waitForOutput(m_sendTimeout);
00445       if (isError)
00446       {
00447          rc = -1;
00448       }
00449       else
00450       {
00451          rc = writeAux(dataOut, dataOutLen);
00452          if (!m_traceFileOut.empty() && rc > 0)
00453          {
00454             MutexLock ml(g_guard);
00455             ofstream traceFile(m_traceFileOut.c_str(), std::ios::app);
00456             if (!traceFile)
00457             {
00458                BLOCXX_THROW_ERRNO_MSG(IOException, Format("Failed opening socket dump file \"%1\"", m_traceFileOut));
00459             }
00460             if (!traceFile.write(static_cast<const char*>(dataOut), rc))
00461             {
00462                BLOCXX_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00463             }
00464 
00465             String combofilename = m_traceFileOut + "Combo";
00466             ofstream comboTraceFile(combofilename.c_str(), std::ios::app);
00467             if (!comboTraceFile)
00468             {
00469                BLOCXX_THROW_ERRNO_MSG(IOException, Format("Failed opening socket dump file \"%1\"", combofilename));
00470             }
00471             DateTime curDateTime;
00472             curDateTime.setToCurrent();
00473             comboTraceFile << "\n--->fd: " << getfd() << " Out " << rc << " bytes at " << curDateTime.toString("%X") <<
00474                '.' << curDateTime.getMicrosecond() << "<---\n";
00475             if (!comboTraceFile.write(static_cast<const char*>(dataOut), rc))
00476             {
00477                BLOCXX_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00478             }
00479          }
00480       }
00481    }
00482    else
00483    {
00484       rc = -1;
00485    }
00486    if (rc < 0 && errorAsException == E_THROW_ON_ERROR)
00487    {
00488       BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::write");
00489    }
00490    return rc;
00491 }
00493 int
00494 SocketBaseImpl::read(void* dataIn, int dataInLen, ErrorAction errorAsException)
00495 {
00496    int rc = 0;
00497    bool isError = false;
00498    if (m_isConnected)
00499    {
00500       isError = waitForInput(m_recvTimeout);
00501       if (isError)
00502       {
00503          rc = -1;
00504       }
00505       else
00506       {
00507          rc = readAux(dataIn, dataInLen);
00508          if (!m_traceFileIn.empty() && rc > 0)
00509          {
00510             MutexLock ml(g_guard);
00511             ofstream traceFile(m_traceFileIn.c_str(), std::ios::app);
00512             if (!traceFile)
00513             {
00514                BLOCXX_THROW_ERRNO_MSG(IOException, Format("Failed opening tracefile \"%1\"", m_traceFileIn));
00515             }
00516             if (!traceFile.write(reinterpret_cast<const char*>(dataIn), rc))
00517             {
00518                BLOCXX_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00519             }
00520 
00521             String combofilename = m_traceFileOut + "Combo";
00522             ofstream comboTraceFile(combofilename.c_str(), std::ios::app);
00523             if (!comboTraceFile)
00524             {
00525                BLOCXX_THROW_ERRNO_MSG(IOException, Format("Failed opening socket dump file \"%1\"", combofilename));
00526             }
00527             DateTime curDateTime;
00528             curDateTime.setToCurrent();
00529             comboTraceFile << "\n--->fd: " << getfd() << " In " << rc << " bytes at " << curDateTime.toString("%X") <<
00530                '.' << curDateTime.getMicrosecond() << "<---\n";
00531             if (!comboTraceFile.write(reinterpret_cast<const char*>(dataIn), rc))
00532             {
00533                BLOCXX_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00534             }
00535          }
00536       }
00537    }
00538    else
00539    {
00540       rc = -1;
00541    }
00542    if (rc < 0)
00543    {
00544       if (errorAsException == E_THROW_ON_ERROR)
00545       {
00546          BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::read");
00547       }
00548    }
00549    return rc;
00550 }
00552 bool
00553 SocketBaseImpl::waitForInput(const Timeout& timeout)
00554 {
00555    int rval = SocketUtils::waitForIO(m_sockfd, timeout, SocketFlags::E_WAIT_FOR_INPUT);
00556    if (rval == ETIMEDOUT)
00557    {
00558       m_recvTimeoutExprd = true;
00559    }
00560    else
00561    {
00562       m_recvTimeoutExprd = false;
00563    }
00564    return (rval != 0);
00565 }
00567 bool
00568 SocketBaseImpl::waitForOutput(const Timeout& timeout)
00569 {
00570    return SocketUtils::waitForIO(m_sockfd, timeout, SocketFlags::E_WAIT_FOR_OUTPUT) != 0;
00571 }
00573 istream&
00574 SocketBaseImpl::getInputStream()
00575 {
00576    return m_in;
00577 }
00579 ostream&
00580 SocketBaseImpl::getOutputStream()
00581 {
00582    return m_out;
00583 }
00585 iostream&
00586 SocketBaseImpl::getIOStream()
00587 {
00588    return m_inout;
00589 }
00591 // STATIC
00592 void
00593 SocketBaseImpl::setDumpFiles(const String& in, const String& out)
00594 {
00595    m_traceFileOut = out;
00596    m_traceFileIn = in;
00597 }
00598 
00599 } // end namespace BLOCXX_NAMESPACE
00600 
00601 #endif   // #if !defined(BLOCXX_WIN32)
00602