From 702d32b41c1c4f496dba046c2017cb5b907e55cd Mon Sep 17 00:00:00 2001 From: Roland Reichwein Date: Thu, 12 Jan 2023 20:00:40 +0100 Subject: FCGI test --- tests/websocketserverprocess.cpp | 205 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 205 insertions(+) create mode 100644 tests/websocketserverprocess.cpp (limited to 'tests/websocketserverprocess.cpp') diff --git a/tests/websocketserverprocess.cpp b/tests/websocketserverprocess.cpp new file mode 100644 index 0000000..89a50ee --- /dev/null +++ b/tests/websocketserverprocess.cpp @@ -0,0 +1,205 @@ +#include "websocketserverprocess.h" + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "webserver.h" +#include "response.h" + +#include "helper.h" + +using namespace std::string_literals; +namespace fs = std::filesystem; +namespace pt = boost::property_tree; +using namespace boost::unit_test; +using namespace Reichwein; + +WebsocketServerProcess::WebsocketServerProcess() +{ + // RAII pattern for shared memory allocation/deallocation + m_shared = std::unique_ptr>( + (shared_data_t*)mmap(NULL, sizeof(shared_data_t), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0), + [this](shared_data_t*){munmap(m_shared.get(), sizeof(shared_data_t));}); + start(); +} + +WebsocketServerProcess::~WebsocketServerProcess() +{ + stop(); +} + +void WebsocketServerProcess::do_session(boost::asio::ip::tcp::socket socket) +{ + try + { + // Construct the stream by moving in the socket + boost::beast::websocket::stream ws{std::move(socket)}; + + // Set a decorator to change the Server of the handshake + ws.set_option(boost::beast::websocket::stream_base::decorator( + [](boost::beast::websocket::response_type& res) + { + res.set(boost::beast::http::field::server, + std::string("Reichwein.IT Test Websocket Server")); + })); + + boost::beast::http::request_parser parser; + request_type req; + boost::beast::flat_buffer buffer; + + boost::beast::http::read(ws.next_layer(), buffer, parser); + req = parser.get(); + { + std::lock_guard lock{m_shared->mutex}; + strncpy(m_shared->subprotocol, std::string{req[http::field::sec_websocket_protocol]}.data(), sizeof(m_shared->subprotocol)); + strncpy(m_shared->target, std::string{req.target()}.data(), sizeof(m_shared->target)); + } + + ws.accept(req); + + for(;;) + { + boost::beast::flat_buffer buffer; + + ws.read(buffer); + + // Reply with : + ws.text(ws.got_text()); + std::string data(boost::asio::buffers_begin(buffer.data()), boost::asio::buffers_end(buffer.data())); + data += ": " + std::to_string(m_count++); + buffer.consume(buffer.size()); + boost::beast::ostream(buffer) << data; + ws.write(buffer.data()); + } + } + catch(boost::beast::system_error const& se) + { + // This indicates that the session was closed + if(se.code() != boost::beast::websocket::error::closed) + std::cerr << "Error: " << se.code().message() << std::endl; + } + catch(std::exception const& e) + { + std::cerr << "Error: " << e.what() << std::endl; + } +} + +bool WebsocketServerProcess::is_running() +{ + if (m_pid == 0) + return false; + + return Reichwein::Process::is_running(m_pid); +} + +void WebsocketServerProcess::start() +{ + if (m_pid != 0) + throw std::runtime_error("Process already running, so it can't be started"); + + // connect stdout of new child process to stream of parent, via pipe + m_pid = fork(); + if (m_pid < 0) + throw std::runtime_error("Fork unsuccessful."); + + if (m_pid == 0) { // child process branch + try + { + auto const address = boost::asio::ip::make_address("::1"); + auto const port = static_cast(8765); + + // The io_context is required for all I/O + boost::asio::io_context ioc{1}; + + // The acceptor receives incoming connections + boost::asio::ip::tcp::acceptor acceptor{ioc, {address, port}}; + for(;;) + { + // This will receive the new connection + boost::asio::ip::tcp::socket socket{ioc}; + + // Block until we get a connection + acceptor.accept(socket); + + // Launch the session, transferring ownership of the socket + std::thread( + &WebsocketServerProcess::do_session, this, + std::move(socket)).detach(); + } + } + catch (const std::exception& e) + { + std::cerr << "Error: " << e.what() << std::endl; + } + exit(0); + } + + wait_for_pid_listening_on(m_pid, 8765); +} + +void WebsocketServerProcess::stop() +{ + if (!is_running()) + throw std::runtime_error("Process not running, so it can't be stopped"); + + if (kill(m_pid, SIGTERM) != 0) + throw std::runtime_error("Unable to kill process"); + + if (int result = waitpid(m_pid, NULL, 0); result != m_pid) + throw std::runtime_error("waitpid returned "s + std::to_string(result)); + + m_pid = 0; +} + +std::string WebsocketServerProcess::subprotocol() +{ + std::lock_guard lock{m_shared->mutex}; + return m_shared->subprotocol; +} + +std::string WebsocketServerProcess::target() +{ + std::lock_guard lock{m_shared->mutex}; + return m_shared->target; +} + -- cgit v1.2.3