Remove payload copy in all outgoing p2p messages

This commit is contained in:
Lee Clagett 2020-10-13 09:27:20 +00:00
parent 0b6bfb1fd8
commit 23aae5571b
20 changed files with 319 additions and 215 deletions

View file

@ -31,6 +31,7 @@
#include <cstdint> #include <cstdint>
#include "byte_stream.h"
#include "net_utils_base.h" #include "net_utils_base.h"
#include "span.h" #include "span.h"
@ -83,11 +84,12 @@ namespace levin
#define LEVIN_PROTOCOL_VER_0 0 #define LEVIN_PROTOCOL_VER_0 0
#define LEVIN_PROTOCOL_VER_1 1 #define LEVIN_PROTOCOL_VER_1 1
template<class t_connection_context = net_utils::connection_context_base> template<class t_connection_context = net_utils::connection_context_base>
struct levin_commands_handler struct levin_commands_handler
{ {
virtual int invoke(int command, const epee::span<const uint8_t> in_buff, byte_slice& buff_out, t_connection_context& context)=0; virtual int invoke(int command, const epee::span<const uint8_t> in_buff, byte_stream& buff_out, t_connection_context& context)=0;
virtual int notify(int command, const epee::span<const uint8_t> in_buff, t_connection_context& context)=0; virtual int notify(int command, const epee::span<const uint8_t> in_buff, t_connection_context& context)=0;
virtual void callback(t_connection_context& context){}; virtual void callback(t_connection_context& context){};
@ -125,12 +127,41 @@ namespace levin
} }
} }
//! Provides space for levin (p2p) header, so that payload can be sent without copy
class message_writer
{
byte_slice finalize(uint32_t command, uint32_t flags, uint32_t return_code, bool expect_response);
public:
using header = bucket_head2;
explicit message_writer(std::size_t reserve = 8192);
message_writer(const message_writer&) = delete;
message_writer(message_writer&&) = default;
~message_writer() = default;
message_writer& operator=(const message_writer&) = delete;
message_writer& operator=(message_writer&&) = default;
//! \return Size of payload (excludes header size).
std::size_t payload_size() const noexcept
{
return buffer.size() < sizeof(header) ? 0 : buffer.size() - sizeof(header);
}
byte_slice finalize_invoke(uint32_t command) { return finalize(command, LEVIN_PACKET_REQUEST, 0, true); }
byte_slice finalize_notify(uint32_t command) { return finalize(command, LEVIN_PACKET_REQUEST, 0, false); }
byte_slice finalize_response(uint32_t command, uint32_t return_code)
{
return finalize(command, LEVIN_PACKET_RESPONSE, return_code, false);
}
//! Has space for levin header until a finalize method is used
byte_stream buffer;
};
//! \return Intialized levin header. //! \return Intialized levin header.
bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept; bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept;
//! \return A levin notification message.
byte_slice make_notify(int command, epee::span<const std::uint8_t> payload);
/*! Generate a dummy levin message. /*! Generate a dummy levin message.
\param noise_bytes Total size of the returned `byte_slice`. \param noise_bytes Total size of the returned `byte_slice`.
@ -140,12 +171,11 @@ namespace levin
/*! Generate 1+ levin messages that are identical to the noise message size. /*! Generate 1+ levin messages that are identical to the noise message size.
\param noise Each levin message will be identical to the size of this \param noise_size Each levin message will be identical to this value.
message. The bytes from this message will be used for padding.
\return `nullptr` if `noise.size()` is less than the levin header size. \return `nullptr` if `noise.size()` is less than the levin header size.
Otherwise, a levin notification message OR 2+ levin fragment messages. Otherwise, a levin notification message OR 2+ levin fragment messages.
Each message is `noise.size()` in length. */ Each message is `noise.size()` in length. */
byte_slice make_fragmented_notify(const byte_slice& noise, int command, epee::span<const std::uint8_t> payload); byte_slice make_fragmented_notify(const std::size_t noise_size, int command, message_writer message);
} }
} }

View file

@ -51,6 +51,21 @@
#define MIN_BYTES_WANTED 512 #define MIN_BYTES_WANTED 512
#endif #endif
template<typename context_t>
void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char* category)
{
MCINFO("net.p2p.traffic", context << bytes << " bytes " << (sent ? "sent" : "received") << (error ? "/corrupt" : "")
<< " for category " << category << " initiated by " << (initiator ? "us" : "peer"));
}
template<typename context_t>
void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, int command)
{
char buf[32];
snprintf(buf, sizeof(buf), "command-%u", command);
on_levin_traffic(context, initiator, sent, error, bytes, buf);
}
namespace epee namespace epee
{ {
namespace levin namespace levin
@ -88,11 +103,10 @@ public:
uint64_t m_max_packet_size; uint64_t m_max_packet_size;
uint64_t m_invoke_timeout; uint64_t m_invoke_timeout;
int invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out, boost::uuids::uuid connection_id); int invoke(int command, message_writer in_msg, std::string& buff_out, boost::uuids::uuid connection_id);
template<class callback_t> template<class callback_t>
int invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED); int invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED);
int notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id);
int send(epee::byte_slice message, const boost::uuids::uuid& connection_id); int send(epee::byte_slice message, const boost::uuids::uuid& connection_id);
bool close(boost::uuids::uuid connection_id); bool close(boost::uuids::uuid connection_id);
bool update_connection_context(const t_connection_context& contxt); bool update_connection_context(const t_connection_context& contxt);
@ -122,12 +136,17 @@ class async_protocol_handler
{ {
std::string m_fragment_buffer; std::string m_fragment_buffer;
bool send_message(uint32_t command, epee::span<const uint8_t> in_buff, uint32_t flags, bool expect_response) bool send_message(byte_slice message)
{ {
const bucket_head2 head = make_header(command, in_buff.size(), flags, expect_response); if (message.size() < sizeof(message_writer::header))
if(!m_pservice_endpoint->do_send(byte_slice{as_byte_span(head), in_buff}))
return false; return false;
message_writer::header head;
std::memcpy(std::addressof(head), message.data(), sizeof(head));
if(!m_pservice_endpoint->do_send(std::move(message)))
return false;
on_levin_traffic(m_connection_context, true, true, false, head.m_cb, head.m_command);
MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb
<< ", flags" << head.m_flags << ", flags" << head.m_flags
<< ", r?=" << head.m_have_to_return_data << ", r?=" << head.m_have_to_return_data
@ -523,26 +542,17 @@ public:
{ {
if(m_current_head.m_have_to_return_data) if(m_current_head.m_have_to_return_data)
{ {
byte_slice return_buff; levin::message_writer return_message{32 * 1024};
const uint32_t return_code = m_config.m_pcommands_handler->invoke( const uint32_t return_code = m_config.m_pcommands_handler->invoke(
m_current_head.m_command, buff_to_invoke, return_buff, m_connection_context m_current_head.m_command, buff_to_invoke, return_message.buffer, m_connection_context
); );
// peer_id remains unset if dropped // peer_id remains unset if dropped
if (m_current_head.m_command == m_connection_context.handshake_command() && m_connection_context.handshake_complete()) if (m_current_head.m_command == m_connection_context.handshake_command() && m_connection_context.handshake_complete())
m_max_packet_size = m_config.m_max_packet_size; m_max_packet_size = m_config.m_max_packet_size;
bucket_head2 head = make_header(m_current_head.m_command, return_buff.size(), LEVIN_PACKET_RESPONSE, false); if(!send_message(return_message.finalize_response(m_current_head.m_command, return_code)))
head.m_return_code = SWAP32LE(return_code);
if(!m_pservice_endpoint->do_send(byte_slice{{epee::as_byte_span(head), epee::to_span(return_buff)}}))
return false; return false;
MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb
<< ", flags" << head.m_flags
<< ", r?=" << head.m_have_to_return_data
<<", cmd = " << head.m_command
<< ", ver=" << head.m_protocol_version);
} }
else else
m_config.m_pcommands_handler->notify(m_current_head.m_command, buff_to_invoke, m_connection_context); m_config.m_pcommands_handler->notify(m_current_head.m_command, buff_to_invoke, m_connection_context);
@ -619,7 +629,7 @@ public:
} }
template<class callback_t> template<class callback_t>
bool async_invoke(int command, const epee::span<const uint8_t> in_buff, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED) bool async_invoke(int command, message_writer in_msg, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
{ {
misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
boost::bind(&async_protocol_handler::finish_outer_call, this)); boost::bind(&async_protocol_handler::finish_outer_call, this));
@ -638,7 +648,7 @@ public:
if (command == m_connection_context.handshake_command()) if (command == m_connection_context.handshake_command())
m_max_packet_size = m_config.m_max_packet_size; m_max_packet_size = m_config.m_max_packet_size;
if(!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true)) if(!send_message(in_msg.finalize_invoke(command)))
{ {
LOG_ERROR_CC(m_connection_context, "Failed to do_send"); LOG_ERROR_CC(m_connection_context, "Failed to do_send");
err_code = LEVIN_ERROR_CONNECTION; err_code = LEVIN_ERROR_CONNECTION;
@ -664,7 +674,7 @@ public:
return true; return true;
} }
int invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out) int invoke(int command, message_writer in_msg, std::string& buff_out)
{ {
misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
boost::bind(&async_protocol_handler::finish_outer_call, this)); boost::bind(&async_protocol_handler::finish_outer_call, this));
@ -676,7 +686,7 @@ public:
if (command == m_connection_context.handshake_command()) if (command == m_connection_context.handshake_command())
m_max_packet_size = m_config.m_max_packet_size; m_max_packet_size = m_config.m_max_packet_size;
if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true)) if (!send_message(in_msg.finalize_invoke(command)))
{ {
LOG_ERROR_CC(m_connection_context, "Failed to send request"); LOG_ERROR_CC(m_connection_context, "Failed to send request");
return LEVIN_ERROR_CONNECTION; return LEVIN_ERROR_CONNECTION;
@ -713,25 +723,9 @@ public:
return m_invoke_result_code; return m_invoke_result_code;
} }
int notify(int command, const epee::span<const uint8_t> in_buff) /*! Sends `message` without adding a levin header. The message must have been
{ created with `make_noise_notify`, `make_fragmented_notify`, or
misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( `message_writer::finalize_notify`. See additional instructions for
boost::bind(&async_protocol_handler::finish_outer_call, this));
CRITICAL_REGION_LOCAL(m_call_lock);
if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, false))
{
LOG_ERROR_CC(m_connection_context, "Failed to send notify message");
return -1;
}
return 1;
}
/*! Sends `message` without adding a levin header. The message must have
been created with `make_notify`, `make_noise_notify` or
`make_fragmented_notify`. See additional instructions for
`make_fragmented_notify`. `make_fragmented_notify`.
\return 1 on success */ \return 1 on success */
@ -741,14 +735,11 @@ public:
boost::bind(&async_protocol_handler::finish_outer_call, this) boost::bind(&async_protocol_handler::finish_outer_call, this)
); );
const std::size_t length = message.size(); if (!send_message(std::move(message)))
if (!m_pservice_endpoint->do_send(std::move(message)))
{ {
LOG_ERROR_CC(m_connection_context, "Failed to send message, dropping it"); LOG_ERROR_CC(m_connection_context, "Failed to send message, dropping it");
return -1; return -1;
} }
MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << (length - sizeof(bucket_head2)) << ", r?=0]");
return 1; return 1;
} }
//------------------------------------------------------------------------------------------ //------------------------------------------------------------------------------------------
@ -842,19 +833,19 @@ int async_protocol_handler_config<t_connection_context>::find_and_lock_connectio
} }
//------------------------------------------------------------------------------------------ //------------------------------------------------------------------------------------------
template<class t_connection_context> template<class t_connection_context>
int async_protocol_handler_config<t_connection_context>::invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out, boost::uuids::uuid connection_id) int async_protocol_handler_config<t_connection_context>::invoke(int command, message_writer in_msg, std::string& buff_out, boost::uuids::uuid connection_id)
{ {
async_protocol_handler<t_connection_context>* aph; async_protocol_handler<t_connection_context>* aph;
int r = find_and_lock_connection(connection_id, aph); int r = find_and_lock_connection(connection_id, aph);
return LEVIN_OK == r ? aph->invoke(command, in_buff, buff_out) : r; return LEVIN_OK == r ? aph->invoke(command, std::move(in_msg), buff_out) : r;
} }
//------------------------------------------------------------------------------------------ //------------------------------------------------------------------------------------------
template<class t_connection_context> template<class callback_t> template<class t_connection_context> template<class callback_t>
int async_protocol_handler_config<t_connection_context>::invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout) int async_protocol_handler_config<t_connection_context>::invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout)
{ {
async_protocol_handler<t_connection_context>* aph; async_protocol_handler<t_connection_context>* aph;
int r = find_and_lock_connection(connection_id, aph); int r = find_and_lock_connection(connection_id, aph);
return LEVIN_OK == r ? aph->async_invoke(command, in_buff, cb, timeout) : r; return LEVIN_OK == r ? aph->async_invoke(command, std::move(in_msg), cb, timeout) : r;
} }
//------------------------------------------------------------------------------------------ //------------------------------------------------------------------------------------------
template<class t_connection_context> template<class callback_t> template<class t_connection_context> template<class callback_t>
@ -935,14 +926,6 @@ void async_protocol_handler_config<t_connection_context>::set_handler(levin_comm
} }
//------------------------------------------------------------------------------------------ //------------------------------------------------------------------------------------------
template<class t_connection_context> template<class t_connection_context>
int async_protocol_handler_config<t_connection_context>::notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id)
{
async_protocol_handler<t_connection_context>* aph;
int r = find_and_lock_connection(connection_id, aph);
return LEVIN_OK == r ? aph->notify(command, in_buff) : r;
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
int async_protocol_handler_config<t_connection_context>::send(byte_slice message, const boost::uuids::uuid& connection_id) int async_protocol_handler_config<t_connection_context>::send(byte_slice message, const boost::uuids::uuid& connection_id)
{ {
async_protocol_handler<t_connection_context>* aph; async_protocol_handler<t_connection_context>* aph;

View file

@ -37,21 +37,14 @@
#undef MONERO_DEFAULT_LOG_CATEGORY #undef MONERO_DEFAULT_LOG_CATEGORY
#define MONERO_DEFAULT_LOG_CATEGORY "net" #define MONERO_DEFAULT_LOG_CATEGORY "net"
template<typename context_t>
void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char *category);
template<typename context_t>
void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, int command);
namespace namespace
{ {
template<typename context_t>
void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char *category)
{
MCINFO("net.p2p.traffic", context << bytes << " bytes " << (sent ? "sent" : "received") << (error ? "/corrupt" : "")
<< " for category " << category << " initiated by " << (initiator ? "us" : "peer"));
}
template<typename context_t>
void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, int command)
{
char buf[32];
snprintf(buf, sizeof(buf), "command-%u", command);
return on_levin_traffic(context, initiator, sent, error, bytes, buf);
}
static const constexpr epee::serialization::portable_storage::limits_t default_levin_limits = { static const constexpr epee::serialization::portable_storage::limits_t default_levin_limits = {
8192, // objects 8192, // objects
16384, // fields 16384, // fields
@ -117,12 +110,11 @@ namespace epee
const boost::uuids::uuid &conn_id = context.m_connection_id; const boost::uuids::uuid &conn_id = context.m_connection_id;
typename serialization::portable_storage stg; typename serialization::portable_storage stg;
out_struct.store(stg); out_struct.store(stg);
byte_slice buff_to_send; levin::message_writer to_send{16 * 1024};
std::string buff_to_recv; std::string buff_to_recv;
stg.store_to_binary(buff_to_send, 16 * 1024); stg.store_to_binary(to_send.buffer);
on_levin_traffic(context, true, true, false, buff_to_send.size(), command); int res = transport.invoke(command, std::move(to_send), buff_to_recv, conn_id);
int res = transport.invoke(command, boost::string_ref{reinterpret_cast<const char*>(buff_to_send.data()), buff_to_send.size()}, buff_to_recv, conn_id);
if( res <=0 ) if( res <=0 )
{ {
LOG_PRINT_L1("Failed to invoke command " << command << " return code " << res); LOG_PRINT_L1("Failed to invoke command " << command << " return code " << res);
@ -145,10 +137,9 @@ namespace epee
const boost::uuids::uuid &conn_id = context.m_connection_id; const boost::uuids::uuid &conn_id = context.m_connection_id;
typename serialization::portable_storage stg; typename serialization::portable_storage stg;
const_cast<t_arg&>(out_struct).store(stg);//TODO: add true const support to searilzation const_cast<t_arg&>(out_struct).store(stg);//TODO: add true const support to searilzation
byte_slice buff_to_send; levin::message_writer to_send{16 * 1024};
stg.store_to_binary(buff_to_send, 16 * 1024); stg.store_to_binary(to_send.buffer);
on_levin_traffic(context, true, true, false, buff_to_send.size(), command); int res = transport.invoke_async(command, std::move(to_send), conn_id, [cb, command](int code, const epee::span<const uint8_t> buff, typename t_transport::connection_context& context)->bool
int res = transport.invoke_async(command, epee::to_span(buff_to_send), conn_id, [cb, command](int code, const epee::span<const uint8_t> buff, typename t_transport::connection_context& context)->bool
{ {
t_result result_struct = AUTO_VAL_INIT(result_struct); t_result result_struct = AUTO_VAL_INIT(result_struct);
if( code <=0 ) if( code <=0 )
@ -192,11 +183,10 @@ namespace epee
const boost::uuids::uuid &conn_id = context.m_connection_id; const boost::uuids::uuid &conn_id = context.m_connection_id;
serialization::portable_storage stg; serialization::portable_storage stg;
out_struct.store(stg); out_struct.store(stg);
byte_slice buff_to_send; levin::message_writer to_send;
stg.store_to_binary(buff_to_send); stg.store_to_binary(to_send.buffer);
on_levin_traffic(context, true, true, false, buff_to_send.size(), command); int res = transport.send(to_send.finalize_notify(command), conn_id);
int res = transport.notify(command, epee::to_span(buff_to_send), conn_id);
if(res <=0 ) if(res <=0 )
{ {
MERROR("Failed to notify command " << command << " return code " << res); MERROR("Failed to notify command " << command << " return code " << res);
@ -207,7 +197,7 @@ namespace epee
//---------------------------------------------------------------------------------------------------- //----------------------------------------------------------------------------------------------------
//---------------------------------------------------------------------------------------------------- //----------------------------------------------------------------------------------------------------
template<class t_owner, class t_in_type, class t_out_type, class t_context, class callback_t> template<class t_owner, class t_in_type, class t_out_type, class t_context, class callback_t>
int buff_to_t_adapter(int command, const epee::span<const uint8_t> in_buff, byte_slice& buff_out, callback_t cb, t_context& context ) int buff_to_t_adapter(int command, const epee::span<const uint8_t> in_buff, byte_stream& buff_out, callback_t cb, t_context& context )
{ {
serialization::portable_storage strg; serialization::portable_storage strg;
if(!strg.load_from_binary(in_buff, &default_levin_limits)) if(!strg.load_from_binary(in_buff, &default_levin_limits))
@ -230,12 +220,11 @@ namespace epee
serialization::portable_storage strg_out; serialization::portable_storage strg_out;
static_cast<t_out_type&>(out_struct).store(strg_out); static_cast<t_out_type&>(out_struct).store(strg_out);
if(!strg_out.store_to_binary(buff_out, 32 * 1024)) if(!strg_out.store_to_binary(buff_out))
{ {
LOG_ERROR("Failed to store_to_binary in command" << command); LOG_ERROR("Failed to store_to_binary in command" << command);
return -1; return -1;
} }
on_levin_traffic(context, false, true, false, buff_out.size(), command);
return res; return res;
} }
@ -262,7 +251,7 @@ namespace epee
} }
#define CHAIN_LEVIN_INVOKE_MAP2(context_type) \ #define CHAIN_LEVIN_INVOKE_MAP2(context_type) \
int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, context_type& context) \ int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, context_type& context) \
{ \ { \
bool handled = false; \ bool handled = false; \
return handle_invoke_map(false, command, in_buff, buff_out, context, handled); \ return handle_invoke_map(false, command, in_buff, buff_out, context, handled); \
@ -271,13 +260,13 @@ namespace epee
#define CHAIN_LEVIN_NOTIFY_MAP2(context_type) \ #define CHAIN_LEVIN_NOTIFY_MAP2(context_type) \
int notify(int command, const epee::span<const uint8_t> in_buff, context_type& context) \ int notify(int command, const epee::span<const uint8_t> in_buff, context_type& context) \
{ \ { \
bool handled = false; epee::byte_slice fake_str; \ bool handled = false; epee::byte_stream fake_str; \
return handle_invoke_map(true, command, in_buff, fake_str, context, handled); \ return handle_invoke_map(true, command, in_buff, fake_str, context, handled); \
} }
#define CHAIN_LEVIN_INVOKE_MAP() \ #define CHAIN_LEVIN_INVOKE_MAP() \
int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, epee::net_utils::connection_context_base& context) \ int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, epee::net_utils::connection_context_base& context) \
{ \ { \
bool handled = false; \ bool handled = false; \
return handle_invoke_map(false, command, in_buff, buff_out, context, handled); \ return handle_invoke_map(false, command, in_buff, buff_out, context, handled); \
@ -297,7 +286,7 @@ namespace epee
} }
#define BEGIN_INVOKE_MAP2(owner_type) \ #define BEGIN_INVOKE_MAP2(owner_type) \
template <class t_context> int handle_invoke_map(bool is_notify, int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, t_context& context, bool& handled) \ template <class t_context> int handle_invoke_map(bool is_notify, int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, t_context& context, bool& handled) \
{ \ { \
try { \ try { \
typedef owner_type internal_owner_type_name; typedef owner_type internal_owner_type_name;

View file

@ -34,6 +34,7 @@
namespace epee namespace epee
{ {
class byte_slice; class byte_slice;
class byte_stream;
namespace serialization namespace serialization
{ {
/************************************************************************/ /************************************************************************/
@ -83,8 +84,13 @@ namespace epee
//------------------------------------------------------------------------------- //-------------------------------------------------------------------------------
bool store_to_binary(byte_slice& target, std::size_t initial_buffer_size = 8192); bool store_to_binary(byte_slice& target, std::size_t initial_buffer_size = 8192);
bool load_from_binary(const epee::span<const uint8_t> target, const limits_t *limits = NULL); bool store_to_binary(byte_stream& ss);
bool load_from_binary(const std::string& target, const limits_t *limits = NULL); bool load_from_binary(const epee::span<const uint8_t> target, const limits_t *limits = nullptr);
bool load_from_binary(const std::string& target, const limits_t *limits = nullptr)
{
return load_from_binary(epee::strspan<uint8_t>(target), limits);
}
template<class trace_policy> template<class trace_policy>
bool dump_as_xml(std::string& targetObj, const std::string& root_name = ""); bool dump_as_xml(std::string& targetObj, const std::string& root_name = "");
bool dump_as_json(std::string& targetObj, size_t indent = 0, bool insert_newlines = true); bool dump_as_json(std::string& targetObj, size_t indent = 0, bool insert_newlines = true);

View file

@ -36,6 +36,8 @@
namespace epee namespace epee
{ {
class byte_stream;
namespace serialization namespace serialization
{ {
//----------------------------------------------------------------------------------------------------------- //-----------------------------------------------------------------------------------------------------------
@ -127,5 +129,14 @@ namespace epee
store_t_to_binary(str_in, binary_buff, initial_buffer_size); store_t_to_binary(str_in, binary_buff, initial_buffer_size);
return binary_buff; return binary_buff;
} }
//-----------------------------------------------------------------------------------------------------------
template<class t_struct>
bool store_t_to_binary(t_struct& str_in, byte_stream& binary_buff)
{
portable_storage ps;
str_in.store(ps);
return ps.store_to_binary(binary_buff);
}
} }
} }

View file

@ -34,6 +34,25 @@ namespace epee
{ {
namespace levin namespace levin
{ {
message_writer::message_writer(const std::size_t reserve)
: buffer()
{
buffer.reserve(reserve);
buffer.put_n(0, sizeof(header));
}
byte_slice message_writer::finalize(const uint32_t command, const uint32_t flags, const uint32_t return_code, const bool expect_response)
{
if (buffer.size() < sizeof(header))
throw std::runtime_error{"levin_writer::finalize already called"};
header head = make_header(command, payload_size(), flags, expect_response);
head.m_return_code = SWAP32LE(return_code);
std::memcpy(buffer.tellp() - buffer.size(), std::addressof(head), sizeof(head));
return byte_slice{std::move(buffer)};
}
bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept
{ {
bucket_head2 head = {0}; bucket_head2 head = {0};
@ -47,12 +66,6 @@ namespace levin
return head; return head;
} }
byte_slice make_notify(int command, epee::span<const std::uint8_t> payload)
{
const bucket_head2 head = make_header(command, payload.size(), LEVIN_PACKET_REQUEST, false);
return byte_slice{epee::as_byte_span(head), payload};
}
byte_slice make_noise_notify(const std::size_t noise_bytes) byte_slice make_noise_notify(const std::size_t noise_bytes)
{ {
static constexpr const std::uint32_t flags = static constexpr const std::uint32_t flags =
@ -68,46 +81,40 @@ namespace levin
return byte_slice{std::move(buffer)}; return byte_slice{std::move(buffer)};
} }
byte_slice make_fragmented_notify(const byte_slice& noise_message, int command, epee::span<const std::uint8_t> payload) byte_slice make_fragmented_notify(const std::size_t noise_size, const int command, message_writer message)
{ {
const size_t noise_size = noise_message.size();
if (noise_size < sizeof(bucket_head2) * 2) if (noise_size < sizeof(bucket_head2) * 2)
return nullptr; return nullptr;
if (payload.size() <= noise_size - sizeof(bucket_head2)) if (message.buffer.size() <= noise_size)
{ {
/* The entire message can be sent at once, and the levin binary parser /* The entire message can be sent at once, and the levin binary parser
will ignore extra bytes. So just pad with zeroes and otherwise send will ignore extra bytes. So just pad with zeroes and otherwise send
a "normal", not fragmented message. */ a "normal", not fragmented message. */
const size_t padding = noise_size - sizeof(bucket_head2) - payload.size();
const span<const uint8_t> padding_bytes{noise_message.end() - padding, padding};
const bucket_head2 head = make_header(command, noise_size - sizeof(bucket_head2), LEVIN_PACKET_REQUEST, false); message.buffer.put_n(0, noise_size - message.buffer.size());
return byte_slice{as_byte_span(head), payload, padding_bytes}; return message.finalize_notify(command);
} }
// fragment message // fragment message
const byte_slice payload_bytes = message.finalize_notify(command);
span<const std::uint8_t> payload = to_span(payload_bytes);
const size_t payload_space = noise_size - sizeof(bucket_head2); const size_t payload_space = noise_size - sizeof(bucket_head2);
const size_t expected_fragments = ((payload.size() - 2) / payload_space) + 1; const size_t expected_fragments = ((payload.size() - 2) / payload_space) + 1;
std::string buffer{}; byte_stream buffer{};
buffer.reserve((expected_fragments + 1) * noise_size); // +1 here overselects for internal bucket_head2 value buffer.reserve(expected_fragments * noise_size);
bucket_head2 head = make_header(0, noise_size - sizeof(bucket_head2), LEVIN_PACKET_BEGIN, false); bucket_head2 head = make_header(0, payload_space, LEVIN_PACKET_BEGIN, false);
buffer.append(reinterpret_cast<const char*>(&head), sizeof(head)); buffer.write(as_byte_span(head));
head.m_command = command; // internal levin header is in payload already
head.m_flags = LEVIN_PACKET_REQUEST;
head.m_cb = payload.size();
buffer.append(reinterpret_cast<const char*>(&head), sizeof(head));
size_t copy_size = payload.remove_prefix(payload_space - sizeof(bucket_head2)); size_t copy_size = payload.remove_prefix(payload_space);
buffer.append(reinterpret_cast<const char*>(payload.data()) - copy_size, copy_size); buffer.write(payload.data() - copy_size, copy_size);
head.m_command = 0;
head.m_flags = 0; head.m_flags = 0;
head.m_cb = noise_size - sizeof(bucket_head2);
while (!payload.empty()) while (!payload.empty())
{ {
copy_size = payload.remove_prefix(payload_space); copy_size = payload.remove_prefix(payload_space);
@ -115,12 +122,12 @@ namespace levin
if (payload.empty()) if (payload.empty())
head.m_flags = LEVIN_PACKET_END; head.m_flags = LEVIN_PACKET_END;
buffer.append(reinterpret_cast<const char*>(&head), sizeof(head)); buffer.write(as_byte_span(head));
buffer.append(reinterpret_cast<const char*>(payload.data()) - copy_size, copy_size); buffer.write(payload.data() - copy_size, copy_size);
} }
const size_t padding = noise_size - copy_size - sizeof(bucket_head2); const size_t padding = noise_size - copy_size - sizeof(bucket_head2);
buffer.append(reinterpret_cast<const char*>(noise_message.end()) - padding, padding); buffer.put_n(0, padding);
return byte_slice{std::move(buffer)}; return byte_slice{std::move(buffer)};
} }

View file

@ -48,15 +48,23 @@ namespace serialization
TRY_ENTRY(); TRY_ENTRY();
byte_stream ss; byte_stream ss;
ss.reserve(initial_buffer_size); ss.reserve(initial_buffer_size);
store_to_binary(ss);
target = epee::byte_slice{std::move(ss)};
return true;
CATCH_ENTRY("portable_storage::store_to_binary", false);
}
bool portable_storage::store_to_binary(byte_stream& ss)
{
TRY_ENTRY();
storage_block_header sbh{}; storage_block_header sbh{};
sbh.m_signature_a = SWAP32LE(PORTABLE_STORAGE_SIGNATUREA); sbh.m_signature_a = SWAP32LE(PORTABLE_STORAGE_SIGNATUREA);
sbh.m_signature_b = SWAP32LE(PORTABLE_STORAGE_SIGNATUREB); sbh.m_signature_b = SWAP32LE(PORTABLE_STORAGE_SIGNATUREB);
sbh.m_ver = PORTABLE_STORAGE_FORMAT_VER; sbh.m_ver = PORTABLE_STORAGE_FORMAT_VER;
ss.write(epee::as_byte_span(sbh)); ss.write(epee::as_byte_span(sbh));
pack_entry_to_buff(ss, m_root); pack_entry_to_buff(ss, m_root);
target = epee::byte_slice{std::move(ss)};
return true; return true;
CATCH_ENTRY("portable_storage::store_to_binary", false) CATCH_ENTRY("portable_storage::store_to_binary", false);
} }
bool portable_storage::dump_as_json(std::string& buff, size_t indent, bool insert_newlines) bool portable_storage::dump_as_json(std::string& buff, size_t indent, bool insert_newlines)
@ -76,11 +84,6 @@ namespace serialization
CATCH_ENTRY("portable_storage::load_from_json", false) CATCH_ENTRY("portable_storage::load_from_json", false)
} }
bool portable_storage::load_from_binary(const std::string& target, const limits_t *limits)
{
return load_from_binary(epee::strspan<uint8_t>(target), limits);
}
bool portable_storage::load_from_binary(const epee::span<const uint8_t> source, const limits_t *limits) bool portable_storage::load_from_binary(const epee::span<const uint8_t> source, const limits_t *limits)
{ {
m_root.m_entries.clear(); m_root.m_entries.clear();

View file

@ -57,6 +57,7 @@
#include "common/notify.h" #include "common/notify.h"
#include "common/varint.h" #include "common/varint.h"
#include "common/pruning.h" #include "common/pruning.h"
#include "time_helper.h"
#undef MONERO_DEFAULT_LOG_CATEGORY #undef MONERO_DEFAULT_LOG_CATEGORY
#define MONERO_DEFAULT_LOG_CATEGORY "blockchain" #define MONERO_DEFAULT_LOG_CATEGORY "blockchain"

View file

@ -46,6 +46,8 @@
#include "block_queue.h" #include "block_queue.h"
#include "common/perf_timer.h" #include "common/perf_timer.h"
#include "cryptonote_basic/connection_context.h" #include "cryptonote_basic/connection_context.h"
#include "net/levin_base.h"
#include "p2p/net_node_common.h"
#include <boost/circular_buffer.hpp> #include <boost/circular_buffer.hpp>
PUSH_WARNINGS PUSH_WARNINGS
@ -195,10 +197,11 @@ namespace cryptonote
bool post_notify(typename t_parameter::request& arg, cryptonote_connection_context& context) bool post_notify(typename t_parameter::request& arg, cryptonote_connection_context& context)
{ {
LOG_PRINT_L2("[" << epee::net_utils::print_connection_context_short(context) << "] post " << typeid(t_parameter).name() << " -->"); LOG_PRINT_L2("[" << epee::net_utils::print_connection_context_short(context) << "] post " << typeid(t_parameter).name() << " -->");
epee::byte_slice blob;
epee::serialization::store_t_to_binary(arg, blob, 256 * 1024); // optimize for block responses epee::levin::message_writer out{256 * 1024}; // optimize for block responses
epee::serialization::store_t_to_binary(arg, out.buffer);
//handler_response_blocks_now(blob.size()); // XXX //handler_response_blocks_now(blob.size()); // XXX
return m_p2p->invoke_notify_to_peer(t_parameter::ID, epee::to_span(blob), context); return m_p2p->invoke_notify_to_peer(t_parameter::ID, std::move(out), context);
} }
}; };

View file

@ -2699,15 +2699,15 @@ skip:
// send fluffy ones first, we want to encourage people to run that // send fluffy ones first, we want to encourage people to run that
if (!fluffyConnections.empty()) if (!fluffyConnections.empty())
{ {
epee::byte_slice fluffyBlob; epee::levin::message_writer fluffyBlob{32 * 1024};
epee::serialization::store_t_to_binary(fluffy_arg, fluffyBlob, 32 * 1024); epee::serialization::store_t_to_binary(fluffy_arg, fluffyBlob.buffer);
m_p2p->relay_notify_to_list(NOTIFY_NEW_FLUFFY_BLOCK::ID, epee::to_span(fluffyBlob), std::move(fluffyConnections)); m_p2p->relay_notify_to_list(NOTIFY_NEW_FLUFFY_BLOCK::ID, std::move(fluffyBlob), std::move(fluffyConnections));
} }
if (!fullConnections.empty()) if (!fullConnections.empty())
{ {
epee::byte_slice fullBlob; epee::levin::message_writer fullBlob{128 * 1024};
epee::serialization::store_t_to_binary(arg, fullBlob, 128 * 1024); epee::serialization::store_t_to_binary(arg, fullBlob.buffer);
m_p2p->relay_notify_to_list(NOTIFY_NEW_BLOCK::ID, epee::to_span(fullBlob), std::move(fullConnections)); m_p2p->relay_notify_to_list(NOTIFY_NEW_BLOCK::ID, std::move(fullBlob), std::move(fullConnections));
} }
return true; return true;

View file

@ -30,8 +30,8 @@
#pragma once #pragma once
#include "p2p/net_node_common.h"
#include "cryptonote_protocol/cryptonote_protocol_defs.h" #include "cryptonote_protocol/cryptonote_protocol_defs.h"
#include "cryptonote_protocol/enums.h"
#include "cryptonote_basic/connection_context.h" #include "cryptonote_basic/connection_context.h"
namespace cryptonote namespace cryptonote
{ {

View file

@ -159,7 +159,7 @@ namespace levin
return get_out_connections(p2p, get_blockchain_height(p2p, core)); return get_out_connections(p2p, get_blockchain_height(p2p, core));
} }
epee::byte_slice make_tx_payload(std::vector<blobdata>&& txs, const bool pad, const bool fluff) epee::levin::message_writer make_tx_message(std::vector<blobdata>&& txs, const bool pad, const bool fluff)
{ {
NOTIFY_NEW_TRANSACTIONS::request request{}; NOTIFY_NEW_TRANSACTIONS::request request{};
request.txs = std::move(txs); request.txs = std::move(txs);
@ -193,21 +193,17 @@ namespace levin
// if the size of _ moved enough, we might lose byte in size encoding, we don't care // if the size of _ moved enough, we might lose byte in size encoding, we don't care
} }
epee::byte_slice fullBlob; epee::levin::message_writer out;
if (!epee::serialization::store_t_to_binary(request, fullBlob)) if (!epee::serialization::store_t_to_binary(request, out.buffer))
throw std::runtime_error{"Failed to serialize to epee binary format"}; throw std::runtime_error{"Failed to serialize to epee binary format"};
return fullBlob; return out;
} }
bool make_payload_send_txs(connections& p2p, std::vector<blobdata>&& txs, const boost::uuids::uuid& destination, const bool pad, const bool fluff) bool make_payload_send_txs(connections& p2p, std::vector<blobdata>&& txs, const boost::uuids::uuid& destination, const bool pad, const bool fluff)
{ {
const epee::byte_slice blob = make_tx_payload(std::move(txs), pad, fluff); epee::byte_slice blob = make_tx_message(std::move(txs), pad, fluff).finalize_notify(NOTIFY_NEW_TRANSACTIONS::ID);
p2p.for_connection(destination, [&blob](detail::p2p_context& context) { return p2p.send(std::move(blob), destination);
on_levin_traffic(context, true, true, false, blob.size(), NOTIFY_NEW_TRANSACTIONS::ID);
return true;
});
return p2p.notify(NOTIFY_NEW_TRANSACTIONS::ID, epee::to_span(blob), destination);
} }
/* The current design uses `asio::strand`s. The documentation isn't as clear /* The current design uses `asio::strand`s. The documentation isn't as clear
@ -653,10 +649,6 @@ namespace levin
else else
message = zone_->noise.clone(); message = zone_->noise.clone();
zone_->p2p->for_connection(channel.connection, [&](detail::p2p_context& context) {
on_levin_traffic(context, true, true, false, message.size(), "noise");
return true;
});
if (zone_->p2p->send(std::move(message), channel.connection)) if (zone_->p2p->send(std::move(message), channel.connection))
{ {
if (!channel.queue.empty() && channel.active.empty()) if (!channel.queue.empty() && channel.active.empty())
@ -816,9 +808,8 @@ namespace levin
// Padding is not useful when using noise mode. Send as stem so receiver // Padding is not useful when using noise mode. Send as stem so receiver
// forwards in Dandelion++ mode. // forwards in Dandelion++ mode.
const epee::byte_slice payload = make_tx_payload(std::move(txs), false, false);
epee::byte_slice message = epee::levin::make_fragmented_notify( epee::byte_slice message = epee::levin::make_fragmented_notify(
zone_->noise, NOTIFY_NEW_TRANSACTIONS::ID, epee::to_span(payload) zone_->noise.size(), NOTIFY_NEW_TRANSACTIONS::ID, make_tx_message(std::move(txs), false, false)
); );
if (CRYPTONOTE_MAX_FRAGMENTS * zone_->noise.size() < message.size()) if (CRYPTONOTE_MAX_FRAGMENTS * zone_->noise.size() < message.size())
{ {

View file

@ -344,10 +344,9 @@ namespace nodetool
virtual void on_connection_close(p2p_connection_context& context); virtual void on_connection_close(p2p_connection_context& context);
virtual void callback(p2p_connection_context& context); virtual void callback(p2p_connection_context& context);
//----------------- i_p2p_endpoint ------------------------------------------------------------- //----------------- i_p2p_endpoint -------------------------------------------------------------
virtual bool relay_notify_to_list(int command, const epee::span<const uint8_t> data_buff, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections); virtual bool relay_notify_to_list(int command, epee::levin::message_writer message, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections) final;
virtual epee::net_utils::zone send_txs(std::vector<cryptonote::blobdata> txs, const epee::net_utils::zone origin, const boost::uuids::uuid& source, cryptonote::relay_method tx_relay); virtual epee::net_utils::zone send_txs(std::vector<cryptonote::blobdata> txs, const epee::net_utils::zone origin, const boost::uuids::uuid& source, cryptonote::relay_method tx_relay);
virtual bool invoke_command_to_peer(int command, const epee::span<const uint8_t> req_buff, std::string& resp_buff, const epee::net_utils::connection_context_base& context); virtual bool invoke_notify_to_peer(int command, epee::levin::message_writer message, const epee::net_utils::connection_context_base& context) final;
virtual bool invoke_notify_to_peer(int command, const epee::span<const uint8_t> req_buff, const epee::net_utils::connection_context_base& context);
virtual bool drop_connection(const epee::net_utils::connection_context_base& context); virtual bool drop_connection(const epee::net_utils::connection_context_base& context);
virtual void request_callback(const epee::net_utils::connection_context_base& context); virtual void request_callback(const epee::net_utils::connection_context_base& context);
virtual void for_each_connection(std::function<bool(typename t_payload_net_handler::connection_context&, peerid_type, uint32_t)> f); virtual void for_each_connection(std::function<bool(typename t_payload_net_handler::connection_context&, peerid_type, uint32_t)> f);

View file

@ -2169,8 +2169,9 @@ namespace nodetool
} }
//----------------------------------------------------------------------------------- //-----------------------------------------------------------------------------------
template<class t_payload_net_handler> template<class t_payload_net_handler>
bool node_server<t_payload_net_handler>::relay_notify_to_list(int command, const epee::span<const uint8_t> data_buff, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections) bool node_server<t_payload_net_handler>::relay_notify_to_list(int command, epee::levin::message_writer data_buff, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections)
{ {
epee::byte_slice message = data_buff.finalize_notify(command);
std::sort(connections.begin(), connections.end()); std::sort(connections.begin(), connections.end());
auto zone = m_network_zones.begin(); auto zone = m_network_zones.begin();
for(const auto& c_id: connections) for(const auto& c_id: connections)
@ -2188,7 +2189,7 @@ namespace nodetool
++zone; ++zone;
} }
if (zone->first == c_id.first) if (zone->first == c_id.first)
zone->second.m_net_server.get_config_object().notify(command, data_buff, c_id.second); zone->second.m_net_server.get_config_object().send(message.clone(), c_id.second);
} }
return true; return true;
} }
@ -2255,24 +2256,13 @@ namespace nodetool
} }
//----------------------------------------------------------------------------------- //-----------------------------------------------------------------------------------
template<class t_payload_net_handler> template<class t_payload_net_handler>
bool node_server<t_payload_net_handler>::invoke_notify_to_peer(int command, const epee::span<const uint8_t> req_buff, const epee::net_utils::connection_context_base& context) bool node_server<t_payload_net_handler>::invoke_notify_to_peer(const int command, epee::levin::message_writer message, const epee::net_utils::connection_context_base& context)
{ {
if(is_filtered_command(context.m_remote_address, command)) if(is_filtered_command(context.m_remote_address, command))
return false; return false;
network_zone& zone = m_network_zones.at(context.m_remote_address.get_zone()); network_zone& zone = m_network_zones.at(context.m_remote_address.get_zone());
int res = zone.m_net_server.get_config_object().notify(command, req_buff, context.m_connection_id); int res = zone.m_net_server.get_config_object().send(message.finalize_notify(command), context.m_connection_id);
return res > 0;
}
//-----------------------------------------------------------------------------------
template<class t_payload_net_handler>
bool node_server<t_payload_net_handler>::invoke_command_to_peer(int command, const epee::span<const uint8_t> req_buff, std::string& resp_buff, const epee::net_utils::connection_context_base& context)
{
if(is_filtered_command(context.m_remote_address, command))
return false;
network_zone& zone = m_network_zones.at(context.m_remote_address.get_zone());
int res = zone.m_net_server.get_config_object().invoke(command, req_buff, resp_buff, context.m_connection_id);
return res > 0; return res > 0;
} }
//----------------------------------------------------------------------------------- //-----------------------------------------------------------------------------------

View file

@ -40,6 +40,8 @@
#include "net/net_utils_base.h" #include "net/net_utils_base.h"
#include "p2p_protocol_defs.h" #include "p2p_protocol_defs.h"
namespace epee { namespace levin { class message_writer; } }
namespace nodetool namespace nodetool
{ {
@ -49,10 +51,9 @@ namespace nodetool
template<class t_connection_context> template<class t_connection_context>
struct i_p2p_endpoint struct i_p2p_endpoint
{ {
virtual bool relay_notify_to_list(int command, const epee::span<const uint8_t> data_buff, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections)=0; virtual bool relay_notify_to_list(int command, epee::levin::message_writer message, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections)=0;
virtual epee::net_utils::zone send_txs(std::vector<cryptonote::blobdata> txs, const epee::net_utils::zone origin, const boost::uuids::uuid& source, cryptonote::relay_method tx_relay)=0; virtual epee::net_utils::zone send_txs(std::vector<cryptonote::blobdata> txs, const epee::net_utils::zone origin, const boost::uuids::uuid& source, cryptonote::relay_method tx_relay)=0;
virtual bool invoke_command_to_peer(int command, const epee::span<const uint8_t> req_buff, std::string& resp_buff, const epee::net_utils::connection_context_base& context)=0; virtual bool invoke_notify_to_peer(int command, epee::levin::message_writer message, const epee::net_utils::connection_context_base& context)=0;
virtual bool invoke_notify_to_peer(int command, const epee::span<const uint8_t> req_buff, const epee::net_utils::connection_context_base& context)=0;
virtual bool drop_connection(const epee::net_utils::connection_context_base& context)=0; virtual bool drop_connection(const epee::net_utils::connection_context_base& context)=0;
virtual void request_callback(const epee::net_utils::connection_context_base& context)=0; virtual void request_callback(const epee::net_utils::connection_context_base& context)=0;
virtual uint64_t get_public_connections_count()=0; virtual uint64_t get_public_connections_count()=0;
@ -71,7 +72,7 @@ namespace nodetool
template<class t_connection_context> template<class t_connection_context>
struct p2p_endpoint_stub: public i_p2p_endpoint<t_connection_context> struct p2p_endpoint_stub: public i_p2p_endpoint<t_connection_context>
{ {
virtual bool relay_notify_to_list(int command, const epee::span<const uint8_t> data_buff, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections) virtual bool relay_notify_to_list(int command, epee::levin::message_writer message, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections)
{ {
return false; return false;
} }
@ -79,11 +80,7 @@ namespace nodetool
{ {
return epee::net_utils::zone::invalid; return epee::net_utils::zone::invalid;
} }
virtual bool invoke_command_to_peer(int command, const epee::span<const uint8_t> req_buff, std::string& resp_buff, const epee::net_utils::connection_context_base& context) virtual bool invoke_notify_to_peer(int command, epee::levin::message_writer message, const epee::net_utils::connection_context_base& context)
{
return false;
}
virtual bool invoke_notify_to_peer(int command, const epee::span<const uint8_t> req_buff, const epee::net_utils::connection_context_base& context)
{ {
return true; return true;
} }

View file

@ -68,13 +68,13 @@ namespace
{ {
} }
virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, test_levin_connection_context& context) virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, test_levin_connection_context& context)
{ {
m_invoke_counter.inc(); m_invoke_counter.inc();
boost::unique_lock<boost::mutex> lock(m_mutex); boost::unique_lock<boost::mutex> lock(m_mutex);
m_last_command = command; m_last_command = command;
m_last_in_buf = std::string((const char*)in_buff.data(), in_buff.size()); m_last_in_buf = std::string((const char*)in_buff.data(), in_buff.size());
buff_out = m_invoke_out_buf.clone(); buff_out.write(epee::to_span(m_invoke_out_buf));
return m_return_code; return m_return_code;
} }

View file

@ -67,7 +67,7 @@ namespace net_load_tests
{ {
} }
virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, test_connection_context& context) virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, test_connection_context& context)
{ {
//m_invoke_counter.inc(); //m_invoke_counter.inc();
//std::unique_lock<std::mutex> lock(m_mutex); //std::unique_lock<std::mutex> lock(m_mutex);

View file

@ -153,7 +153,7 @@ TEST(test_epee_connection, test_lifetime)
delay(delay), delay(delay),
on_connection_close_f(on_connection_close_f) on_connection_close_f(on_connection_close_f)
{} {}
virtual int invoke(int, const epee::span<const uint8_t>, epee::byte_slice&, context_t&) override { epee::misc_utils::sleep_no_w(delay); return {}; } virtual int invoke(int, const epee::span<const uint8_t>, epee::byte_stream&, context_t&) override { epee::misc_utils::sleep_no_w(delay); return {}; }
virtual int notify(int, const epee::span<const uint8_t>, context_t&) override { return {}; } virtual int notify(int, const epee::span<const uint8_t>, context_t&) override { return {}; }
virtual void callback(context_t&) override {} virtual void callback(context_t&) override {}
virtual void on_connection_new(context_t&) override {} virtual void on_connection_new(context_t&) override {}
@ -281,7 +281,7 @@ TEST(test_epee_connection, test_lifetime)
for (auto i = 0; i < N; ++i) { for (auto i = 0; i < N; ++i) {
tag = create_connection(); tag = create_connection();
ASSERT_TRUE(shared_state->get_connections_count() == 1); ASSERT_TRUE(shared_state->get_connections_count() == 1);
success = shared_state->invoke_async(1, {}, tag, [](int, const epee::span<const uint8_t>, context_t&){}, TIMEOUT); success = shared_state->invoke_async(1, epee::levin::message_writer{}, tag, [](int, const epee::span<const uint8_t>, context_t&){}, TIMEOUT);
ASSERT_TRUE(success); ASSERT_TRUE(success);
while (shared_state->sock_count == 1) { while (shared_state->sock_count == 1) {
success = shared_state->foreach_connection([&shared_state, &tag](context_t&){ success = shared_state->foreach_connection([&shared_state, &tag](context_t&){

View file

@ -59,13 +59,13 @@ namespace
{ {
} }
virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, test_levin_connection_context& context) virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, test_levin_connection_context& context)
{ {
m_invoke_counter.inc(); m_invoke_counter.inc();
boost::unique_lock<boost::mutex> lock(m_mutex); boost::unique_lock<boost::mutex> lock(m_mutex);
m_last_command = command; m_last_command = command;
m_last_in_buf = std::string((const char*)in_buff.data(), in_buff.size()); m_last_in_buf = std::string((const char*)in_buff.data(), in_buff.size());
buff_out = m_invoke_out_buf.clone(); buff_out.write(epee::to_span(m_invoke_out_buf));
return m_return_code; return m_return_code;
} }
@ -434,8 +434,11 @@ TEST_F(positive_test_connection_to_levin_protocol_handler_calls, handler_process
const int expected_command = 4673261; const int expected_command = 4673261;
const std::string in_data(256, 'e'); const std::string in_data(256, 'e');
epee::levin::message_writer message{};
message.buffer.write(epee::to_span(in_data));
const epee::byte_slice noise = epee::levin::make_noise_notify(1024); const epee::byte_slice noise = epee::levin::make_noise_notify(1024);
const epee::byte_slice notify = epee::levin::make_notify(expected_command, epee::strspan<std::uint8_t>(in_data)); const epee::byte_slice notify = message.finalize_notify(expected_command);
test_connection_ptr conn = create_connection(); test_connection_ptr conn = create_connection();
@ -468,11 +471,16 @@ TEST_F(positive_test_connection_to_levin_protocol_handler_calls, handler_process
const int expected_command = 4673261; const int expected_command = 4673261;
const int expected_fragmented_command = 46732; const int expected_fragmented_command = 46732;
const std::string in_data(256, 'e'); const std::string in_data(256, 'e');
std::string in_fragmented_data(1024 * 4, 'c');
epee::levin::message_writer message{};
message.buffer.write(epee::to_span(in_data));
epee::levin::message_writer in_fragmented_data;
in_fragmented_data.buffer.put_n('c', 1024 * 4);
const epee::byte_slice noise = epee::levin::make_noise_notify(1024); const epee::byte_slice noise = epee::levin::make_noise_notify(1024);
const epee::byte_slice notify = epee::levin::make_notify(expected_command, epee::strspan<std::uint8_t>(in_data)); const epee::byte_slice notify = message.finalize_notify(expected_command);
epee::byte_slice fragmented = epee::levin::make_fragmented_notify(noise, expected_fragmented_command, epee::strspan<std::uint8_t>(in_fragmented_data)); epee::byte_slice fragmented = epee::levin::make_fragmented_notify(noise.size(), expected_fragmented_command, std::move(in_fragmented_data));
EXPECT_EQ(5u, fragmented.size() / 1024); EXPECT_EQ(5u, fragmented.size() / 1024);
EXPECT_EQ(0u, fragmented.size() % 1024); EXPECT_EQ(0u, fragmented.size() % 1024);
@ -497,11 +505,13 @@ TEST_F(positive_test_connection_to_levin_protocol_handler_calls, handler_process
ASSERT_TRUE(conn->m_protocol_handler.handle_recv(next.data(), next.size())); ASSERT_TRUE(conn->m_protocol_handler.handle_recv(next.data(), next.size()));
} }
in_fragmented_data.resize(((1024 - sizeof(epee::levin::bucket_head2)) * 5) - sizeof(epee::levin::bucket_head2)); // add padding zeroes std::string compare_buffer(1024 * 4, 'c');
compare_buffer.resize(((1024 - sizeof(epee::levin::bucket_head2)) * 5) - sizeof(epee::levin::bucket_head2)); // add padding zeroes
ASSERT_EQ(4u, m_commands_handler.notify_counter()); ASSERT_EQ(4u, m_commands_handler.notify_counter());
ASSERT_EQ(0u, m_commands_handler.invoke_counter()); ASSERT_EQ(0u, m_commands_handler.invoke_counter());
ASSERT_EQ(expected_fragmented_command, m_commands_handler.last_command()); ASSERT_EQ(expected_fragmented_command, m_commands_handler.last_command());
ASSERT_EQ(in_fragmented_data, m_commands_handler.last_in_buf()); ASSERT_EQ(compare_buffer, m_commands_handler.last_in_buf());
ASSERT_EQ(0u, conn->send_counter()); ASSERT_EQ(0u, conn->send_counter());
ASSERT_TRUE(conn->last_send_data().empty()); ASSERT_TRUE(conn->last_send_data().empty());

View file

@ -245,9 +245,9 @@ namespace
return out; return out;
} }
virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, cryptonote::levin::detail::p2p_context& context) override final virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, cryptonote::levin::detail::p2p_context& context) override final
{ {
buff_out = nullptr; buff_out.clear();
invoked_.push_back( invoked_.push_back(
{context.m_connection_id, command, std::string{reinterpret_cast<const char*>(in_buff.data()), in_buff.size()}} {context.m_connection_id, command, std::string{reinterpret_cast<const char*>(in_buff.data()), in_buff.size()}}
); );
@ -384,21 +384,50 @@ TEST(make_header, expect_return)
EXPECT_EQ(0u, header1.m_flags); EXPECT_EQ(0u, header1.m_flags);
} }
TEST(make_notify, empty_payload) TEST(message_writer, invoke_with_empty_payload)
{ {
const epee::byte_slice message = epee::levin::make_notify(443, nullptr); const epee::byte_slice message = epee::levin::message_writer{}.finalize_invoke(443);
const epee::levin::bucket_head2 header =
epee::levin::make_header(443, 0, LEVIN_PACKET_REQUEST, true);
ASSERT_EQ(sizeof(header), message.size());
EXPECT_TRUE(std::memcmp(std::addressof(header), message.data(), sizeof(header)) == 0);
}
TEST(message_writer, invoke_with_payload)
{
std::string bytes(100, 'a');
std::generate(bytes.begin(), bytes.end(), crypto::random_device{});
epee::levin::message_writer writer{};
writer.buffer.write(epee::to_span(bytes));
const epee::byte_slice message = writer.finalize_invoke(443);
const epee::levin::bucket_head2 header =
epee::levin::make_header(443, bytes.size(), LEVIN_PACKET_REQUEST, true);
ASSERT_EQ(sizeof(header) + bytes.size(), message.size());
EXPECT_TRUE(std::memcmp(std::addressof(header), message.data(), sizeof(header)) == 0);
EXPECT_TRUE(std::memcmp(bytes.data(), message.data() + sizeof(header), bytes.size()) == 0);
}
TEST(message_writer, notify_with_empty_payload)
{
const epee::byte_slice message = epee::levin::message_writer{}.finalize_notify(443);
const epee::levin::bucket_head2 header = const epee::levin::bucket_head2 header =
epee::levin::make_header(443, 0, LEVIN_PACKET_REQUEST, false); epee::levin::make_header(443, 0, LEVIN_PACKET_REQUEST, false);
ASSERT_EQ(sizeof(header), message.size()); ASSERT_EQ(sizeof(header), message.size());
EXPECT_TRUE(std::memcmp(std::addressof(header), message.data(), sizeof(header)) == 0); EXPECT_TRUE(std::memcmp(std::addressof(header), message.data(), sizeof(header)) == 0);
} }
TEST(make_notify, with_payload) TEST(message_writer, notify_with_payload)
{ {
std::string bytes(100, 'a'); std::string bytes(100, 'a');
std::generate(bytes.begin(), bytes.end(), crypto::random_device{}); std::generate(bytes.begin(), bytes.end(), crypto::random_device{});
const epee::byte_slice message = epee::levin::make_notify(443, epee::strspan<std::uint8_t>(bytes)); epee::levin::message_writer writer{};
writer.buffer.write(epee::to_span(bytes));
const epee::byte_slice message = writer.finalize_notify(443);
const epee::levin::bucket_head2 header = const epee::levin::bucket_head2 header =
epee::levin::make_header(443, bytes.size(), LEVIN_PACKET_REQUEST, false); epee::levin::make_header(443, bytes.size(), LEVIN_PACKET_REQUEST, false);
@ -407,6 +436,44 @@ TEST(make_notify, with_payload)
EXPECT_TRUE(std::memcmp(bytes.data(), message.data() + sizeof(header), bytes.size()) == 0); EXPECT_TRUE(std::memcmp(bytes.data(), message.data() + sizeof(header), bytes.size()) == 0);
} }
TEST(message_writer, response_with_empty_payload)
{
const epee::byte_slice message = epee::levin::message_writer{}.finalize_response(443, 1);
epee::levin::bucket_head2 header =
epee::levin::make_header(443, 0, LEVIN_PACKET_RESPONSE, false);
header.m_return_code = SWAP32LE(1);
ASSERT_EQ(sizeof(header), message.size());
EXPECT_TRUE(std::memcmp(std::addressof(header), message.data(), sizeof(header)) == 0);
}
TEST(message_writer, response_with_payload)
{
std::string bytes(100, 'a');
std::generate(bytes.begin(), bytes.end(), crypto::random_device{});
epee::levin::message_writer writer{};
writer.buffer.write(epee::to_span(bytes));
const epee::byte_slice message = writer.finalize_response(443, 6450);
epee::levin::bucket_head2 header =
epee::levin::make_header(443, bytes.size(), LEVIN_PACKET_RESPONSE, false);
header.m_return_code = SWAP32LE(6450);
ASSERT_EQ(sizeof(header) + bytes.size(), message.size());
EXPECT_TRUE(std::memcmp(std::addressof(header), message.data(), sizeof(header)) == 0);
EXPECT_TRUE(std::memcmp(bytes.data(), message.data() + sizeof(header), bytes.size()) == 0);
}
TEST(message_writer, error)
{
epee::levin::message_writer writer{};
writer.buffer.clear();
EXPECT_THROW(writer.finalize_invoke(0), std::runtime_error);
EXPECT_THROW(writer.finalize_notify(0), std::runtime_error);
EXPECT_THROW(writer.finalize_response(0, 0), std::runtime_error);
}
TEST(make_noise, invalid) TEST(make_noise, invalid)
{ {
EXPECT_TRUE(epee::levin::make_noise_notify(sizeof(epee::levin::bucket_head2) - 1).empty()); EXPECT_TRUE(epee::levin::make_noise_notify(sizeof(epee::levin::bucket_head2) - 1).empty());
@ -428,13 +495,13 @@ TEST(make_noise, valid)
TEST(make_fragment, invalid) TEST(make_fragment, invalid)
{ {
EXPECT_TRUE(epee::levin::make_fragmented_notify(nullptr, 0, nullptr).empty()); EXPECT_TRUE(epee::levin::make_fragmented_notify(0, 0, epee::levin::message_writer{}).empty());
} }
TEST(make_fragment, single) TEST(make_fragment, single)
{ {
const epee::byte_slice noise = epee::levin::make_noise_notify(1024); const epee::byte_slice noise = epee::levin::make_noise_notify(1024);
const epee::byte_slice fragment = epee::levin::make_fragmented_notify(noise, 11, nullptr); const epee::byte_slice fragment = epee::levin::make_fragmented_notify(noise.size(), 11, epee::levin::message_writer{});
const epee::levin::bucket_head2 header = const epee::levin::bucket_head2 header =
epee::levin::make_header(11, 1024 - sizeof(epee::levin::bucket_head2), LEVIN_PACKET_REQUEST, false); epee::levin::make_header(11, 1024 - sizeof(epee::levin::bucket_head2), LEVIN_PACKET_REQUEST, false);
@ -449,8 +516,13 @@ TEST(make_fragment, multiple)
std::string bytes(1024 * 3 - 150, 'a'); std::string bytes(1024 * 3 - 150, 'a');
std::generate(bytes.begin(), bytes.end(), crypto::random_device{}); std::generate(bytes.begin(), bytes.end(), crypto::random_device{});
epee::levin::message_writer message;
message.buffer.write(epee::to_span(bytes));
const epee::byte_slice noise = epee::levin::make_noise_notify(1024); const epee::byte_slice noise = epee::levin::make_noise_notify(1024);
epee::byte_slice fragment = epee::levin::make_fragmented_notify(noise, 114, epee::strspan<std::uint8_t>(bytes)); epee::byte_slice fragment = epee::levin::make_fragmented_notify(noise.size(), 114, std::move(message));
EXPECT_EQ(1024 * 3, fragment.size());
epee::levin::bucket_head2 header = epee::levin::bucket_head2 header =
epee::levin::make_header(0, 1024 - sizeof(epee::levin::bucket_head2), LEVIN_PACKET_BEGIN, false); epee::levin::make_header(0, 1024 - sizeof(epee::levin::bucket_head2), LEVIN_PACKET_BEGIN, false);
@ -497,6 +569,7 @@ TEST(make_fragment, multiple)
fragment.take_slice(bytes.size()); fragment.take_slice(bytes.size());
EXPECT_EQ(18, fragment.size());
EXPECT_EQ(18, std::count(fragment.cbegin(), fragment.cend(), 0)); EXPECT_EQ(18, std::count(fragment.cbegin(), fragment.cend(), 0));
} }
@ -2161,20 +2234,31 @@ TEST_F(levin_notify, command_max_bytes)
add_connection(true); add_connection(true);
std::string bytes(4096, 'h'); std::string payload(4096, 'h');
epee::byte_slice bytes;
{
epee::levin::message_writer dest{};
dest.buffer.write(epee::to_span(payload));
bytes = dest.finalize_notify(ping_command);
}
EXPECT_EQ(1, get_connections().notify(ping_command, epee::strspan<std::uint8_t>(bytes), contexts_.front().get_id())); EXPECT_EQ(1, get_connections().send(bytes.clone(), contexts_.front().get_id()));
EXPECT_EQ(1u, contexts_.front().process_send_queue(true)); EXPECT_EQ(1u, contexts_.front().process_send_queue(true));
EXPECT_EQ(1u, receiver_.notified_size()); EXPECT_EQ(1u, receiver_.notified_size());
const received_message msg = receiver_.get_raw_notification(); const received_message msg = receiver_.get_raw_notification();
EXPECT_EQ(ping_command, msg.command); EXPECT_EQ(ping_command, msg.command);
EXPECT_EQ(contexts_.front().get_id(), msg.connection); EXPECT_EQ(contexts_.front().get_id(), msg.connection);
EXPECT_EQ(bytes, msg.payload); EXPECT_EQ(payload, msg.payload);
bytes.push_back('e'); {
payload.push_back('h');
epee::levin::message_writer dest{};
dest.buffer.write(epee::to_span(payload));
bytes = dest.finalize_notify(ping_command);
}
EXPECT_EQ(1, get_connections().notify(ping_command, epee::strspan<std::uint8_t>(bytes), contexts_.front().get_id())); EXPECT_EQ(1, get_connections().send(std::move(bytes), contexts_.front().get_id()));
EXPECT_EQ(1u, contexts_.front().process_send_queue(false)); EXPECT_EQ(1u, contexts_.front().process_send_queue(false));
EXPECT_EQ(0u, receiver_.notified_size()); EXPECT_EQ(0u, receiver_.notified_size());
} }