blocxx

Exec.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002 * Copyright (C) 2005, Quest Software, Inc. All rights reserved.
00003 * Copyright (C) 2006, Novell, Inc. All rights reserved.
00004 * 
00005 * Redistribution and use in source and binary forms, with or without
00006 * modification, are permitted provided that the following conditions are met:
00007 * 
00008 *     * Redistributions of source code must retain the above copyright notice,
00009 *       this list of conditions and the following disclaimer.
00010 *     * Redistributions in binary form must reproduce the above copyright
00011 *       notice, this list of conditions and the following disclaimer in the
00012 *       documentation and/or other materials provided with the distribution.
00013 *     * Neither the name of 
00014 *       Quest Software, Inc., 
00015 *       nor Novell, Inc., 
00016 *       nor the names of its contributors or employees may be used to 
00017 *       endorse or promote products derived from this software without 
00018 *       specific prior written permission.
00019 * 
00020 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00021 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00022 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00023 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00024 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00025 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00026 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00027 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00028 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00029 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00030 * POSSIBILITY OF SUCH DAMAGE.
00031 *******************************************************************************/
00032 
00033 
00038 #include "blocxx/BLOCXX_config.h"
00039 #include "blocxx/Exec.hpp"
00040 #include "blocxx/Select.hpp"
00041 #include "blocxx/ExceptionIds.hpp"
00042 #include "blocxx/TimeoutTimer.hpp"
00043 #include "blocxx/ExecMockObject.hpp"
00044 #include "blocxx/GlobalPtr.hpp"
00045 #include "blocxx/WaitpidThreadFix.hpp"
00046 
00047 #if !defined(BLOCXX_WIN32)
00048 #include "blocxx/PosixUnnamedPipe.hpp"
00049 #include "blocxx/PosixExec.hpp"
00050 #else
00051 #include "blocxx/UnnamedPipe.hpp"
00052 #include "blocxx/WinExec.hpp"
00053 #endif
00054 
00055 extern "C"
00056 {
00057 #ifdef BLOCXX_HAVE_SYS_RESOURCE_H
00058 #include <sys/resource.h>
00059 #endif
00060 #ifdef BLOCXX_HAVE_SYS_TYPES_H
00061 #include <sys/types.h>
00062 #endif
00063 #ifdef BLOCXX_HAVE_UNISTD_H
00064 #include <unistd.h>
00065 #endif
00066 #ifndef BLOCXX_WIN32
00067 #include <sys/wait.h>
00068 #include <fcntl.h>
00069 #endif
00070 }
00071 
00072 #ifdef BLOCXX_NCR
00073 #if defined(sigaction)
00074 #undef sigaction
00075 #endif
00076 #undef SIG_DFL
00077 #define  SIG_DFL  (void(*)())0
00078 #endif
00079 
00080 namespace BLOCXX_NAMESPACE
00081 {
00082 BLOCXX_DEFINE_EXCEPTION_WITH_BASE_AND_ID(ExecTimeout, ExecErrorException);
00083 BLOCXX_DEFINE_EXCEPTION_WITH_BASE_AND_ID(ExecBufferFull, ExecErrorException);
00084 BLOCXX_DEFINE_EXCEPTION_WITH_ID(ExecError);
00085 
00086 
00088 namespace Exec
00089 {
00090 ::BLOCXX_NAMESPACE::GlobalPtr<ExecMockObject, Impl::NullFactory> g_execMockObject = BLOCXX_GLOBAL_PTR_INIT;
00091 
00093 Process::Status
00094 system(const Array<String>& command, const char* const envp[], const Timeout& timeout)
00095 {
00096 
00097 #ifndef BLOCXX_WIN32
00098    PosixExec::SystemPreExec spe;
00099 #else
00100    WinExec::WinSystemPreExec spe;
00101 #endif
00102 
00103    ProcessRef proc = Exec::spawn(command[0], command, envp, spe);
00104 
00105    proc->waitCloseTerm(Timeout::relative(0), timeout, Timeout::relative(0));
00106    return proc->processStatus();
00107 }
00108 
00109 
00111 int 
00112 safeSystem(const Array<String>& command, const char* const envp[])
00113 {
00114     Process::Status ps = system(command, envp); 
00115     return ps.getPOSIXwaitpidStatus(); 
00116 }
00117 
00119 ProcessRef spawnImpl(
00120    char const * exec_path,
00121    char const * const argv[], char const * const envp[],
00122    PreExec & pre_exec
00123 )
00124 {
00125 #ifdef BLOCXX_WIN32
00126    return WinExec::spawnImpl(exec_path, argv, envp, pre_exec);
00127 #else
00128    return PosixExec::spawnImpl(exec_path, argv, envp, pre_exec);
00129 #endif
00130 }
00131 
00133 ProcessRef spawn(
00134    char const * exec_path,
00135    char const * const argv[], char const * const envp[],
00136    PreExec & pre_exec
00137 )
00138 {
00139    if (WaitpidThreadFix::shouldUseWaitpidThreadFix())
00140    {
00141       return WaitpidThreadFix::spawnProcess(exec_path, argv, envp, pre_exec);
00142    }
00143    return spawnImpl(exec_path, argv, envp, pre_exec);
00144 }
00145 
00147 ProcessRef spawn(
00148    char const * const argv[], char const * const envp[]
00149 )
00150 {
00151 
00152 #ifdef BLOCXX_WIN32
00153    WinExec::WinStandardPreExec pre_exec;
00154 #else
00155    PosixExec::StandardPreExec pre_exec;
00156 #endif
00157 
00158    return spawn(argv[0], argv, envp, pre_exec);
00159 }
00160 
00161 namespace Impl
00162 {
00163 void close_child_ends(UnnamedPipeRef ppipe[BLOCXX_NPIPE])
00164 {
00165    // prevent the parent from using the child's end of the pipes.
00166    if (ppipe[BLOCXX_IN])
00167    {
00168       ppipe[BLOCXX_IN]->closeInputHandle();
00169    }
00170    if (ppipe[BLOCXX_OUT])
00171    {
00172       ppipe[BLOCXX_OUT]->closeOutputHandle();
00173    }
00174    if (ppipe[BLOCXX_SERR])
00175    {
00176       ppipe[BLOCXX_SERR]->closeOutputHandle();
00177    }
00178    ppipe[BLOCXX_EXEC_ERR]->closeOutputHandle();
00179 }
00180 } // end namespace Impl
00181 
00182 namespace
00183 {
00184 
00185 #ifndef BLOCXX_MIN
00186 #define BLOCXX_MIN(x, y) (x) < (y) ? (x) : (y)
00187 #endif
00188 
00190 class StringOutputGatherer : public OutputCallback
00191 {
00192 public:
00193    StringOutputGatherer(String& stdoutput, String& erroutput, int outputLimit)
00194       : m_output(stdoutput)
00195       , m_erroutput(erroutput)
00196       , m_outputLimit(outputLimit)
00197    {
00198    }
00199    StringOutputGatherer(String& stdoutput, int outputLimit)
00200       : m_output(stdoutput)
00201       , m_erroutput(stdoutput)
00202       , m_outputLimit(outputLimit)
00203    {
00204    }
00205 private:
00206    virtual void doHandleData(const char* data, size_t dataLen, 
00207       EOutputSource outputSource, const ProcessRef& theProc,
00208       size_t streamIndex, Array<char>& inputBuffer)
00209    {
00210       String& output = (outputSource == E_STDOUT) ? m_output : m_erroutput;
00211       if (m_outputLimit >= 0 && output.length() + dataLen > static_cast<size_t>(m_outputLimit))
00212       {
00213          // the process output too much, so just copy what we can and return error
00214          int lentocopy = BLOCXX_MIN(m_outputLimit - output.length(), dataLen);
00215          if (lentocopy >= 0)
00216          {
00217             output += String(data, lentocopy);
00218          }
00219          BLOCXX_THROW(ExecBufferFullException, 
00220             "Exec::StringOutputGatherer::doHandleData(): buffer full");
00221       }
00222 
00223       output += String(data, dataLen);
00224    }
00225    String& m_output;
00226    String& m_erroutput;
00227    int m_outputLimit;
00228 };
00229 
00231 class SingleStringInputCallback : public InputCallback
00232 {
00233 public:
00234    SingleStringInputCallback(const String& s)
00235       : m_s(s)
00236    {
00237    }
00238 private:
00239    virtual void doGetData(Array<char>& inputBuffer, const ProcessRef& theProc, size_t streamIndex)
00240    {
00241       if (m_s.length() > 0)
00242       {
00243          inputBuffer.insert(inputBuffer.end(), m_s.c_str(), m_s.c_str() + m_s.length());
00244          m_s.erase();
00245       }
00246       else if (theProc->in()->isOpen())
00247       {
00248          theProc->in()->close();
00249       }
00250    }
00251    String m_s;
00252 };
00253 
00254 }// end anonymous namespace
00255 
00257 Process::Status executeProcessAndGatherOutput(
00258    char const * const command[],
00259    String& output,
00260    char const * const envVars[],
00261    const Timeout& timeout,
00262    int outputLimit,
00263    char const * input)
00264 {
00265    if (g_execMockObject.get())
00266    {
00267       return g_execMockObject.get()->executeProcessAndGatherOutput(command, output, envVars, timeout, outputLimit, input);
00268    }
00269    return feedProcessAndGatherOutput(spawn(command, envVars),
00270       output, timeout, outputLimit, input);
00271 }
00272 
00274 Process::Status executeProcessAndGatherOutput(
00275    char const * const command[],
00276    String& output,
00277    String& erroutput,
00278    char const * const envVars[],
00279    const Timeout& timeout,
00280    int outputLimit,
00281    char const * input)
00282 {
00283    if (g_execMockObject.get())
00284    {
00285       return g_execMockObject.get()->executeProcessAndGatherOutput2(command, output, 
00286          erroutput, envVars, timeout, outputLimit, input);
00287    }
00288 
00289    return feedProcessAndGatherOutput(spawn(command, envVars),
00290       output, erroutput, timeout, outputLimit, input);
00291 }
00292 
00294 BLOCXX_COMMON_API void executeProcessAndGatherOutput(
00295    const Array<String>& command,
00296    String& output, int& processstatus,
00297    int timeoutsecs, int outputlimit, 
00298    const String& input)
00299 {
00300     Timeout timeout = Timeout::infinite; 
00301     if (timeoutsecs != -1)
00302     {
00303    timeout = Timeout::relative(timeoutsecs); 
00304     }
00305     Process::Status ps = executeProcessAndGatherOutput(command, 
00306                          output, 
00307                          timeout, 
00308                          outputlimit, input); 
00309 
00310    processstatus = ps.getPOSIXwaitpidStatus(); 
00311 }
00312 
00314 Process::Status feedProcessAndGatherOutput(
00315    ProcessRef const & proc,
00316    String & output,
00317    Timeout const & timeout, 
00318    int outputLimit, 
00319    String const & input)
00320 {
00321    Array<ProcessRef> procarr(1, proc);
00322    SingleStringInputCallback singleStringInputCallback(input);
00323 
00324    StringOutputGatherer gatherer(output, outputLimit);
00325    processInputOutput(gatherer, procarr, singleStringInputCallback, timeout);
00326    proc->waitCloseTerm();
00327    return proc->processStatus();
00328 }
00329 
00331 Process::Status feedProcessAndGatherOutput(
00332    ProcessRef const & proc,
00333    String & output,
00334    String & erroutput,
00335    Timeout const & timeout, 
00336    int outputLimit, 
00337    String const & input)
00338 {
00339    Array<ProcessRef> procarr(1, proc);
00340    SingleStringInputCallback singleStringInputCallback(input);
00341 
00342    StringOutputGatherer gatherer(output, erroutput, outputLimit);
00343    processInputOutput(gatherer, procarr, singleStringInputCallback, timeout);
00344    proc->waitCloseTerm();
00345    return proc->processStatus();
00346 }
00347 
00349 void
00350 gatherOutput(String& output, const ProcessRef& proc, int timeoutSecs, int outputLimit)
00351 {
00352    gatherOutput(output, proc, Timeout::relativeWithReset(timeoutSecs), outputLimit);
00353 }
00355 void
00356 gatherOutput(String& output, const ProcessRef& proc, const Timeout& timeout, int outputLimit)
00357 {
00358    Array<ProcessRef> procs(1, proc);
00359 
00360    StringOutputGatherer gatherer(output, outputLimit);
00361    SingleStringInputCallback singleStringInputCallback = SingleStringInputCallback(String());
00362    processInputOutput(gatherer, procs, singleStringInputCallback, timeout);
00363 }
00364 
00366 OutputCallback::~OutputCallback()
00367 {
00368 
00369 }
00370 
00372 void
00373 OutputCallback::handleData(const char* data, size_t dataLen, EOutputSource outputSource, const ProcessRef& theProc, size_t streamIndex, Array<char>& inputBuffer)
00374 {
00375    doHandleData(data, dataLen, outputSource, theProc, streamIndex, inputBuffer);
00376 }
00377 
00379 InputCallback::~InputCallback()
00380 {
00381 }
00382 
00384 void
00385 InputCallback::getData(Array<char>& inputBuffer, const ProcessRef& theProc, size_t streamIndex)
00386 {
00387    doGetData(inputBuffer, theProc, streamIndex);
00388 }
00389 
00390 namespace
00391 {
00392    struct ProcessOutputState
00393    {
00394       bool inIsOpen;
00395       bool outIsOpen;
00396       bool errIsOpen;
00397       size_t availableDataLen;
00398 
00399       ProcessOutputState()
00400          : inIsOpen(true)
00401          , outIsOpen(true)
00402          , errIsOpen(true)
00403          , availableDataLen(0)
00404       {
00405       }
00406    };
00407 
00408 }
00409 
00411 void
00412 processInputOutput(OutputCallback& output, Array<ProcessRef>& procs, InputCallback& input, const Timeout& timeout)
00413 {
00414    TimeoutTimer timer(timeout);
00415 
00416    Array<ProcessOutputState> processStates(procs.size());
00417    int numOpenPipes(procs.size() * 2); // count of stdout & stderr. Ignore stdin for purposes of algorithm termination.
00418 
00419    Array<Array<char> > inputs(processStates.size());
00420    for (size_t i = 0; i < processStates.size(); ++i)
00421    {
00422       input.getData(inputs[i], procs[i], i);
00423       processStates[i].availableDataLen = inputs[i].size();
00424       if (!procs[i]->out()->isOpen())
00425       {
00426          processStates[i].outIsOpen = false;
00427       }
00428       if (!procs[i]->err()->isOpen())
00429       {
00430          processStates[i].errIsOpen = false;
00431       }
00432       if (!procs[i]->in()->isOpen())
00433       {
00434          processStates[i].inIsOpen = false;
00435       }
00436 
00437    }
00438 
00439    timer.start();
00440 
00441    while (numOpenPipes > 0)
00442    {
00443       Select::SelectObjectArray selObjs; 
00444       std::map<int, int> inputIndexProcessIndex;
00445       std::map<int, int> outputIndexProcessIndex;
00446       for (size_t i = 0; i < procs.size(); ++i)
00447       {
00448          if (processStates[i].outIsOpen)
00449          {
00450             Select::SelectObject selObj(procs[i]->out()->getReadSelectObj()); 
00451             selObj.waitForRead = true; 
00452             selObjs.push_back(selObj); 
00453             inputIndexProcessIndex[selObjs.size() - 1] = i;
00454          }
00455          if (processStates[i].errIsOpen)
00456          {
00457             Select::SelectObject selObj(procs[i]->err()->getReadSelectObj()); 
00458             selObj.waitForRead = true; 
00459             selObjs.push_back(selObj); 
00460             inputIndexProcessIndex[selObjs.size() - 1] = i;
00461          }
00462          if (processStates[i].inIsOpen && processStates[i].availableDataLen > 0)
00463          {
00464             Select::SelectObject selObj(procs[i]->in()->getWriteSelectObj()); 
00465             selObj.waitForWrite = true; 
00466             selObjs.push_back(selObj); 
00467             outputIndexProcessIndex[selObjs.size() - 1] = i;
00468          }
00469 
00470       }
00471 
00472       int selectrval = Select::selectRW(selObjs, timer.asRelativeTimeout());
00473       switch (selectrval)
00474       {
00475          case Select::SELECT_ERROR:
00476          {
00477             BLOCXX_THROW_ERRNO_MSG(ExecErrorException, "Exec::gatherOutput: error selecting on stdout and stderr");
00478          }
00479          break;
00480          case Select::SELECT_TIMEOUT:
00481          {
00482             timer.loop();
00483             if (timer.expired())
00484             {
00485                BLOCXX_THROW(ExecTimeoutException, "Exec::gatherOutput: timedout");
00486             }
00487          }
00488          break;
00489          default:
00490          {
00491             int availableToFind = selectrval;
00492             
00493             // reset the timeout counter
00494             timer.resetOnLoop();
00495 
00496             for (size_t i = 0; i < selObjs.size() && availableToFind > 0; ++i)
00497             {
00498                if (!selObjs[i].readAvailable)
00499                {
00500                   continue;
00501                }
00502                else
00503                {
00504                   --availableToFind;
00505                }
00506                int streamIndex = inputIndexProcessIndex[i];
00507                UnnamedPipeRef readstream;
00508                if (processStates[streamIndex].outIsOpen)
00509                {
00510                   if (procs[streamIndex]->out()->getReadSelectObj() == selObjs[i].s)
00511                   {
00512                      readstream = procs[streamIndex]->out();
00513                   }
00514                }
00515 
00516                if (!readstream && processStates[streamIndex].errIsOpen)
00517                {
00518                   if (procs[streamIndex]->err()->getReadSelectObj() == selObjs[i].s)
00519                   {
00520                      readstream = procs[streamIndex]->err();
00521                   }
00522                }
00523 
00524                if (!readstream)
00525                {
00526                   continue; // for loop
00527                }
00528 
00529                char buff[1024];
00530                int readrc = readstream->read(buff, sizeof(buff) - 1);
00531                if (readrc == 0)
00532                {
00533                   if (readstream == procs[streamIndex]->out())
00534                   {
00535                      processStates[streamIndex].outIsOpen = false;
00536                      procs[streamIndex]->out()->close();
00537                   }
00538                   else
00539                   {
00540                      processStates[streamIndex].errIsOpen = false;
00541                      procs[streamIndex]->err()->close();
00542                   }
00543                   --numOpenPipes;
00544                }
00545                else if (readrc == -1)
00546                {
00547                   BLOCXX_THROW_ERRNO_MSG(ExecErrorException, "Exec::gatherOutput: read error");
00548                }
00549                else
00550                {
00551                   buff[readrc] = '\0';
00552                   output.handleData(
00553                      buff,
00554                      readrc,
00555                      readstream == procs[streamIndex]->out() ? E_STDOUT : E_STDERR,
00556                      procs[streamIndex],
00557                      streamIndex, inputs[streamIndex]);
00558                }
00559             }
00560 
00561             // handle stdin for all processes which have data to send to them.
00562             for (size_t i = 0; i < selObjs.size() && availableToFind > 0; ++i)
00563             {
00564                if (!selObjs[i].writeAvailable)
00565                {
00566                   continue;
00567                }
00568                else
00569                {
00570                   --availableToFind;
00571                }
00572                int streamIndex = outputIndexProcessIndex[i];
00573                UnnamedPipeRef writestream;
00574                if (processStates[streamIndex].inIsOpen)
00575                {
00576                   writestream = procs[streamIndex]->in();
00577                }
00578 
00579                if (!writestream)
00580                {
00581                   continue; // for loop
00582                }
00583 
00584                size_t offset = inputs[streamIndex].size() - processStates[streamIndex].availableDataLen;
00585                int writerc = writestream->write(&inputs[streamIndex][offset], processStates[streamIndex].availableDataLen);
00586                if (writerc == -1 && errno == EPIPE)
00587                {
00588                   processStates[streamIndex].inIsOpen = false;
00589                   procs[streamIndex]->in()->close();
00590                }
00591                else if (writerc == -1)
00592                {
00593                   BLOCXX_THROW_ERRNO_MSG(ExecErrorException, "Exec::gatherOutput: write error");
00594                }
00595                else if (writerc != 0)
00596                {
00597                   inputs[streamIndex].erase(inputs[streamIndex].begin(), inputs[streamIndex].begin() + writerc);
00598                   input.getData(inputs[streamIndex], procs[streamIndex], streamIndex);
00599                   processStates[streamIndex].availableDataLen = inputs[streamIndex].size();
00600                }
00601             }
00602          }
00603          break;
00604       }
00605    }
00606 }
00607 
00608 void processInputOutput(const String& input, String& output, const ProcessRef& process, 
00609    const Timeout& timeout, int outputLimit)
00610 {
00611    Array<ProcessRef> procs;
00612    procs.push_back(process);
00613 
00614    StringOutputGatherer gatherer(output, outputLimit);
00615    SingleStringInputCallback singleStringInputCallback = SingleStringInputCallback(input);
00616    processInputOutput(gatherer, procs, singleStringInputCallback, timeout);
00617 }
00618 
00619 
00620 } // end namespace Exec
00621 
00622 } // end namespace BLOCXX_NAMESPACE
00623