blocxx
|
/*******************************************************************************
* Copyright (C) 2005, Vintela, Inc. All rights reserved.
* Copyright (C) 2006, Novell, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
*     * Redistributions of source code must retain the above copyright notice,
*       this list of conditions and the following disclaimer.
*     * Redistributions in binary form must reproduce the above copyright
*       notice, this list of conditions and the following disclaimer in the
*       documentation and/or other materials provided with the distribution.
*     * Neither the name of
*       Vintela, Inc.,
*       nor Novell, Inc.,
*       nor the names of its contributors or employees may be used to
*       endorse or promote products derived from this software without
*       specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
00031 *******************************************************************************/ 00032 00033 00038 #include "blocxx/BLOCXX_config.h" 00039 00040 #if !defined(BLOCXX_WIN32) 00041 00042 #include "blocxx/SocketBaseImpl.hpp" 00043 #include "blocxx/SocketUtils.hpp" 00044 #include "blocxx/Format.hpp" 00045 #include "blocxx/Assertion.hpp" 00046 #include "blocxx/IOException.hpp" 00047 #include "blocxx/Mutex.hpp" 00048 #include "blocxx/MutexLock.hpp" 00049 #include "blocxx/GlobalMutex.hpp" 00050 #include "blocxx/PosixUnnamedPipe.hpp" 00051 #include "blocxx/Socket.hpp" 00052 #include "blocxx/Thread.hpp" 00053 #include "blocxx/DateTime.hpp" 00054 #include "blocxx/TimeoutTimer.hpp" 00055 #include "blocxx/AutoDescriptor.hpp" 00056 #include "blocxx/Logger.hpp" 00057 #include "blocxx/Select.hpp" 00058 00059 00060 extern "C" 00061 { 00062 #include <sys/types.h> 00063 #include <sys/time.h> 00064 #include <sys/socket.h> 00065 #include <sys/stat.h> 00066 #include <netdb.h> 00067 #include <arpa/inet.h> 00068 #include <unistd.h> 00069 #include <fcntl.h> 00070 #include <netinet/in.h> 00071 } 00072 00073 #include <fstream> 00074 #include <cerrno> 00075 #include <cstdio> 00076 00077 namespace BLOCXX_NAMESPACE 00078 { 00079 00080 using std::istream; 00081 using std::ostream; 00082 using std::iostream; 00083 using std::ifstream; 00084 using std::ofstream; 00085 using std::fstream; 00086 using std::ios; 00087 00088 namespace 00089 { 00090 static GlobalMutex g_guard = BLOCXX_GLOBAL_MUTEX_INIT(); 00091 } 00092 00093 String SocketBaseImpl::m_traceFileOut; 00094 String SocketBaseImpl::m_traceFileIn; 00095 00097 SocketBaseImpl::SocketBaseImpl() 00098 : SelectableIFC() 00099 , IOIFC() 00100 , m_isConnected(false) 00101 , m_sockfd(-1) 00102 , m_localAddress() 00103 , m_peerAddress() 00104 , m_recvTimeoutExprd(false) 00105 , m_streamBuf(this) 00106 , m_in(&m_streamBuf) 00107 , m_out(&m_streamBuf) 00108 , m_inout(&m_streamBuf) 00109 , m_recvTimeout(Timeout::infinite) 00110 , 
m_sendTimeout(Timeout::infinite) 00111 , m_connectTimeout(Timeout::infinite) 00112 { 00113 m_out.exceptions(std::ios::badbit); 00114 m_inout.exceptions(std::ios::badbit); 00115 } 00117 SocketBaseImpl::SocketBaseImpl(SocketHandle_t fd, 00118 SocketAddress::AddressType addrType) 00119 : SelectableIFC() 00120 , IOIFC() 00121 , m_isConnected(true) 00122 , m_sockfd(fd) 00123 , m_localAddress(SocketAddress::getAnyLocalHost()) 00124 , m_peerAddress(SocketAddress::allocEmptyAddress(addrType)) 00125 , m_recvTimeoutExprd(false) 00126 , m_streamBuf(this) 00127 , m_in(&m_streamBuf) 00128 , m_out(&m_streamBuf) 00129 , m_inout(&m_streamBuf) 00130 , m_recvTimeout(Timeout::infinite) 00131 , m_sendTimeout(Timeout::infinite) 00132 , m_connectTimeout(Timeout::infinite) 00133 { 00134 m_out.exceptions(std::ios::badbit); 00135 m_inout.exceptions(std::ios::badbit); 00136 if (addrType == SocketAddress::INET) 00137 { 00138 fillInetAddrParms(); 00139 } 00140 else if (addrType == SocketAddress::UDS) 00141 { 00142 fillUnixAddrParms(); 00143 } 00144 else 00145 { 00146 BLOCXX_ASSERT(0); 00147 } 00148 } 00150 SocketBaseImpl::SocketBaseImpl(const SocketAddress& addr) 00151 : SelectableIFC() 00152 , IOIFC() 00153 , m_isConnected(false) 00154 , m_sockfd(-1) 00155 , m_localAddress(SocketAddress::getAnyLocalHost()) 00156 , m_peerAddress(addr) 00157 , m_recvTimeoutExprd(false) 00158 , m_streamBuf(this) 00159 , m_in(&m_streamBuf) 00160 , m_out(&m_streamBuf) 00161 , m_inout(&m_streamBuf) 00162 , m_recvTimeout(Timeout::infinite) 00163 , m_sendTimeout(Timeout::infinite) 00164 , m_connectTimeout(Timeout::infinite) 00165 { 00166 m_out.exceptions(std::ios::badbit); 00167 m_inout.exceptions(std::ios::badbit); 00168 connect(m_peerAddress); 00169 } 00171 SocketBaseImpl::~SocketBaseImpl() 00172 { 00173 try 00174 { 00175 disconnect(); 00176 } 00177 catch (...) 
00178 { 00179 // don't let exceptions escape 00180 } 00181 } 00183 Select_t 00184 SocketBaseImpl::getSelectObj() const 00185 { 00186 return m_sockfd; 00187 } 00189 void 00190 SocketBaseImpl::connect(const SocketAddress& addr) 00191 { 00192 if (m_isConnected) 00193 { 00194 disconnect(); 00195 } 00196 m_streamBuf.reset(); 00197 m_in.clear(); 00198 m_out.clear(); 00199 m_inout.clear(); 00200 BLOCXX_ASSERT(m_sockfd == -1); 00201 BLOCXX_ASSERT(addr.getType() == SocketAddress::INET || addr.getType() == SocketAddress::UDS); 00202 00203 int domain_type = PF_UNIX; 00204 if( addr.getType() == SocketAddress::INET ) 00205 { 00206 domain_type = PF_INET; 00207 #ifdef BLOCXX_HAVE_IPV6 00208 // set PF_INET6 domain type for IPV6 protocol 00209 if( reinterpret_cast<const sockaddr*>(addr.getInetAddress())->sa_family == AF_INET6) 00210 { 00211 domain_type = PF_INET6; 00212 } 00213 #endif 00214 } 00215 00216 AutoDescriptor sockfd(::socket(domain_type, SOCK_STREAM, 0)); 00217 if (sockfd.get() == -1) 00218 { 00219 BLOCXX_THROW_ERRNO_MSG(SocketException, 00220 "Failed to create a socket"); 00221 } 00222 00223 // set the close on exec flag so child process can't keep the socket. 
00224 if (::fcntl(sockfd.get(), F_SETFD, FD_CLOEXEC) == -1) 00225 { 00226 BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::connect() failed to set close-on-exec flag on socket"); 00227 } 00228 int n; 00229 int flags = ::fcntl(sockfd.get(), F_GETFL, 0); 00230 ::fcntl(sockfd.get(), F_SETFL, flags | O_NONBLOCK); 00231 #if defined(BLOCXX_NCR) 00232 if ((n = ::connect(sockfd.get(), const_cast<SocketAddress_t *>(addr.getNativeForm()), addr.getNativeFormSize())) < 0) 00233 #else 00234 if ((n = ::connect(sockfd.get(), addr.getNativeForm(), addr.getNativeFormSize())) < 0) 00235 #endif 00236 { 00237 if (errno != EINPROGRESS) 00238 { 00239 BLOCXX_THROW_ERRNO_MSG(SocketException, 00240 Format("Failed to connect to: %1", addr.toString()).c_str()); 00241 } 00242 } 00243 if (n == -1) 00244 { 00245 // because of the above check for EINPROGRESS 00246 // not connected yet, need to select and wait for connection to complete. 00247 PosixUnnamedPipeRef lUPipe; 00248 int pipefd = -1; 00249 if (Socket::getShutDownMechanism()) 00250 { 00251 UnnamedPipeRef foo = Socket::getShutDownMechanism(); 00252 lUPipe = foo.cast_to<PosixUnnamedPipe>(); 00253 BLOCXX_ASSERT(lUPipe); 00254 pipefd = lUPipe->getInputHandle(); 00255 } 00256 Select::SelectObjectArray selra; 00257 Select::SelectObject sockSo(sockfd.get()); 00258 sockSo.waitForRead = true; 00259 sockSo.waitForWrite = true; 00260 selra.push_back(sockSo); 00261 if (pipefd != -1) 00262 { 00263 Select::SelectObject pipeSo(pipefd); 00264 pipeSo.waitForRead = true; 00265 selra.push_back(pipeSo); 00266 } 00267 // here we spin checking for thread cancellation every so often. 
00268 TimeoutTimer timer(m_connectTimeout); 00269 timer.start(); 00270 do 00271 { 00272 Thread::testCancel(); 00273 n = Select::selectRW(selra, timer.asRelativeTimeout(0.1)); 00274 timer.loop(); 00275 } while (n == Select::SELECT_TIMEOUT && !timer.expired()); 00276 00277 if (timer.expired()) 00278 { 00279 BLOCXX_THROW(SocketException, "SocketBaseImpl::connect() select timedout"); 00280 } 00281 else if (n == Select::SELECT_ERROR) 00282 { 00283 if (errno == EINTR) 00284 { 00285 Thread::testCancel(); 00286 } 00287 BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::connect() select failed"); 00288 } 00289 00290 if (selra.size() == 2 && selra[1].readAvailable) 00291 { 00292 BLOCXX_THROW(SocketException, "Sockets have been shutdown"); 00293 } 00294 else if (selra[0].readAvailable || selra[0].writeAvailable) 00295 { 00296 int error = 0; 00297 socklen_t len = sizeof(error); 00298 #if defined(BLOCXX_NCR) 00299 if (::getsockopt(sockfd.get(), SOL_SOCKET, SO_ERROR, (char*)&error, &len) < 0) 00300 #else 00301 if (::getsockopt(sockfd.get(), SOL_SOCKET, SO_ERROR, &error, &len) < 0) 00302 #endif 00303 { 00304 BLOCXX_THROW_ERRNO_MSG(SocketException, 00305 "SocketBaseImpl::connect() getsockopt() failed"); 00306 } 00307 if (error != 0) 00308 { 00309 errno = error; 00310 BLOCXX_THROW_ERRNO_MSG(SocketException, 00311 "SocketBaseImpl::connect() failed"); 00312 } 00313 } 00314 else 00315 { 00316 BLOCXX_THROW(SocketException, "SocketBaseImpl::connect(). 
Logic error, sockfd not in FD set."); 00317 } 00318 } 00319 ::fcntl(sockfd.get(), F_SETFL, flags); 00320 m_sockfd = sockfd.release(); 00321 m_isConnected = true; 00322 m_peerAddress = addr; // To get the hostname from addr 00323 if (addr.getType() == SocketAddress::INET) 00324 { 00325 fillInetAddrParms(); 00326 } 00327 else if (addr.getType() == SocketAddress::UDS) 00328 { 00329 fillUnixAddrParms(); 00330 } 00331 else 00332 { 00333 BLOCXX_ASSERT(0); 00334 } 00335 00336 if (!m_traceFileOut.empty()) 00337 { 00338 MutexLock ml(g_guard); 00339 00340 String combofilename = m_traceFileOut + "Combo"; 00341 ofstream comboTraceFile(combofilename.c_str(), std::ios::app); 00342 if (!comboTraceFile) 00343 { 00344 BLOCXX_THROW_ERRNO_MSG(IOException, Format("Failed opening socket dump file \"%1\"", combofilename)); 00345 } 00346 DateTime curDateTime; 00347 curDateTime.setToCurrent(); 00348 comboTraceFile << Format("\n--->fd: %1 opened to \"%2\" at %3.%4 <---\n", getfd(), 00349 addr.toString(), 00350 curDateTime.toString("%X"), curDateTime.getMicrosecond()); 00351 } 00352 } 00354 void 00355 SocketBaseImpl::disconnect() 00356 { 00357 if (m_in) 00358 { 00359 m_in.clear(ios::eofbit); 00360 } 00361 if (m_out) 00362 { 00363 m_out.clear(ios::eofbit); 00364 } 00365 if (m_inout) 00366 { 00367 m_inout.clear(ios::eofbit); 00368 } 00369 if (m_sockfd != -1 && m_isConnected) 00370 { 00371 if (::close(m_sockfd) == -1) 00372 { 00373 int lerrno = errno; 00374 Logger lgr("blocxx"); 00375 BLOCXX_LOG_ERROR(lgr, Format("Closing socket handle %1 failed: %2", m_sockfd, lerrno)); 00376 } 00377 m_isConnected = false; 00378 00379 if (!m_traceFileOut.empty()) 00380 { 00381 MutexLock ml(g_guard); 00382 00383 String combofilename = m_traceFileOut + "Combo"; 00384 ofstream comboTraceFile(combofilename.c_str(), std::ios::app); 00385 if (!comboTraceFile) 00386 { 00387 BLOCXX_THROW_ERRNO_MSG(IOException, Format("Failed opening socket dump file \"%1\"", combofilename)); 00388 } 00389 DateTime curDateTime; 00390 
curDateTime.setToCurrent(); 00391 comboTraceFile << "\n--->fd: " << getfd() << " closed at " << curDateTime.toString("%X") << 00392 '.' << curDateTime.getMicrosecond() << "<---\n"; 00393 } 00394 00395 m_sockfd = -1; 00396 } 00397 } 00399 // JBW this needs reworked. 00400 void 00401 SocketBaseImpl::fillInetAddrParms() 00402 { 00403 // create LocalAddress and PeerAddress structures for IPV6 protocol 00404 socklen_t len; 00405 struct sockaddr *p_addr; 00406 InetSocketAddress_t ss; 00407 memset(&ss, 0, sizeof(ss)); 00408 len = sizeof(ss); 00409 p_addr = reinterpret_cast<struct sockaddr*>(&ss); 00410 if (getsockname(m_sockfd, p_addr, &len) != -1) 00411 { 00412 m_localAddress.assignFromNativeForm(&ss, len); 00413 } 00414 memset(&ss, 0, sizeof(ss)); 00415 len = sizeof(ss); 00416 if (getpeername(m_sockfd, p_addr, &len) != -1) 00417 { 00418 m_peerAddress.assignFromNativeForm(&ss, len); 00419 } 00420 } 00422 void 00423 SocketBaseImpl::fillUnixAddrParms() 00424 { 00425 socklen_t len; 00426 UnixSocketAddress_t addr; 00427 memset(&addr, 0, sizeof(addr)); 00428 len = sizeof(addr); 00429 if (getsockname(m_sockfd, reinterpret_cast<struct sockaddr*>(&addr), &len) == -1) 00430 { 00431 BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::fillUnixAddrParms: getsockname"); 00432 } 00433 m_localAddress.assignFromNativeForm(&addr, len); 00434 m_peerAddress.assignFromNativeForm(&addr, len); 00435 } 00437 int 00438 SocketBaseImpl::write(const void* dataOut, int dataOutLen, ErrorAction errorAsException) 00439 { 00440 int rc = 0; 00441 bool isError = false; 00442 if (m_isConnected) 00443 { 00444 isError = waitForOutput(m_sendTimeout); 00445 if (isError) 00446 { 00447 rc = -1; 00448 } 00449 else 00450 { 00451 rc = writeAux(dataOut, dataOutLen); 00452 if (!m_traceFileOut.empty() && rc > 0) 00453 { 00454 MutexLock ml(g_guard); 00455 ofstream traceFile(m_traceFileOut.c_str(), std::ios::app); 00456 if (!traceFile) 00457 { 00458 BLOCXX_THROW_ERRNO_MSG(IOException, Format("Failed opening socket 
dump file \"%1\"", m_traceFileOut)); 00459 } 00460 if (!traceFile.write(static_cast<const char*>(dataOut), rc)) 00461 { 00462 BLOCXX_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump"); 00463 } 00464 00465 String combofilename = m_traceFileOut + "Combo"; 00466 ofstream comboTraceFile(combofilename.c_str(), std::ios::app); 00467 if (!comboTraceFile) 00468 { 00469 BLOCXX_THROW_ERRNO_MSG(IOException, Format("Failed opening socket dump file \"%1\"", combofilename)); 00470 } 00471 DateTime curDateTime; 00472 curDateTime.setToCurrent(); 00473 comboTraceFile << "\n--->fd: " << getfd() << " Out " << rc << " bytes at " << curDateTime.toString("%X") << 00474 '.' << curDateTime.getMicrosecond() << "<---\n"; 00475 if (!comboTraceFile.write(static_cast<const char*>(dataOut), rc)) 00476 { 00477 BLOCXX_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump"); 00478 } 00479 } 00480 } 00481 } 00482 else 00483 { 00484 rc = -1; 00485 } 00486 if (rc < 0 && errorAsException == E_THROW_ON_ERROR) 00487 { 00488 BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::write"); 00489 } 00490 return rc; 00491 } 00493 int 00494 SocketBaseImpl::read(void* dataIn, int dataInLen, ErrorAction errorAsException) 00495 { 00496 int rc = 0; 00497 bool isError = false; 00498 if (m_isConnected) 00499 { 00500 isError = waitForInput(m_recvTimeout); 00501 if (isError) 00502 { 00503 rc = -1; 00504 } 00505 else 00506 { 00507 rc = readAux(dataIn, dataInLen); 00508 if (!m_traceFileIn.empty() && rc > 0) 00509 { 00510 MutexLock ml(g_guard); 00511 ofstream traceFile(m_traceFileIn.c_str(), std::ios::app); 00512 if (!traceFile) 00513 { 00514 BLOCXX_THROW_ERRNO_MSG(IOException, Format("Failed opening tracefile \"%1\"", m_traceFileIn)); 00515 } 00516 if (!traceFile.write(reinterpret_cast<const char*>(dataIn), rc)) 00517 { 00518 BLOCXX_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump"); 00519 } 00520 00521 String combofilename = m_traceFileOut + "Combo"; 00522 ofstream 
comboTraceFile(combofilename.c_str(), std::ios::app); 00523 if (!comboTraceFile) 00524 { 00525 BLOCXX_THROW_ERRNO_MSG(IOException, Format("Failed opening socket dump file \"%1\"", combofilename)); 00526 } 00527 DateTime curDateTime; 00528 curDateTime.setToCurrent(); 00529 comboTraceFile << "\n--->fd: " << getfd() << " In " << rc << " bytes at " << curDateTime.toString("%X") << 00530 '.' << curDateTime.getMicrosecond() << "<---\n"; 00531 if (!comboTraceFile.write(reinterpret_cast<const char*>(dataIn), rc)) 00532 { 00533 BLOCXX_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump"); 00534 } 00535 } 00536 } 00537 } 00538 else 00539 { 00540 rc = -1; 00541 } 00542 if (rc < 0) 00543 { 00544 if (errorAsException == E_THROW_ON_ERROR) 00545 { 00546 BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::read"); 00547 } 00548 } 00549 return rc; 00550 } 00552 bool 00553 SocketBaseImpl::waitForInput(const Timeout& timeout) 00554 { 00555 int rval = SocketUtils::waitForIO(m_sockfd, timeout, SocketFlags::E_WAIT_FOR_INPUT); 00556 if (rval == ETIMEDOUT) 00557 { 00558 m_recvTimeoutExprd = true; 00559 } 00560 else 00561 { 00562 m_recvTimeoutExprd = false; 00563 } 00564 return (rval != 0); 00565 } 00567 bool 00568 SocketBaseImpl::waitForOutput(const Timeout& timeout) 00569 { 00570 return SocketUtils::waitForIO(m_sockfd, timeout, SocketFlags::E_WAIT_FOR_OUTPUT) != 0; 00571 } 00573 istream& 00574 SocketBaseImpl::getInputStream() 00575 { 00576 return m_in; 00577 } 00579 ostream& 00580 SocketBaseImpl::getOutputStream() 00581 { 00582 return m_out; 00583 } 00585 iostream& 00586 SocketBaseImpl::getIOStream() 00587 { 00588 return m_inout; 00589 } 00591 // STATIC 00592 void 00593 SocketBaseImpl::setDumpFiles(const String& in, const String& out) 00594 { 00595 m_traceFileOut = out; 00596 m_traceFileIn = in; 00597 } 00598 00599 } // end namespace BLOCXX_NAMESPACE 00600 00601 #endif // #if !defined(BLOCXX_WIN32) 00602