Merge pull request #5818

f91a06c Dropping cppzmq dependency; adding some zmq utils (vtnerd)
This commit is contained in:
luigi1111 2019-09-14 13:25:14 -05:00
commit d663e1e3db
No known key found for this signature in database
GPG key ID: F4ACA0183641E010
10 changed files with 553 additions and 80 deletions

View file

@ -979,14 +979,14 @@ if(CMAKE_C_COMPILER_ID STREQUAL "Clang" AND ARCH_WIDTH EQUAL "32" AND NOT IOS AN
endif() endif()
endif() endif()
find_path(ZMQ_INCLUDE_PATH zmq.hpp) find_path(ZMQ_INCLUDE_PATH zmq.h)
find_library(ZMQ_LIB zmq) find_library(ZMQ_LIB zmq)
find_library(PGM_LIBRARY pgm) find_library(PGM_LIBRARY pgm)
find_library(NORM_LIBRARY norm) find_library(NORM_LIBRARY norm)
find_library(SODIUM_LIBRARY sodium) find_library(SODIUM_LIBRARY sodium)
if(NOT ZMQ_INCLUDE_PATH) if(NOT ZMQ_INCLUDE_PATH)
message(FATAL_ERROR "Could not find required header zmq.hpp") message(FATAL_ERROR "Could not find required header zmq.h")
endif() endif()
if(NOT ZMQ_LIB) if(NOT ZMQ_LIB)
message(FATAL_ERROR "Could not find required libzmq") message(FATAL_ERROR "Could not find required libzmq")

View file

@ -181,7 +181,7 @@ library archives (`.a`).
| pkg-config | any | NO | `pkg-config` | `base-devel` | `pkgconf` | NO | | | pkg-config | any | NO | `pkg-config` | `base-devel` | `pkgconf` | NO | |
| Boost | 1.58 | NO | `libboost-all-dev` | `boost` | `boost-devel` | NO | C++ libraries | | Boost | 1.58 | NO | `libboost-all-dev` | `boost` | `boost-devel` | NO | C++ libraries |
| OpenSSL | basically any | NO | `libssl-dev` | `openssl` | `openssl-devel` | NO | sha256 sum | | OpenSSL | basically any | NO | `libssl-dev` | `openssl` | `openssl-devel` | NO | sha256 sum |
| libzmq | 3.0.0 | NO | `libzmq3-dev` | `zeromq` | `cppzmq-devel` | NO | ZeroMQ library | | libzmq | 3.0.0 | NO | `libzmq3-dev` | `zeromq` | `zeromq-devel` | NO | ZeroMQ library |
| OpenPGM | ? | NO | `libpgm-dev` | `libpgm` | `openpgm-devel` | NO | For ZeroMQ | | OpenPGM | ? | NO | `libpgm-dev` | `libpgm` | `openpgm-devel` | NO | For ZeroMQ |
| libnorm[2] | ? | NO | `libnorm-dev` | | ` | YES | For ZeroMQ | | libnorm[2] | ? | NO | `libnorm-dev` | | ` | YES | For ZeroMQ |
| libunbound | 1.4.16 | YES | `libunbound-dev` | `unbound` | `unbound-devel` | NO | DNS resolver | | libunbound | 1.4.16 | YES | `libunbound-dev` | `unbound` | `unbound-devel` | NO | DNS resolver |

View file

@ -26,8 +26,10 @@
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF # STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
set(net_sources dandelionpp.cpp error.cpp i2p_address.cpp parse.cpp socks.cpp socks_connect.cpp tor_address.cpp) set(net_sources dandelionpp.cpp error.cpp i2p_address.cpp parse.cpp socks.cpp
set(net_headers dandelionpp.h error.h i2p_address.h parse.h socks.h socks_connect.h tor_address.h) socks_connect.cpp tor_address.cpp zmq.cpp)
set(net_headers dandelionpp.h error.h i2p_address.h parse.h socks.h socks_connect.h
tor_address.h zmq.h)
monero_add_library(net ${net_sources} ${net_headers}) monero_add_library(net ${net_sources} ${net_headers})
target_link_libraries(net common epee ${Boost_ASIO_LIBRARY}) target_link_libraries(net common epee ${Boost_ASIO_LIBRARY})

188
src/net/zmq.cpp Normal file
View file

@ -0,0 +1,188 @@
// Copyright (c) 2019, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "net/zmq.h"
#include <cassert>
#include <cerrno>
#include <limits>
#include <utility>
namespace net
{
namespace zmq
{
const std::error_category& error_category() noexcept
{
struct category final : std::error_category
{
virtual const char* name() const noexcept override final
{
return "error::error_category()";
}
virtual std::string message(int value) const override final
{
char const* const msg = zmq_strerror(value);
if (msg)
return msg;
return "zmq_strerror failure";
}
virtual std::error_condition default_error_condition(int value) const noexcept override final
{
// maps specific errors to generic `std::errc` cases.
switch (value)
{
case EFSM:
case ETERM:
break;
default:
/* zmq is using cerrno errors. C++ spec indicates that
`std::errc` values must be identical to the cerrno value.
So just map every zmq specific error to the generic errc
equivalent. zmq extensions must be in the switch or they
map to a non-existent errc enum value. */
return std::errc(value);
}
return std::error_condition{value, *this};
}
};
static const category instance{};
return instance;
}
void terminate::call(void* ptr) noexcept
{
assert(ptr != nullptr); // see header
while (zmq_term(ptr))
{
if (zmq_errno() != EINTR)
break;
}
}
namespace
{
//! RAII wrapper for `zmq_msg_t`.
class message
{
zmq_msg_t handle_;
public:
message() noexcept
: handle_()
{
zmq_msg_init(handle());
}
message(message&& rhs) = delete;
message(const message& rhs) = delete;
message& operator=(message&& rhs) = delete;
message& operator=(const message& rhs) = delete;
~message() noexcept
{
zmq_msg_close(handle());
}
zmq_msg_t* handle() noexcept
{
return std::addressof(handle_);
}
const char* data() noexcept
{
return static_cast<const char*>(zmq_msg_data(handle()));
}
std::size_t size() noexcept
{
return zmq_msg_size(handle());
}
};
struct do_receive
{
/* ZMQ documentation states that message parts are atomic - either
all are received or none are. Looking through ZMQ code and
Github discussions indicates that after part 1 is returned,
`EAGAIN` cannot be returned to meet these guarantees. Unit tests
verify (for the `inproc://` case) that this is the behavior.
Therefore, read errors after the first part are treated as a
failure for the entire message (probably `ETERM`). */
int operator()(std::string& payload, void* const socket, const int flags) const
{
static constexpr const int max_out = std::numeric_limits<int>::max();
const std::string::size_type initial = payload.size();
message part{};
for (;;)
{
int last = 0;
if ((last = zmq_msg_recv(part.handle(), socket, flags)) < 0)
return last;
payload.append(part.data(), part.size());
if (!zmq_msg_more(part.handle()))
break;
}
const std::string::size_type added = payload.size() - initial;
return unsigned(max_out) < added ? max_out : int(added);
}
};
template<typename F, typename... T>
expect<void> retry_op(F op, T&&... args) noexcept(noexcept(op(args...)))
{
for (;;)
{
if (0 <= op(args...))
return success();
const int error = zmq_errno();
if (error != EINTR)
return make_error_code(error);
}
}
} // anonymous
expect<std::string> receive(void* const socket, const int flags)
{
std::string payload{};
MONERO_CHECK(retry_op(do_receive{}, payload, socket, flags));
return {std::move(payload)};
}
expect<void> send(const epee::span<const std::uint8_t> payload, void* const socket, const int flags) noexcept
{
return retry_op(zmq_send, socket, payload.data(), payload.size(), flags);
}
} // zmq
} // net

136
src/net/zmq.h Normal file
View file

@ -0,0 +1,136 @@
// Copyright (c) 2019, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <memory>
#include <string>
#include <system_error>
#include <zmq.h>
#include "common/expect.h"
#include "span.h"
//! If the expression is less than 0, return the current ZMQ error code.
#define MONERO_ZMQ_CHECK(...) \
do \
{ \
if (( __VA_ARGS__ ) < 0) \
return {::net::zmq::get_error_code()}; \
} while (0)
//! Print a message followed by the current ZMQ error message.
#define MONERO_LOG_ZMQ_ERROR(...) \
do \
{ \
MERROR( __VA_ARGS__ << ": " << ::net::zmq::get_error_code().message()); \
} while (0)
//! Throw an exception with a custom `msg`, current ZMQ error code, filename, and line number.
#define MONERO_ZMQ_THROW(msg) \
MONERO_THROW( ::net::zmq::get_error_code(), msg )
namespace net
{
namespace zmq
{
//! \return Category for ZMQ errors.
const std::error_category& error_category() noexcept;
//! \return `code` (usally from zmq_errno()`) using `net::zmq::error_category()`.
inline std::error_code make_error_code(int code) noexcept
{
return std::error_code{code, error_category()};
}
//! \return Error from `zmq_errno()` using `net::zmq::error_category()`.
inline std::error_code get_error_code() noexcept
{
return make_error_code(zmq_errno());
}
//! Calls `zmq_term`
class terminate
{
static void call(void* ptr) noexcept;
public:
void operator()(void* ptr) const noexcept
{
if (ptr)
call(ptr);
}
};
//! Calls `zmq_close`
struct close
{
void operator()(void* ptr) const noexcept
{
if (ptr)
zmq_close(ptr);
}
};
//! Unique ZMQ context handle, calls `zmq_term` on destruction.
using context = std::unique_ptr<void, terminate>;
//! Unique ZMQ socket handle, calls `zmq_close` on destruction.
using socket = std::unique_ptr<void, close>;
/*! Read all parts of the next message on `socket`. Blocks until the entire
next message (all parts) are read, or until `zmq_term` is called on the
`zmq_context` associated with `socket`. If the context is terminated,
`make_error_code(ETERM)` is returned.
\note This will automatically retry on `EINTR`, so exiting on
interrupts requires context termination.
\note If non-blocking behavior is requested on `socket` or by `flags`,
then `net::zmq::make_error_code(EAGAIN)` will be returned if this
would block.
\param socket Handle created with `zmq_socket`.
\param flags See `zmq_msg_read` for possible flags.
\return Message payload read from `socket` or ZMQ error. */
expect<std::string> receive(void* socket, int flags = 0);
/*! Sends `payload` on `socket`. Blocks until the entire message is queued
for sending, or until `zmq_term` is called on the `zmq_context`
associated with `socket`. If the context is terminated,
`make_error_code(ETERM)` is returned.
\note This will automatically retry on `EINTR`, so exiting on
interrupts requires context termination.
\note If non-blocking behavior is requested on `socket` or by `flags`,
then `net::zmq::make_error_code(EAGAIN)` will be returned if this
would block.
\param payload sent as one message on `socket`.
\param socket Handle created with `zmq_socket`.
\param flags See `zmq_send` for possible flags.
\return `success()` if sent, otherwise ZMQ error. */
expect<void> send(epee::span<const std::uint8_t> payload, void* socket, int flags = 0) noexcept;
} // zmq
} // net

View file

@ -26,6 +26,8 @@
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF # STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
include_directories(SYSTEM ${ZMQ_INCLUDE_PATH})
set(rpc_base_sources set(rpc_base_sources
rpc_args.cpp) rpc_args.cpp)

View file

@ -28,18 +28,29 @@
#include "zmq_server.h" #include "zmq_server.h"
#include <chrono>
#include <cstdint>
#include <system_error>
namespace cryptonote namespace cryptonote
{ {
namespace
{
constexpr const int num_zmq_threads = 1;
constexpr const std::int64_t max_message_size = 10 * 1024 * 1024; // 10 MiB
constexpr const std::chrono::seconds linger_timeout{2}; // wait period for pending out messages
}
namespace rpc namespace rpc
{ {
ZmqServer::ZmqServer(RpcHandler& h) : ZmqServer::ZmqServer(RpcHandler& h) :
handler(h), handler(h),
stop_signal(false), context(zmq_init(num_zmq_threads))
running(false),
context(DEFAULT_NUM_ZMQ_THREADS) // TODO: make this configurable
{ {
if (!context)
MONERO_ZMQ_THROW("Unable to create ZMQ context");
} }
ZmqServer::~ZmqServer() ZmqServer::~ZmqServer()
@ -48,71 +59,88 @@ ZmqServer::~ZmqServer()
void ZmqServer::serve() void ZmqServer::serve()
{ {
try
{
// socket must close before `zmq_term` will exit.
const net::zmq::socket socket = std::move(rep_socket);
if (!socket)
{
MERROR("ZMQ RPC server reply socket is null");
return;
}
while (1) while (1)
{ {
try const std::string message = MONERO_UNWRAP(net::zmq::receive(socket.get()));
{ MDEBUG("Received RPC request: \"" << message << "\"");
zmq::message_t message; const std::string& response = handler.handle(message);
if (!rep_socket)
{
throw std::runtime_error("ZMQ RPC server reply socket is null");
}
while (rep_socket->recv(&message, 0))
{
std::string message_string(reinterpret_cast<const char *>(message.data()), message.size());
MDEBUG(std::string("Received RPC request: \"") + message_string + "\"");
std::string response = handler.handle(message_string);
zmq::message_t reply(response.size());
memcpy((void *) reply.data(), response.c_str(), response.size());
rep_socket->send(reply);
MDEBUG(std::string("Sent RPC reply: \"") + response + "\"");
MONERO_UNWRAP(net::zmq::send(epee::strspan<std::uint8_t>(response), socket.get()));
MDEBUG("Sent RPC reply: \"" << response << "\"");
} }
} }
catch (const boost::thread_interrupted& e) catch (const std::system_error& e)
{ {
MDEBUG("ZMQ Server thread interrupted."); if (e.code() != net::zmq::make_error_code(ETERM))
MERROR("ZMQ RPC Server Error: " << e.what());
} }
catch (const zmq::error_t& e) catch (const std::exception& e)
{ {
MERROR(std::string("ZMQ error: ") + e.what()); MERROR("ZMQ RPC Server Error: " << e.what());
} }
boost::this_thread::interruption_point(); catch (...)
{
MERROR("Unknown error in ZMQ RPC server");
} }
} }
bool ZmqServer::addIPCSocket(std::string address, std::string port) bool ZmqServer::addIPCSocket(const boost::string_ref address, const boost::string_ref port)
{ {
MERROR("ZmqServer::addIPCSocket not yet implemented!"); MERROR("ZmqServer::addIPCSocket not yet implemented!");
return false; return false;
} }
bool ZmqServer::addTCPSocket(std::string address, std::string port) bool ZmqServer::addTCPSocket(boost::string_ref address, boost::string_ref port)
{ {
try if (!context)
{ {
std::string addr_prefix("tcp://"); MERROR("ZMQ RPC Server already shutdown");
return false;
}
rep_socket.reset(new zmq::socket_t(context, ZMQ_REP)); rep_socket.reset(zmq_socket(context.get(), ZMQ_REP));
if (!rep_socket)
{
MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server socket create failed");
return false;
}
rep_socket->setsockopt(ZMQ_RCVTIMEO, &DEFAULT_RPC_RECV_TIMEOUT_MS, sizeof(DEFAULT_RPC_RECV_TIMEOUT_MS)); if (zmq_setsockopt(rep_socket.get(), ZMQ_MAXMSGSIZE, std::addressof(max_message_size), sizeof(max_message_size)) != 0)
{
MONERO_LOG_ZMQ_ERROR("Failed to set maximum incoming message size");
return false;
}
static constexpr const int linger_value = std::chrono::milliseconds{linger_timeout}.count();
if (zmq_setsockopt(rep_socket.get(), ZMQ_LINGER, std::addressof(linger_value), sizeof(linger_value)) != 0)
{
MONERO_LOG_ZMQ_ERROR("Failed to set linger timeout");
return false;
}
if (address.empty()) if (address.empty())
address = "*"; address = "*";
if (port.empty()) if (port.empty())
port = "*"; port = "*";
std::string bind_address = addr_prefix + address + std::string(":") + port;
rep_socket->bind(bind_address.c_str()); std::string bind_address = "tcp://";
} bind_address.append(address.data(), address.size());
catch (const std::exception& e) bind_address += ":";
bind_address.append(port.data(), port.size());
if (zmq_bind(rep_socket.get(), bind_address.c_str()) < 0)
{ {
MERROR(std::string("Error creating ZMQ Socket: ") + e.what()); MONERO_LOG_ZMQ_ERROR("ZMQ RPC Server bind failed");
return false; return false;
} }
return true; return true;
@ -120,22 +148,16 @@ bool ZmqServer::addTCPSocket(std::string address, std::string port)
void ZmqServer::run() void ZmqServer::run()
{ {
running = true;
run_thread = boost::thread(boost::bind(&ZmqServer::serve, this)); run_thread = boost::thread(boost::bind(&ZmqServer::serve, this));
} }
void ZmqServer::stop() void ZmqServer::stop()
{ {
if (!running) return; if (!run_thread.joinable())
stop_signal = true;
run_thread.interrupt();
run_thread.join();
running = false;
return; return;
context.reset(); // destroying context terminates all calls
run_thread.join();
} }

View file

@ -29,12 +29,10 @@
#pragma once #pragma once
#include <boost/thread/thread.hpp> #include <boost/thread/thread.hpp>
#include <zmq.hpp> #include <boost/utility/string_ref.hpp>
#include <string>
#include <memory>
#include "common/command_line.h" #include "common/command_line.h"
#include "net/zmq.h"
#include "rpc_handler.h" #include "rpc_handler.h"
namespace cryptonote namespace cryptonote
@ -43,9 +41,6 @@ namespace cryptonote
namespace rpc namespace rpc
{ {
static constexpr int DEFAULT_NUM_ZMQ_THREADS = 1;
static constexpr int DEFAULT_RPC_RECV_TIMEOUT_MS = 1000;
class ZmqServer class ZmqServer
{ {
public: public:
@ -58,8 +53,8 @@ class ZmqServer
void serve(); void serve();
bool addIPCSocket(std::string address, std::string port); bool addIPCSocket(boost::string_ref address, boost::string_ref port);
bool addTCPSocket(std::string address, std::string port); bool addTCPSocket(boost::string_ref address, boost::string_ref port);
void run(); void run();
void stop(); void stop();
@ -67,14 +62,11 @@ class ZmqServer
private: private:
RpcHandler& handler; RpcHandler& handler;
volatile bool stop_signal; net::zmq::context context;
volatile bool running;
zmq::context_t context;
boost::thread run_thread; boost::thread run_thread;
std::unique_ptr<zmq::socket_t> rep_socket; net::zmq::socket rep_socket;
}; };

View file

@ -117,7 +117,8 @@ target_link_libraries(unit_tests
${Boost_THREAD_LIBRARY} ${Boost_THREAD_LIBRARY}
${GTEST_LIBRARIES} ${GTEST_LIBRARIES}
${CMAKE_THREAD_LIBS_INIT} ${CMAKE_THREAD_LIBS_INIT}
${EXTRA_LIBRARIES}) ${EXTRA_LIBRARIES}
${ZMQ_LIB})
set_property(TARGET unit_tests set_property(TARGET unit_tests
PROPERTY PROPERTY
FOLDER "tests") FOLDER "tests")

View file

@ -40,6 +40,7 @@
#include <boost/range/adaptor/sliced.hpp> #include <boost/range/adaptor/sliced.hpp>
#include <boost/range/combine.hpp> #include <boost/range/combine.hpp>
#include <boost/system/error_code.hpp> #include <boost/system/error_code.hpp>
#include <boost/thread/scoped_thread.hpp>
#include <boost/thread/thread.hpp> #include <boost/thread/thread.hpp>
#include <boost/uuid/nil_generator.hpp> #include <boost/uuid/nil_generator.hpp>
#include <boost/uuid/random_generator.hpp> #include <boost/uuid/random_generator.hpp>
@ -59,6 +60,7 @@
#include "net/socks_connect.h" #include "net/socks_connect.h"
#include "net/parse.h" #include "net/parse.h"
#include "net/tor_address.h" #include "net/tor_address.h"
#include "net/zmq.h"
#include "p2p/net_peerlist_boost_serialization.h" #include "p2p/net_peerlist_boost_serialization.h"
#include "serialization/keyvalue_serialization.h" #include "serialization/keyvalue_serialization.h"
#include "storages/portable_storage.h" #include "storages/portable_storage.h"
@ -1259,3 +1261,131 @@ TEST(dandelionpp_map, dropped_all_connections)
EXPECT_EQ(3u, entry.second); EXPECT_EQ(3u, entry.second);
} }
} }
TEST(zmq, error_codes)
{
EXPECT_EQ(
std::addressof(net::zmq::error_category()),
std::addressof(net::zmq::make_error_code(0).category())
);
EXPECT_EQ(
std::make_error_condition(std::errc::not_a_socket),
net::zmq::make_error_code(ENOTSOCK)
);
EXPECT_TRUE(
[]() -> expect<void>
{
MONERO_ZMQ_CHECK(zmq_msg_send(nullptr, nullptr, 0));
return success();
}().matches(std::errc::not_a_socket)
);
bool thrown = false;
try
{
MONERO_ZMQ_THROW("stuff");
}
catch (const std::system_error& e)
{
thrown = true;
EXPECT_EQ(std::make_error_condition(std::errc::not_a_socket), e.code());
}
EXPECT_TRUE(thrown);
}
TEST(zmq, read_write)
{
net::zmq::context context{zmq_init(1)};
ASSERT_NE(nullptr, context);
net::zmq::socket send_socket{zmq_socket(context.get(), ZMQ_REQ)};
net::zmq::socket recv_socket{zmq_socket(context.get(), ZMQ_REP)};
ASSERT_NE(nullptr, send_socket);
ASSERT_NE(nullptr, recv_socket);
ASSERT_EQ(0u, zmq_bind(recv_socket.get(), "inproc://testing"));
ASSERT_EQ(0u, zmq_connect(send_socket.get(), "inproc://testing"));
std::string message;
message.resize(1024);
crypto::rand(message.size(), reinterpret_cast<std::uint8_t*>(std::addressof(message[0])));
ASSERT_TRUE(bool(net::zmq::send(epee::strspan<std::uint8_t>(message), send_socket.get())));
const expect<std::string> received = net::zmq::receive(recv_socket.get());
ASSERT_TRUE(bool(received));
EXPECT_EQ(message, *received);
}
TEST(zmq, read_write_multipart)
{
net::zmq::context context{zmq_init(1)};
ASSERT_NE(nullptr, context);
net::zmq::socket send_socket{zmq_socket(context.get(), ZMQ_REQ)};
net::zmq::socket recv_socket{zmq_socket(context.get(), ZMQ_REP)};
ASSERT_NE(nullptr, send_socket);
ASSERT_NE(nullptr, recv_socket);
ASSERT_EQ(0u, zmq_bind(recv_socket.get(), "inproc://testing"));
ASSERT_EQ(0u, zmq_connect(send_socket.get(), "inproc://testing"));
std::string message;
message.resize(999);
crypto::rand(message.size(), reinterpret_cast<std::uint8_t*>(std::addressof(message[0])));
for (unsigned i = 0; i < 3; ++i)
{
const expect<std::string> received = net::zmq::receive(recv_socket.get(), ZMQ_DONTWAIT);
ASSERT_FALSE(bool(received));
EXPECT_EQ(net::zmq::make_error_code(EAGAIN), received.error());
const epee::span<const std::uint8_t> bytes{
reinterpret_cast<const std::uint8_t*>(std::addressof(message[0])) + (i * 333), 333
};
ASSERT_TRUE(bool(net::zmq::send(bytes, send_socket.get(), (i == 2 ? 0 : ZMQ_SNDMORE))));
}
const expect<std::string> received = net::zmq::receive(recv_socket.get(), ZMQ_DONTWAIT);
ASSERT_TRUE(bool(received));
EXPECT_EQ(message, *received);
}
TEST(zmq, read_write_termination)
{
net::zmq::context context{zmq_init(1)};
ASSERT_NE(nullptr, context);
// must be declared before sockets and after context
boost::scoped_thread<> thread{};
net::zmq::socket send_socket{zmq_socket(context.get(), ZMQ_REQ)};
net::zmq::socket recv_socket{zmq_socket(context.get(), ZMQ_REP)};
ASSERT_NE(nullptr, send_socket);
ASSERT_NE(nullptr, recv_socket);
ASSERT_EQ(0u, zmq_bind(recv_socket.get(), "inproc://testing"));
ASSERT_EQ(0u, zmq_connect(send_socket.get(), "inproc://testing"));
std::string message;
message.resize(1024);
crypto::rand(message.size(), reinterpret_cast<std::uint8_t*>(std::addressof(message[0])));
ASSERT_TRUE(bool(net::zmq::send(epee::strspan<std::uint8_t>(message), send_socket.get(), ZMQ_SNDMORE)));
expect<std::string> received = net::zmq::receive(recv_socket.get(), ZMQ_DONTWAIT);
ASSERT_FALSE(bool(received));
EXPECT_EQ(net::zmq::make_error_code(EAGAIN), received.error());
thread = boost::scoped_thread<>{
boost::thread{
[&context] () { context.reset(); }
}
};
received = net::zmq::receive(recv_socket.get());
ASSERT_FALSE(bool(received));
EXPECT_EQ(net::zmq::make_error_code(ETERM), received.error());
}