From 9e635d9b19e72eefef082dd8071d3e4c9d6cfab1 Mon Sep 17 00:00:00 2001 From: Roland Reichwein Date: Thu, 7 May 2020 19:24:45 +0200 Subject: Separated out TCP socket class --- TODO | 1 + plugins/fcgi/Makefile | 4 +- plugins/fcgi/fcgi.cpp | 207 ++++++++++++++++++++---------------------------- plugins/fcgi/fcgi.h | 35 +------- plugins/fcgi/fcgiid.cpp | 19 +++++ plugins/fcgi/fcgiid.h | 18 +++++ plugins/fcgi/socket.cpp | 121 ++++++++++++++++++++++++++++ plugins/fcgi/socket.h | 70 ++++++++++++++++ 8 files changed, 324 insertions(+), 151 deletions(-) create mode 100644 plugins/fcgi/fcgiid.cpp create mode 100644 plugins/fcgi/fcgiid.h create mode 100644 plugins/fcgi/socket.cpp create mode 100644 plugins/fcgi/socket.h diff --git a/TODO b/TODO index 2fb3aa4..8a9d15b 100644 --- a/TODO +++ b/TODO @@ -1,3 +1,4 @@ +fcgi: via unix domain socket Speed up config.GetPath weblog: link consistency check (cron?) Debian: keyring package diff --git a/plugins/fcgi/Makefile b/plugins/fcgi/Makefile index bb54d7c..c2789d8 100644 --- a/plugins/fcgi/Makefile +++ b/plugins/fcgi/Makefile @@ -57,7 +57,9 @@ LIBS+= \ endif PROGSRC=\ - fcgi.cpp + fcgi.cpp \ + fcgiid.cpp \ + socket.cpp TESTSRC=\ test-webserver.cpp \ diff --git a/plugins/fcgi/fcgi.cpp b/plugins/fcgi/fcgi.cpp index 464ba75..4ff8253 100644 --- a/plugins/fcgi/fcgi.cpp +++ b/plugins/fcgi/fcgi.cpp @@ -1,6 +1,7 @@ #include "fcgi.h" #include "fastcgi.h" +#include "socket.h" #include #include @@ -343,149 +344,119 @@ std::string fcgi_plugin::fcgiQuery(FCGIContext& context) std::unordered_map app_values; // will be read by FCGI_GET_VALUES - size_t pos { app_addr.find(':') }; - if (pos != app_addr.npos) { // tcp socket: host:port - auto endpoints{m_resolver.resolve(app_addr.substr(0, pos), app_addr.substr(pos + 1))}; - bool opening{false}; - - std::lock_guard socket_lock{m_socket_mutex}; + auto it {m_sockets.find(app_addr)}; - auto it {m_sockets.find(app_addr)}; + std::shared_ptr socket; - std::pair::iterator, bool> it2{m_sockets.end(), false}; - if (it == m_sockets.end()) - it2 = m_sockets.emplace(app_addr, m_io_context); // add new element if necessary + if (it == m_sockets.end()) { // add new element + socket = m_socket_factory.create(app_addr); - boost::asio::ip::tcp::socket& socket { it2.second ? it2.first->second : it->second }; // use just added element or previously found one + if (!socket) { + std::cerr << "FCGI Error: Invalid app_addr." << std::endl; + return HttpStatus("500", "FCGI configuration", context.SetResponseHeader); + } - socket.close(); // TODO: Bug workaround: Keeping TCP socket open doesn't work for now + m_sockets[app_addr] = socket; - if (!socket.is_open()) { - std::cout << "FCGI: Opening new socket" << std::endl; + } else { // use already existing element + socket = it->second; + } - boost::asio::connect(socket, endpoints); + bool opening{false}; + + std::lock_guard socket_lock{socket->getMutex()}; - boost::asio::socket_base::keep_alive keepAlive(true); - socket.set_option(keepAlive); + socket->close(); // TODO: Bug workaround: Keeping TCP socket open doesn't work for now - struct timeval tv; - tv.tv_sec = 0; // infinite - tv.tv_usec = 0; - if (setsockopt(socket.native_handle(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) - std::cerr << "FCGI Error: SO_RCVTIMEO" << std::endl; + if (!socket->is_open()) { + std::cout << "FCGI: Opening new socket" << std::endl; - if (setsockopt(socket.native_handle(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv))) - std::cerr << "FCGI Error: SO_SNDTIMEO" << std::endl; + socket->open(); + opening = true; + } - int val{1}; - if (setsockopt(socket.native_handle(), SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val))) - std::cerr << "FCGI Error: SO_KEEPALIVE" << std::endl; + if (!socket->is_open()) { + return HttpStatus("500", "FCGI connection", context.SetResponseHeader); + } - opening = true; - } + FCGI_ID_Guard id_guard(socket->fcgi_id()); + uint16_t id {id_guard.getID()}; - if (!socket.is_open()) { - return HttpStatus("500", "FCGI connection", context.SetResponseHeader); + try { + if (opening) { + FCGI_Record get_values{FCGI_GET_VALUES, 0, system_config_bytes}; + if (socket->write(get_values.getBuffer()) != get_values.getBuffer().size()) + std::cerr << "Warning: Not all bytes written 1" << std::endl; } - FCGI_ID_Guard id_guard(m_fcgi_id); - uint16_t id {id_guard.getID()}; - - try { - if (opening) { - FCGI_Record get_values{FCGI_GET_VALUES, 0, system_config_bytes}; - if (socket.write_some(boost::asio::buffer(get_values.getBuffer())) != get_values.getBuffer().size()) - std::cerr << "Warning: Not all bytes written 1" << std::endl; - } - - FCGI_Record begin_request{FCGI_BEGIN_REQUEST, id, FCGI_RESPONDER, FCGI_KEEP_CONN}; - if (socket.write_some(boost::asio::buffer(begin_request.getBuffer())) != begin_request.getBuffer().size()) - std::cerr << "Warning: Not all bytes written 3" << std::endl; + FCGI_Record begin_request{FCGI_BEGIN_REQUEST, id, FCGI_RESPONDER, FCGI_KEEP_CONN}; + if (socket->write(begin_request.getBuffer()) != begin_request.getBuffer().size()) + std::cerr << "Warning: Not all bytes written 3" << std::endl; - FCGI_Record params{FCGI_PARAMS, id, env_bytes}; - if (socket.write_some(boost::asio::buffer(params.getBuffer())) != params.getBuffer().size()) - std::cerr << "Warning: Not all bytes written 4" << std::endl; + FCGI_Record params{FCGI_PARAMS, id, env_bytes}; + if (socket->write(params.getBuffer()) != params.getBuffer().size()) + std::cerr << "Warning: Not all bytes written 4" << std::endl; - if (env_bytes.size()) { - FCGI_Record params_end{FCGI_PARAMS, id, std::string{}}; - if (socket.write_some(boost::asio::buffer(params_end.getBuffer())) != params_end.getBuffer().size()) - std::cerr << "Warning: Not all bytes written 5" << std::endl; - } + if (env_bytes.size()) { + FCGI_Record params_end{FCGI_PARAMS, id, std::string{}}; + if (socket->write(params_end.getBuffer()) != params_end.getBuffer().size()) + std::cerr << "Warning: Not all bytes written 5" << std::endl; + } - std::string body {context.GetRequestParam("body")}; - FCGI_Record stdin_{FCGI_STDIN, id, body}; - if (socket.write_some(boost::asio::buffer(stdin_.getBuffer())) != stdin_.getBuffer().size()) - std::cerr << "Warning: Not all bytes written 6" << std::endl; - - if (body.size()) { - FCGI_Record stdin_end{FCGI_STDIN, id, std::string{}}; - if (socket.write_some(boost::asio::buffer(stdin_end.getBuffer())) != stdin_end.getBuffer().size()) - std::cerr << "Warning: Not all bytes written 7" << std::endl; - } - } catch (const boost::system::system_error& ex) { - if (ex.code() == boost::asio::error::eof) { - std::cerr << "FCGI Error: EOF on write" << std::endl; // seems to be ok here - return HttpStatus("500", "FCGI connection: EOF on write", context.SetResponseHeader); - } + std::string body {context.GetRequestParam("body")}; + FCGI_Record stdin_{FCGI_STDIN, id, body}; + if (socket->write(stdin_.getBuffer()) != stdin_.getBuffer().size()) + std::cerr << "Warning: Not all bytes written 6" << std::endl; + + if (body.size()) { + FCGI_Record stdin_end{FCGI_STDIN, id, std::string{}}; + if (socket->write(stdin_end.getBuffer()) != stdin_end.getBuffer().size()) + std::cerr << "Warning: Not all bytes written 7" << std::endl; } + } catch (const fcgi_eof_error&) { + std::cerr << "FCGI Error: EOF on write" << std::endl; // seems to be ok here + return HttpStatus("500", "FCGI connection: EOF on write", context.SetResponseHeader); + } #if 0 - FCGI_Record data{FCGI_DATA, id, std::string{}}; - if (socket.write_some(boost::asio::buffer(data.getBuffer())) != data.getBuffer().size()) - std::cerr << "Warning: Not all bytes written 8" << std::endl; + FCGI_Record data{FCGI_DATA, id, std::string{}}; + if (socket->write(data.getBuffer()) != data.getBuffer().size()) + std::cerr << "Warning: Not all bytes written 8" << std::endl; #endif - bool ended{false}; - std::vector inbuf; - std::vector inbuf_part(1024); - while (!ended) { + bool ended{false}; + std::vector inbuf; + std::vector inbuf_part(1024); + while (!ended) { + try { + size_t got {socket->read(inbuf_part)}; + inbuf.insert(inbuf.end(), inbuf_part.begin(), inbuf_part.begin() + got); + } catch (const fcgi_eof_error&) { + //std::cerr << "FCGI Warning: Early EOF" << std::endl; // seems to be ok here + ended = true; + //return HttpStatus("500", "FCGI connection: EOF on read", context.SetResponseHeader); + } + + while (inbuf.size() > 0) { + try { - size_t got {socket.read_some(boost::asio::buffer(inbuf_part))}; - inbuf.insert(inbuf.end(), inbuf_part.begin(), inbuf_part.begin() + got); - } catch (const boost::system::system_error& ex) { - if (ex.code() == boost::asio::error::eof) { - //std::cerr << "FCGI Warning: Early EOF" << std::endl; // seems to be ok here - ended = true; - //return HttpStatus("500", "FCGI connection: EOF on read", context.SetResponseHeader); - } else { - std::cerr << "FCGI Warning: Expected EOF, got " << ex.code() << ", " << ex.what() << std::endl; + FCGI_Record r{inbuf}; + if (r.getType() == FCGI_END_REQUEST) { ended = true; - } - } - - while (inbuf.size() > 0) { - - try { - FCGI_Record r{inbuf}; - if (r.getType() == FCGI_END_REQUEST) { - ended = true; - } else if (r.getType() == FCGI_STDOUT) { - output_data += r.getContent(); - } else if (r.getType() == FCGI_STDERR) { - std::cerr << "FCGI STDERR: " << r.getContent() << std::endl; - } else if (r.getType() == FCGI_GET_VALUES_RESULT) { - FCGI_DecodeEnv(r.getContent(), app_values); - DumpAppValues(app_values); - } else - throw std::runtime_error("Unhandled FCGI type: "s + std::to_string(r.getType())); - } catch (const std::length_error& ex) { - // ignore if not enough data available yet - break; - } + } else if (r.getType() == FCGI_STDOUT) { + output_data += r.getContent(); + } else if (r.getType() == FCGI_STDERR) { + std::cerr << "FCGI STDERR: " << r.getContent() << std::endl; + } else if (r.getType() == FCGI_GET_VALUES_RESULT) { + FCGI_DecodeEnv(r.getContent(), app_values); + DumpAppValues(app_values); + } else + throw std::runtime_error("Unhandled FCGI type: "s + std::to_string(r.getType())); + } catch (const std::length_error& ex) { + // ignore if not enough data available yet + break; } } - - } else if (fs::is_socket(fs::path{app_addr})) { // Unix domain socket - // TODO - std::cerr << "FCGI Error: Unix domain sockets not yet implemented." << std::endl; - return HttpStatus("500", "FCGI configuration", context.SetResponseHeader); - } else if (fs::is_regular_file(fs::path{app_addr})) { // Executable to start - // TODO - std::cerr << "FCGI Error: Executable FCGI not yet implemented." << std::endl; - return HttpStatus("500", "FCGI configuration", context.SetResponseHeader); - } else { - std::cerr << "FCGI Error: Invalid app_addr type." << std::endl; - return HttpStatus("500", "FCGI configuration", context.SetResponseHeader); } std::istringstream is_out{output_data}; @@ -532,8 +503,6 @@ std::string fcgi_plugin::name() } fcgi_plugin::fcgi_plugin() - : m_io_context() - , m_resolver(m_io_context) { //std::cout << "Plugin constructor" << std::endl; } diff --git a/plugins/fcgi/fcgi.h b/plugins/fcgi/fcgi.h index 4f77719..289c4d6 100644 --- a/plugins/fcgi/fcgi.h +++ b/plugins/fcgi/fcgi.h @@ -2,38 +2,14 @@ #include "../../plugin_interface.h" +#include "socket.h" + #include #include #include #include -// TODO: multithreading -class FCGI_ID -{ - std::setm_unused; - uint16_t m_current_max{}; - -public: - FCGI_ID(){} - - // starting at 1 - uint16_t getID(){ - if (m_unused.empty()) { - m_current_max++; - return m_current_max; - } else { - uint16_t result{*m_unused.begin()}; - m_unused.erase(m_unused.begin()); - return result; - } - } - - void putID(uint16_t id){ - m_unused.insert(id); - } -}; - // automatically reserves ID, and releases it via RAII class FCGI_ID_Guard { @@ -57,12 +33,9 @@ struct FCGIContext; class fcgi_plugin: public webserver_plugin_interface { - FCGI_ID m_fcgi_id; - boost::asio::io_context m_io_context; - boost::asio::ip::tcp::resolver m_resolver; + SocketFactory m_socket_factory; - std::mutex m_socket_mutex; // guard m_socket use in different threads - std::unordered_map m_sockets; + std::unordered_map> m_sockets; public: fcgi_plugin(); diff --git a/plugins/fcgi/fcgiid.cpp b/plugins/fcgi/fcgiid.cpp new file mode 100644 index 0000000..778cbc9 --- /dev/null +++ b/plugins/fcgi/fcgiid.cpp @@ -0,0 +1,19 @@ +#include "fcgiid.h" + + // starting at 1 +uint16_t FCGI_ID::getID() +{ + if (m_unused.empty()) { + m_current_max++; + return m_current_max; + } else { + uint16_t result{*m_unused.begin()}; + m_unused.erase(m_unused.begin()); + return result; + } +} + +void FCGI_ID::putID(uint16_t id) +{ + m_unused.insert(id); +} diff --git a/plugins/fcgi/fcgiid.h b/plugins/fcgi/fcgiid.h new file mode 100644 index 0000000..e3649d7 --- /dev/null +++ b/plugins/fcgi/fcgiid.h @@ -0,0 +1,18 @@ +#pragma once + +#include +#include + +class FCGI_ID +{ + std::setm_unused; + uint16_t m_current_max{}; + +public: + FCGI_ID(){} + + uint16_t getID(); + void putID(uint16_t id); +}; + + diff --git a/plugins/fcgi/socket.cpp b/plugins/fcgi/socket.cpp new file mode 100644 index 0000000..0a2a381 --- /dev/null +++ b/plugins/fcgi/socket.cpp @@ -0,0 +1,121 @@ +#include "socket.h" + +#include +#include + +namespace fs = std::filesystem; +using namespace std::string_literals; + +std::mutex& Socket::getMutex() +{ + return m_mutex; +} + +FCGI_ID& Socket::fcgi_id() +{ + return m_fcgi_id; +} + +SocketFactory::SocketFactory() + : m_io_context() +{ +} + +std::shared_ptr SocketFactory::create(const std::string& app_addr) +{ + size_t pos { app_addr.find(':') }; + if (pos != app_addr.npos) { // tcp socket: host:port + + return std::make_shared(app_addr.substr(0, pos), app_addr.substr(pos + 1), m_io_context); + + } else if (fs::is_socket(fs::path{app_addr})) { // Unix domain socket + // TODO + std::cerr << "FCGI Error: Unix domain sockets not yet implemented." << std::endl; + } else if (fs::is_regular_file(fs::path{app_addr})) { // Executable to start + // TODO + std::cerr << "FCGI Error: Executable FCGI not yet implemented." << std::endl; + } else { + std::cerr << "FCGI Error: Invalid app_addr type." << std::endl; + } + + return {}; +} + +TCPSocket::TCPSocket(const std::string& host, const std::string& port, boost::asio::io_context& io_context) + : m_io_context(io_context) + , m_host(host) + , m_port(port) + , m_socket(io_context) +{ +} + +TCPSocket::~TCPSocket() +{ +} + +void TCPSocket::open() +{ + boost::asio::ip::tcp::resolver resolver(m_io_context); + auto endpoints{resolver.resolve(m_host, m_port)}; + try { + boost::asio::connect(m_socket, endpoints); + } catch(const std::exception& ex) { + std::cerr << "FCGI Error: Error on connecting to " << m_host << ":" << m_port << ": " << ex.what() << std::endl; + return; + } + + boost::asio::socket_base::keep_alive keepAlive(true); + m_socket.set_option(keepAlive); + + struct timeval tv; + tv.tv_sec = 0; // infinite + tv.tv_usec = 0; + if (setsockopt(m_socket.native_handle(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) + std::cerr << "FCGI Error: SO_RCVTIMEO" << std::endl; + + if (setsockopt(m_socket.native_handle(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv))) + std::cerr << "FCGI Error: SO_SNDTIMEO" << std::endl; + + int val{1}; + if (setsockopt(m_socket.native_handle(), SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val))) + std::cerr << "FCGI Error: SO_KEEPALIVE" << std::endl; +} + +bool TCPSocket::is_open() +{ + return m_socket.is_open(); +} + +void TCPSocket::close() +{ + m_socket.close(); +} + +size_t TCPSocket::write(const std::vector& data) +{ + try { + return m_socket.write_some(boost::asio::buffer(data)); + } catch (const boost::system::system_error& ex) { + if (ex.code() == boost::asio::error::eof) { + throw fcgi_eof_error("EOF on write"); + } else + throw std::runtime_error("FCGI Error: Unknown boost asio exception on write: "s + ex.what()); + } catch (const std::exception& ex) { + throw std::runtime_error("FCGI Error: Unknown exception on write: "s + ex.what()); + } +} + +size_t TCPSocket::read(std::vector& data) +{ + try { + return m_socket.read_some(boost::asio::buffer(data)); + } catch (const boost::system::system_error& ex) { + if (ex.code() == boost::asio::error::eof) { + throw fcgi_eof_error("EOF on read"); + } else + throw std::runtime_error("FCGI Error: Unknown boost asio exception on read: "s + ex.what()); + } catch (const std::exception& ex) { + throw std::runtime_error("FCGI Error: Unknown exception on read: "s + ex.what()); + } +} + diff --git a/plugins/fcgi/socket.h b/plugins/fcgi/socket.h new file mode 100644 index 0000000..b4ec54b --- /dev/null +++ b/plugins/fcgi/socket.h @@ -0,0 +1,70 @@ +#pragma once + +#include "fcgiid.h" + +#include + +#include +#include +#include +#include + +class fcgi_eof_error: public std::runtime_error +{ +public: + fcgi_eof_error(const std::string& what_arg): std::runtime_error(what_arg) {} +}; + +class Socket +{ + std::mutex m_mutex; // guard socket use in different threads + FCGI_ID m_fcgi_id; + +public: + virtual ~Socket() {} + + std::mutex& getMutex(); + + FCGI_ID& fcgi_id(); + + virtual void open() = 0; + virtual void close() = 0; + virtual bool is_open() = 0; + virtual size_t write(const std::vector& data) = 0; + virtual size_t read(std::vector& data) = 0; +}; + +class SocketFactory +{ + boost::asio::io_context m_io_context; + +public: + SocketFactory(); + std::shared_ptr create(const std::string& name); +}; + +class TCPSocket: public Socket +{ + boost::asio::io_context& m_io_context; + std::string m_host; + std::string m_port; + boost::asio::ip::tcp::socket m_socket; + +public: + TCPSocket(const std::string& host, const std::string& port, boost::asio::io_context& io_context); + ~TCPSocket() override; + + void open() override; + void close() override; + bool is_open() override; + size_t write(const std::vector& data) override; + size_t read(std::vector& data) override; +}; + +#if 0 +class FileSocket: public Socket +{ + ~FileSocket() override; +}; +#endif + -- cgit v1.2.3