diff options
Diffstat (limited to 'tests/websocketserverprocess.cpp')
-rw-r--r-- | tests/websocketserverprocess.cpp | 205 |
1 files changed, 205 insertions, 0 deletions
diff --git a/tests/websocketserverprocess.cpp b/tests/websocketserverprocess.cpp new file mode 100644 index 0000000..89a50ee --- /dev/null +++ b/tests/websocketserverprocess.cpp @@ -0,0 +1,205 @@ +#include "websocketserverprocess.h" + +#include <boost/test/data/dataset.hpp> +#include <boost/test/data/monomorphic.hpp> +#include <boost/test/data/test_case.hpp> + +#include <boost/algorithm/string.hpp> +#include <boost/beast/core.hpp> +#include <boost/beast/http.hpp> +#include <boost/beast/websocket.hpp> +#include <boost/beast/websocket/ssl.hpp> +#include <boost/beast/ssl.hpp> +#include <boost/beast/version.hpp> +#include <boost/asio/buffer.hpp> +#include <boost/asio/buffers_iterator.hpp> +#include <boost/asio/connect.hpp> +#include <boost/asio/ip/tcp.hpp> +#include <boost/asio/ssl/error.hpp> +#include <boost/asio/ssl/stream.hpp> +#include <boost/property_tree/ptree.hpp> +#include <boost/property_tree/xml_parser.hpp> + +#include <fmt/core.h> + +#include <chrono> +#include <exception> +#include <filesystem> +#include <iostream> +#include <memory> +#include <mutex> +#include <sstream> +#include <stdexcept> +#include <string> +#include <thread> + +#include <ext/stdio_filebuf.h> +#include <signal.h> +#include <sys/wait.h> +#include <unistd.h> +#include <sys/mman.h> +#include <sys/types.h> + +#include <libreichwein/file.h> +#include <libreichwein/process.h> + +#include "webserver.h" +#include "response.h" + +#include "helper.h" + +using namespace std::string_literals; +namespace fs = std::filesystem; +namespace pt = boost::property_tree; +using namespace boost::unit_test; +using namespace Reichwein; + +WebsocketServerProcess::WebsocketServerProcess() +{ + // RAII pattern for shared memory allocation/deallocation + m_shared = std::unique_ptr<shared_data_t, std::function<void(shared_data_t*)>>( + (shared_data_t*)mmap(NULL, sizeof(shared_data_t), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0), + [this](shared_data_t*){munmap(m_shared.get(), sizeof(shared_data_t));}); + start(); +} + +WebsocketServerProcess::~WebsocketServerProcess() +{ + stop(); +} + +void WebsocketServerProcess::do_session(boost::asio::ip::tcp::socket socket) +{ + try + { + // Construct the stream by moving in the socket + boost::beast::websocket::stream<boost::asio::ip::tcp::socket> ws{std::move(socket)}; + + // Set a decorator to change the Server of the handshake + ws.set_option(boost::beast::websocket::stream_base::decorator( + [](boost::beast::websocket::response_type& res) + { + res.set(boost::beast::http::field::server, + std::string("Reichwein.IT Test Websocket Server")); + })); + + boost::beast::http::request_parser<boost::beast::http::string_body> parser; + request_type req; + boost::beast::flat_buffer buffer; + + boost::beast::http::read(ws.next_layer(), buffer, parser); + req = parser.get(); + { + std::lock_guard lock{m_shared->mutex}; + strncpy(m_shared->subprotocol, std::string{req[http::field::sec_websocket_protocol]}.data(), sizeof(m_shared->subprotocol)); + strncpy(m_shared->target, std::string{req.target()}.data(), sizeof(m_shared->target)); + } + + ws.accept(req); + + for(;;) + { + boost::beast::flat_buffer buffer; + + ws.read(buffer); + + // Reply with <request>: <counter> + ws.text(ws.got_text()); + std::string data(boost::asio::buffers_begin(buffer.data()), boost::asio::buffers_end(buffer.data())); + data += ": " + std::to_string(m_count++); + buffer.consume(buffer.size()); + boost::beast::ostream(buffer) << data; + ws.write(buffer.data()); + } + } + catch(boost::beast::system_error const& se) + { + // This indicates that the session was closed + if(se.code() != boost::beast::websocket::error::closed) + std::cerr << "Error: " << se.code().message() << std::endl; + } + catch(std::exception const& e) + { + std::cerr << "Error: " << e.what() << std::endl; + } +} + +bool WebsocketServerProcess::is_running() +{ + if (m_pid == 0) + return false; + + return Reichwein::Process::is_running(m_pid); +} + +void WebsocketServerProcess::start() +{ + if (m_pid != 0) + throw std::runtime_error("Process already running, so it can't be started"); + + // connect stdout of new child process to stream of parent, via pipe + m_pid = fork(); + if (m_pid < 0) + throw std::runtime_error("Fork unsuccessful."); + + if (m_pid == 0) { // child process branch + try + { + auto const address = boost::asio::ip::make_address("::1"); + auto const port = static_cast<unsigned short>(8765); + + // The io_context is required for all I/O + boost::asio::io_context ioc{1}; + + // The acceptor receives incoming connections + boost::asio::ip::tcp::acceptor acceptor{ioc, {address, port}}; + for(;;) + { + // This will receive the new connection + boost::asio::ip::tcp::socket socket{ioc}; + + // Block until we get a connection + acceptor.accept(socket); + + // Launch the session, transferring ownership of the socket + std::thread( + &WebsocketServerProcess::do_session, this, + std::move(socket)).detach(); + } + } + catch (const std::exception& e) + { + std::cerr << "Error: " << e.what() << std::endl; + } + exit(0); + } + + wait_for_pid_listening_on(m_pid, 8765); +} + +void WebsocketServerProcess::stop() +{ + if (!is_running()) + throw std::runtime_error("Process not running, so it can't be stopped"); + + if (kill(m_pid, SIGTERM) != 0) + throw std::runtime_error("Unable to kill process"); + + if (int result = waitpid(m_pid, NULL, 0); result != m_pid) + throw std::runtime_error("waitpid returned "s + std::to_string(result)); + + m_pid = 0; +} + +std::string WebsocketServerProcess::subprotocol() +{ + std::lock_guard lock{m_shared->mutex}; + return m_shared->subprotocol; +} + +std::string WebsocketServerProcess::target() +{ + std::lock_guard lock{m_shared->mutex}; + return m_shared->target; +} + |