diff --git a/.gitignore b/.gitignore index 42bc9e8..12a8b76 100644 --- a/.gitignore +++ b/.gitignore @@ -36,4 +36,5 @@ .DS_Store ./doc-gen -__pycache__/ \ No newline at end of file +__pycache__/ +./runme \ No newline at end of file diff --git a/README.md b/README.md index 2becdc5..66510ad 100644 --- a/README.md +++ b/README.md @@ -167,6 +167,19 @@ All tests pass on linux & mac. Most pass under mingw & MSVC. # Changelog +# 0.6.0 TBA + +**Breaking Changes** + +- `[[nodiscard]]`` added to pipe_x functions. Most likely an error on your part + if the return value is ignored. + +**Non-breaking changes** +- Fixed: threads ignoring return value of pipe_write and never terminating +- Changed: internal threads block SIGPIPE on a thread basis. Threads terminate + and close pipes as needed on pipe errors. This breaks the pipe chains which + prevents potential hangs with deep pipe chains. + # 0.5.0 2025-12-09 **Breaking Changes** diff --git a/src/cpp/subprocess.hpp b/src/cpp/subprocess.hpp index 3221cb0..8b41e67 100644 --- a/src/cpp/subprocess.hpp +++ b/src/cpp/subprocess.hpp @@ -4,4 +4,20 @@ #include "subprocess/pipe.hpp" #include "subprocess/ProcessBuilder.hpp" #include "subprocess/shell_utils.hpp" -#include "subprocess/environ.hpp" \ No newline at end of file +#include "subprocess/environ.hpp" + +#ifdef SUBPROCESS_AMALGAMATE_SOURCES +/* To regen: + (for file in *.cpp; do; echo '#include "'subprocess/$file'"'; done;) | sort +*/ +#include "subprocess/CowData.cpp" +#include "subprocess/environ.cpp" +#include "subprocess/pipe.cpp" +#include "subprocess/PipeVar.cpp" +#include "subprocess/ProcessBuilder.cpp" +#include "subprocess/ProcessBuilder_posix.cpp" +#include "subprocess/ProcessBuilder_windows.cpp" +#include "subprocess/shell_utils.cpp" +#include "subprocess/utf8_to_utf16.cpp" + +#endif \ No newline at end of file diff --git a/src/cpp/subprocess/ProcessBuilder.cpp b/src/cpp/subprocess/ProcessBuilder.cpp index eb889e7..06bc713 100644 --- a/src/cpp/subprocess/ProcessBuilder.cpp +++ b/src/cpp/subprocess/ProcessBuilder.cpp @@ -45,6 +45,25 @@ namespace subprocess { message += std::strerror(errno_code); throw OSError(message); } + struct NoSigPipe { + #ifndef _WIN32 + NoSigPipe() { + // get the current state + sigprocmask(SIG_BLOCK, NULL, &old_state); + + sigset_t set = old_state; + sigaddset(&set, SIGPIPE); + sigprocmask(SIG_BLOCK, &set, NULL); + } + + ~NoSigPipe() { + sigprocmask(SIG_BLOCK, &old_state, NULL); + } + + private: + sigset_t old_state; + #endif + }; } double monotonic_seconds() { static bool needs_init = true; @@ -91,6 +110,7 @@ namespace subprocess { }; std::thread pipe_thread(PipeHandle input, std::ostream* output) { return std::thread([=]() { + details::NoSigPipe noSigPipe; AutoClosePipe autoclose(input); std::vector buffer(2048); while (true) { @@ -102,58 +122,83 @@ namespace subprocess { }); } + [[nodiscard]] + static ssize_t fwrite_fully(FILE* output, const void* buffer, size_t size) { + const uint8_t* cursor = reinterpret_cast(buffer); + ssize_t total = 0; + while (total < size) { + auto transferred = fwrite(cursor, 1, size - total, output); + if (transferred == 0) + break; + cursor += transferred; + total += transferred; + } + return total; + } + std::thread pipe_thread(PipeHandle input, FILE* output) { return std::thread([=]() { + details::NoSigPipe noSigPipe; AutoClosePipe autoclose(input); std::vector buffer(2048); while (true) { - ssize_t transfered = pipe_read(input, &buffer[0], buffer.size()); - if (transfered <= 0) + ssize_t transferred = pipe_read(input, &buffer[0], buffer.size()); + if (transferred <= 0) + break; + transferred = fwrite_fully(output, &buffer[0], transferred); + if (transferred <= 0) break; - fwrite(&buffer[0], 1, transfered, output); } }); } + std::thread pipe_thread(FILE* input, PipeHandle output) { return std::thread([=]() { + details::NoSigPipe noSigPipe; AutoClosePipe autoclose(output); std::vector buffer(2048); while (true) { - ssize_t transfered = fread(&buffer[0], 1, buffer.size(), input); - if (transfered <= 0) + ssize_t transferred = fread(&buffer[0], 1, buffer.size(), input); + if (transferred <= 0) + break; + transferred = pipe_write_fully(output, &buffer[0], transferred); + if (transferred <= 0) break; - pipe_write(output, &buffer[0], transfered); } }); } std::thread pipe_thread(std::string& input, PipeHandle output) { return std::thread([input(move(input)), output]() { + details::NoSigPipe noSigPipe; AutoClosePipe autoclose(output); std::size_t pos = 0; while (pos < input.size()) { - ssize_t transfered = pipe_write(output, input.c_str()+pos, input.size() - pos); - if (transfered <= 0) + ssize_t transferred = pipe_write_fully(output, input.c_str()+pos, input.size() - pos); + if (transferred <= 0) break; - pos += transfered; + pos += transferred; } }); } std::thread pipe_thread(std::istream* input, PipeHandle output) { return std::thread([=]() { + details::NoSigPipe noSigPipe; AutoClosePipe autoclose(output); std::vector buffer(2048); while (true) { input->read(&buffer[0], buffer.size()); - ssize_t transfered = input->gcount(); + ssize_t transferred = input->gcount(); if (input->bad()) break; - if (transfered <= 0) { + if (transferred <= 0) { if (input->eof()) break; continue; } - pipe_write(output, &buffer[0], transfered); + transferred = pipe_write_fully(output, &buffer[0], transferred); + if (transferred <= 0) + break; } }); } @@ -279,6 +324,7 @@ namespace subprocess { Popen::~Popen() { close(); } + void Popen::close() { if (cin_thread.joinable()) cin_thread.join(); diff --git a/src/cpp/subprocess/ProcessBuilder.hpp b/src/cpp/subprocess/ProcessBuilder.hpp index 3c9ded1..ae9c11a 100644 --- a/src/cpp/subprocess/ProcessBuilder.hpp +++ b/src/cpp/subprocess/ProcessBuilder.hpp @@ -185,7 +185,7 @@ namespace subprocess { /** equivalent to send_signal(SIGKILL) */ bool kill(); - /** Destructs the object and initializes to basic state */ + /** Destructs the object and initializes to base state */ void close(); /** Closes the cin pipe */ void close_cin() { diff --git a/src/cpp/subprocess/pipe.cpp b/src/cpp/subprocess/pipe.cpp index a55affd..d89803c 100644 --- a/src/cpp/subprocess/pipe.cpp +++ b/src/cpp/subprocess/pipe.cpp @@ -319,6 +319,23 @@ namespace subprocess { return second + 1; } + ssize_t pipe_write_fully(PipeHandle handle, const void* buffer, size_t size) { + ssize_t transferred = 0; + ssize_t total = 0; + const uint8_t* cursor = reinterpret_cast(buffer); + while (total < size) { + ssize_t transferred = pipe_write(handle, cursor, size - total); + if (transferred < 0) + return -total - 1; + if (transferred == 0) + break; + cursor += transferred; + total += transferred; + } + + return total; + } + PipeHandle pipe_file(const char* filename, const char* mode) { using std::strchr; #ifdef _WIN32 diff --git a/src/cpp/subprocess/pipe.hpp b/src/cpp/subprocess/pipe.hpp index d6eb6f0..2fb666a 100644 --- a/src/cpp/subprocess/pipe.hpp +++ b/src/cpp/subprocess/pipe.hpp @@ -47,6 +47,7 @@ namespace subprocess { }; /** Peak into how many bytes available in pipe to read. */ + [[nodiscard]] ssize_t pipe_peak_bytes(PipeHandle pipe); /** Closes a pipe handle. @@ -76,6 +77,7 @@ namespace subprocess { to make both ends to be inheritble at creation then you likely have a bug. */ + [[nodiscard]] PipePair pipe_create(bool inheritable = false); /** Set the pipe to be inheritable or not for subprocess. @@ -90,13 +92,24 @@ namespace subprocess { @returns -1 on error. if 0 it could be the end, or perhaps wait for more data. */ + [[nodiscard]] ssize_t pipe_read(PipeHandle, void* buffer, size_t size); + /** @returns -1 on error. if 0 it could be full, or perhaps wait for more data. */ + [[nodiscard]] ssize_t pipe_write(PipeHandle, const void* buffer, size_t size); + /** Like pipe_write but keep writing while return value is > 0. + + @returns bytes written on success, (-total_transferred - 1) on error, + if 0 it could be full, + */ + [[nodiscard]] + ssize_t pipe_write_fully(PipeHandle, const void* buffer, size_t size); + /** Sets the blocking bit. The handle state is first queried as to only change the blocking bit. @@ -120,6 +133,7 @@ namespace subprocess { @return all data read from pipe as a string object. This works fine with binary data. */ + [[nodiscard]] std::string pipe_read_all(PipeHandle handle); /** Waits for the pipes to be change state. @@ -133,12 +147,14 @@ namespace subprocess { @param seconds The timeout in seconds to wait for. -1 for indefinate */ + [[nodiscard]] int pipe_wait_for_read( PipeHandle pipe, double seconds ); /** Will read up to size and not block until buffer is filled. */ + [[nodiscard]] ssize_t pipe_read_some(PipeHandle, void* buffer, size_t size); /** Opens a file and returns the handle. @@ -157,6 +173,7 @@ namespace subprocess { @returns the handle to the opened file, or kBadPipeValue on error */ + [[nodiscard]] PipeHandle pipe_file(const char* filename, const char* mode); #if 0 diff --git a/test/basic_test.cpp b/test/basic_test.cpp index ad35173..384746a 100644 --- a/test/basic_test.cpp +++ b/test/basic_test.cpp @@ -1,5 +1,6 @@ #include #include +#include #include @@ -107,7 +108,8 @@ class BasicSuite : public CxxTest::TestSuite { // use thread to prevent blocking in case the OS desides to wait on read std::thread thread([&] { subprocess::sleep_seconds(1); - subprocess::pipe_write(pipe.output, str.data(), str.size()); + auto transferred = subprocess::pipe_write(pipe.output, str.data(), str.size()); + TS_ASSERT_EQUALS(transferred, str.size()); }); std::vector text; text.resize(32); @@ -442,7 +444,8 @@ class BasicSuite : public CxxTest::TestSuite { std::string str = "hello world"; std::thread thread([&] { subprocess::sleep_seconds(0.1); - pipe_write(pipe.output, str.data(), str.size()); + auto transferred = pipe_write(pipe.output, str.data(), str.size()); + TS_ASSERT_EQUALS(transferred, str.size()); }); std::vector buffer; buffer.resize(1024); @@ -511,6 +514,83 @@ class BasicSuite : public CxxTest::TestSuite { TS_ASSERT_EQUALS(str, str2); } + void testWriteClosed() { + subprocess::EnvGuard guard; + prepend_this_to_path(); + + class DevZeroStreamBuf : public std::streambuf { + public: + using Super = std::streambuf; + using int_type = typename Super::int_type; + using char_type = std::streambuf::char_type; + using traits_type = Super::traits_type; + void close() { + open = false; + } + protected: + int_type underflow() override { + return open? traits_type::to_int_type('\0') : traits_type::eof(); + } + + std::streamsize xsgetn(char_type* s, std::streamsize n) override { + if (!open) + return 0; + std::memset(s, 0, n * sizeof(char_type)); + return n; + } + + int_type overflow(int_type c) override { + return open? traits_type::not_eof(c) : traits_type::eof(); + } + + std::streamsize xsputn(const char_type* s, std::streamsize n) override { + return open? n : 0; + } + + bool open = true; + }; + DevZeroStreamBuf zero; + std::istream stream(&zero); + + bool write_failed = false; + std::vector order; + std::mutex mutex; + + subprocess::Popen echo = RunBuilder({"echo", "hello", "world"}) + .cin(&stream) + .cout(PipeOption::pipe).popen(); + + /* this thread should start after echo is launched in case it takes a while + for echo to start. + */ + std::thread thread([&] { + subprocess::sleep_seconds(1); + std::unique_lock lock(mutex); + order.push_back(1); + zero.close(); + }); + + std::vector buffer; + buffer.resize(64); + auto transferred = subprocess::pipe_read(echo.cout, &buffer[0], buffer.size()); + std::string expected = "hello world" EOL; + TS_ASSERT_EQUALS(transferred, expected.size()); + echo.close(); + std::unique_lock lock(mutex); + order.push_back(0); + lock.unlock(); + if (thread.joinable()) + thread.join(); + /* on failure the internal thread will never close until the read end + closes which will result in order to be 1, 0. + */ + TS_ASSERT_EQUALS(order.size(), 2); + TS_ASSERT_EQUALS(order[0], 0); + TS_ASSERT_EQUALS(order[1], 1); + std::string cout = &buffer[0]; + TS_ASSERT_EQUALS(cout, expected); + } + /* void tesxtCat() { diff --git a/test/examples.cpp b/test/examples.cpp index 0dcb1e7..2dd6d4e 100644 --- a/test/examples.cpp +++ b/test/examples.cpp @@ -56,7 +56,7 @@ void popen_examples() { Popen popen = subprocess::RunBuilder({"echo", "hello", "world"}) .cout(PipeOption::pipe).popen(); char buf[1024] = {0}; // initializes everything to 0 - subprocess::pipe_read(popen.cout, buf, 1024); + auto ignore = subprocess::pipe_read(popen.cout, buf, 1024); std::cout << buf; // the destructor will call wait on your behalf. popen.close(); @@ -70,7 +70,7 @@ void popen_examples() { you provide buffers for cin, internally the library spins it's own thread. */ std::thread write_thread([&]() { - subprocess::pipe_write(popen.cin, "hello world\n", std::strlen("hello world\n")); + auto ignore = subprocess::pipe_write(popen.cin, "hello world\n", std::strlen("hello world\n")); // no more data to send. If we don't close we may run into a deadlock as // we are looking to read for more. popen.close_cin(); @@ -79,7 +79,7 @@ void popen_examples() { for (auto& c : buf) c = 0; - subprocess::pipe_read(popen.cout, buf, 1024); + ignore = subprocess::pipe_read(popen.cout, buf, 1024); std::cout << buf; popen.close(); if (write_thread.joinable())