diff options
author | Roland Reichwein <mail@reichwein.it> | 2023-02-05 10:09:33 +0100 |
---|---|---|
committer | Roland Reichwein <mail@reichwein.it> | 2023-02-05 10:09:33 +0100 |
commit | ca81dcf08d9a3bf49b3d540e3b3b792bfc3b3016 (patch) | |
tree | 161fb4b98aae491541d82080b1dd2fd75d120c23 | |
parent | 2d48c70f2f76d90e2e3e6c1badec7a4f0e6c623b (diff) |
Remove websocket mutex in favour of boost strands
-rw-r--r-- | whiteboard.cpp | 41 | ||||
-rw-r--r-- | whiteboard.h | 1 |
2 files changed, 25 insertions, 17 deletions
diff --git a/whiteboard.cpp b/whiteboard.cpp index ea56cc3..922bd37 100644 --- a/whiteboard.cpp +++ b/whiteboard.cpp @@ -95,11 +95,10 @@ class session: public std::enable_shared_from_this<session> public: using connection = std::shared_ptr<boost::beast::websocket::stream<boost::asio::ip::tcp::socket>>; - session(ConnectionRegistry& registry, Storage& storage, std::mutex& storage_mutex, std::mutex& websocket_mutex, boost::asio::ip::tcp::socket socket): + session(ConnectionRegistry& registry, Storage& storage, std::mutex& storage_mutex, boost::asio::ip::tcp::socket socket): m_registry(registry), m_storage(storage), m_storage_mutex(storage_mutex), - m_websocket_mutex(websocket_mutex), m_ws(std::make_shared<connection::element_type>(std::move(socket))), m_connection_guard(m_registry, m_ws) { @@ -115,7 +114,8 @@ public: std::string("Reichwein.IT Whiteboard")); })); - boost::beast::http::async_read(m_ws->next_layer(), m_buffer, m_parser, boost::beast::bind_front_handler(&session::on_read_handshake, shared_from_this())); + boost::beast::http::async_read(m_ws->next_layer(), m_buffer, m_parser, + boost::asio::bind_executor(m_ws->next_layer().get_executor(), boost::beast::bind_front_handler(&session::on_read_handshake, shared_from_this()))); } void on_read_handshake(boost::beast::error_code ec, std::size_t bytes_transferred) @@ -173,8 +173,9 @@ public: } if (data.size() > 0) { boost::beast::ostream(m_buffer) << data; - std::lock_guard<std::mutex> lock(m_websocket_mutex); - m_ws->async_write(m_buffer.data(), boost::beast::bind_front_handler(&session::on_write, shared_from_this())); + m_ws->async_write(m_buffer.data(), + boost::asio::bind_executor(m_ws->next_layer().get_executor(), + boost::beast::bind_front_handler(&session::on_write, shared_from_this()))); } else { do_read(); } @@ -196,22 +197,31 @@ public: do_read_handshake(); } + void on_write_notify(std::shared_ptr<std::string> data, std::shared_ptr<boost::asio::const_buffer> buffer, boost::beast::error_code ec, std::size_t bytes_transferred) + { + boost::ignore_unused(bytes_transferred); + + if (ec) { + std::cerr << "Error on session write notify: " << ec.message() << std::endl; + } + } + void notify_other_connections_diff(const std::string& id, const Diff& diff) { std::for_each(m_registry.begin(id), m_registry.end(id), [&](const connection& ci) { if (m_ws != ci) { - boost::beast::flat_buffer buffer; pt::ptree ptree {make_ptree({ {"type", "getdiff"}, {"revision", std::to_string(m_storage.getRevision(id))}, {"pos", std::to_string(m_storage.getCursorPos(id)) } })}; ptree.put_child("serverinfo.diff", diff.get_structure().get_child("diff")); - boost::beast::ostream(buffer) << Reichwein::XML::plain_xml(ptree); - std::lock_guard<std::mutex> lock(m_websocket_mutex); + auto data{std::make_shared<std::string>(Reichwein::XML::plain_xml(ptree))}; + auto buffer{std::make_shared<boost::asio::const_buffer>(data->data(), data->size())}; try { - ci->write(buffer.data()); + ci->async_write(*buffer, boost::asio::bind_executor(ci->next_layer().get_executor(), + boost::beast::bind_front_handler(&session::on_write_notify, shared_from_this(), data, buffer))); } catch (const std::exception& ex) { std::cerr << "Warning: Notify getdiff write for " << ci << " not possible, id " << id << std::endl; m_registry.dump(); @@ -225,14 +235,14 @@ public: std::for_each(m_registry.begin(id), m_registry.end(id), [&](const connection& ci) { if (m_ws != ci) { - boost::beast::flat_buffer buffer; - boost::beast::ostream(buffer) << make_xml({ + auto data{std::make_shared<std::string>(make_xml({ {"type", "getpos"}, {"pos", std::to_string(m_storage.getCursorPos(id)) } - }); - std::lock_guard<std::mutex> lock(m_websocket_mutex); + }))}; + auto buffer{std::make_shared<boost::asio::const_buffer>(data->data(), data->size())}; try { - ci->write(buffer.data()); + ci->async_write(*buffer, boost::asio::bind_executor(ci->next_layer().get_executor(), + boost::beast::bind_front_handler(&session::on_write_notify, shared_from_this(), data, buffer))); } catch (const std::exception& ex) { std::cerr << "Warning: Notify getpos write for " << ci << " not possible, id " << id << std::endl; m_registry.dump(); @@ -348,7 +358,6 @@ private: ConnectionRegistry& m_registry; Storage& m_storage; std::mutex& m_storage_mutex; - std::mutex& m_websocket_mutex; connection m_ws; ConnectionRegistry::RegistryGuard m_connection_guard; @@ -370,7 +379,7 @@ void Whiteboard::on_accept(boost::system::error_code ec, boost::asio::ip::tcp::s if (ec) { std::cerr << "Error on accept: " << ec.message() << std::endl; } else { - std::make_shared<session>(m_registry, *m_storage, m_storage_mutex, m_websocket_mutex, std::move(socket))->run(); + std::make_shared<session>(m_registry, *m_storage, m_storage_mutex, std::move(socket))->run(); } do_accept(); diff --git a/whiteboard.h b/whiteboard.h index 15d764a..7648bd4 100644 --- a/whiteboard.h +++ b/whiteboard.h @@ -22,7 +22,6 @@ private: std::unique_ptr<Config> m_config; std::unique_ptr<Storage> m_storage; std::mutex m_storage_mutex; - std::mutex m_websocket_mutex; ConnectionRegistry m_registry; |