rpc: speedup get_output_distribution

and decrease the amount of data carried around
This commit is contained in:
moneromooo-monero 2018-11-08 18:26:59 +00:00
parent 84dd674cd0
commit b9b307d11a
No known key found for this signature in database
GPG key ID: 686F07454D6CEFC3
4 changed files with 110 additions and 4 deletions

View file

@ -2133,7 +2133,7 @@ namespace cryptonote
return false; return false;
} }
res.distributions.push_back({std::move(*data), amount, req.binary}); res.distributions.push_back({std::move(*data), amount, "", req.binary, req.compress});
} }
} }
catch (const std::exception &e) catch (const std::exception &e)
@ -2147,6 +2147,47 @@ namespace cryptonote
return true; return true;
} }
//------------------------------------------------------------------------------------------------------------------------------ //------------------------------------------------------------------------------------------------------------------------------
bool core_rpc_server::on_get_output_distribution_bin(const COMMAND_RPC_GET_OUTPUT_DISTRIBUTION::request& req, COMMAND_RPC_GET_OUTPUT_DISTRIBUTION::response& res)
{
PERF_TIMER(on_get_output_distribution_bin);
bool r;
if (use_bootstrap_daemon_if_necessary<COMMAND_RPC_GET_OUTPUT_DISTRIBUTION>(invoke_http_mode::BIN, "/get_output_distribution.bin", req, res, r))
return r;
res.status = "Failed";
if (!req.binary)
{
res.status = "Binary only call";
return false;
}
try
{
// 0 is placeholder for the whole chain
const uint64_t req_to_height = req.to_height ? req.to_height : (m_core.get_current_blockchain_height() - 1);
for (uint64_t amount: req.amounts)
{
auto data = rpc::RpcHandler::get_output_distribution([this](uint64_t amount, uint64_t from, uint64_t to, uint64_t &start_height, std::vector<uint64_t> &distribution, uint64_t &base) { return m_core.get_output_distribution(amount, from, to, start_height, distribution, base); }, amount, req.from_height, req_to_height, req.cumulative);
if (!data)
{
res.status = "Failed to get output distribution";
return false;
}
res.distributions.push_back({std::move(*data), amount, "", req.binary, req.compress});
}
}
catch (const std::exception &e)
{
res.status = "Failed to get output distribution";
return false;
}
res.status = CORE_RPC_STATUS_OK;
return true;
}
//------------------------------------------------------------------------------------------------------------------------------
const command_line::arg_descriptor<std::string, false, true, 2> core_rpc_server::arg_rpc_bind_port = { const command_line::arg_descriptor<std::string, false, true, 2> core_rpc_server::arg_rpc_bind_port = {

View file

@ -117,6 +117,7 @@ namespace cryptonote
MAP_URI_AUTO_JON2_IF("/stop_save_graph", on_stop_save_graph, COMMAND_RPC_STOP_SAVE_GRAPH, !m_restricted) MAP_URI_AUTO_JON2_IF("/stop_save_graph", on_stop_save_graph, COMMAND_RPC_STOP_SAVE_GRAPH, !m_restricted)
MAP_URI_AUTO_JON2("/get_outs", on_get_outs, COMMAND_RPC_GET_OUTPUTS) MAP_URI_AUTO_JON2("/get_outs", on_get_outs, COMMAND_RPC_GET_OUTPUTS)
MAP_URI_AUTO_JON2_IF("/update", on_update, COMMAND_RPC_UPDATE, !m_restricted) MAP_URI_AUTO_JON2_IF("/update", on_update, COMMAND_RPC_UPDATE, !m_restricted)
MAP_URI_AUTO_BIN2("/get_output_distribution.bin", on_get_output_distribution_bin, COMMAND_RPC_GET_OUTPUT_DISTRIBUTION)
BEGIN_JSON_RPC_MAP("/json_rpc") BEGIN_JSON_RPC_MAP("/json_rpc")
MAP_JON_RPC("get_block_count", on_getblockcount, COMMAND_RPC_GETBLOCKCOUNT) MAP_JON_RPC("get_block_count", on_getblockcount, COMMAND_RPC_GETBLOCKCOUNT)
MAP_JON_RPC("getblockcount", on_getblockcount, COMMAND_RPC_GETBLOCKCOUNT) MAP_JON_RPC("getblockcount", on_getblockcount, COMMAND_RPC_GETBLOCKCOUNT)
@ -187,6 +188,7 @@ namespace cryptonote
bool on_start_save_graph(const COMMAND_RPC_START_SAVE_GRAPH::request& req, COMMAND_RPC_START_SAVE_GRAPH::response& res); bool on_start_save_graph(const COMMAND_RPC_START_SAVE_GRAPH::request& req, COMMAND_RPC_START_SAVE_GRAPH::response& res);
bool on_stop_save_graph(const COMMAND_RPC_STOP_SAVE_GRAPH::request& req, COMMAND_RPC_STOP_SAVE_GRAPH::response& res); bool on_stop_save_graph(const COMMAND_RPC_STOP_SAVE_GRAPH::request& req, COMMAND_RPC_STOP_SAVE_GRAPH::response& res);
bool on_update(const COMMAND_RPC_UPDATE::request& req, COMMAND_RPC_UPDATE::response& res); bool on_update(const COMMAND_RPC_UPDATE::request& req, COMMAND_RPC_UPDATE::response& res);
bool on_get_output_distribution_bin(const COMMAND_RPC_GET_OUTPUT_DISTRIBUTION::request& req, COMMAND_RPC_GET_OUTPUT_DISTRIBUTION::response& res);
//json_rpc //json_rpc
bool on_getblockcount(const COMMAND_RPC_GETBLOCKCOUNT::request& req, COMMAND_RPC_GETBLOCKCOUNT::response& res); bool on_getblockcount(const COMMAND_RPC_GETBLOCKCOUNT::request& req, COMMAND_RPC_GETBLOCKCOUNT::response& res);

View file

@ -34,6 +34,40 @@
#include "cryptonote_basic/difficulty.h" #include "cryptonote_basic/difficulty.h"
#include "crypto/hash.h" #include "crypto/hash.h"
#include "rpc/rpc_handler.h" #include "rpc/rpc_handler.h"
#include "common/varint.h"
#include "common/perf_timer.h"
namespace
{
template<typename T>
std::string compress_integer_array(const std::vector<T> &v)
{
std::string s;
s.resize(v.size() * (sizeof(T) * 8 / 7 + 1));
char *ptr = (char*)s.data();
for (const T &t: v)
tools::write_varint(ptr, t);
s.resize(ptr - s.data());
return s;
}
template<typename T>
std::vector<T> decompress_integer_array(const std::string &s)
{
std::vector<T> v;
v.reserve(s.size());
int read = 0;
const std::string::const_iterator end = s.end();
for (std::string::const_iterator i = s.begin(); i != end; std::advance(i, read))
{
T t;
read = tools::read_varint(std::string::const_iterator(i), s.end(), t);
CHECK_AND_ASSERT_THROW_MES(read > 0 && read <= 256, "Error decompressing data");
v.push_back(t);
}
return v;
}
}
namespace cryptonote namespace cryptonote
{ {
@ -2222,6 +2256,7 @@ namespace cryptonote
uint64_t to_height; uint64_t to_height;
bool cumulative; bool cumulative;
bool binary; bool binary;
bool compress;
BEGIN_KV_SERIALIZE_MAP() BEGIN_KV_SERIALIZE_MAP()
KV_SERIALIZE(amounts) KV_SERIALIZE(amounts)
@ -2229,6 +2264,7 @@ namespace cryptonote
KV_SERIALIZE_OPT(to_height, (uint64_t)0) KV_SERIALIZE_OPT(to_height, (uint64_t)0)
KV_SERIALIZE_OPT(cumulative, false) KV_SERIALIZE_OPT(cumulative, false)
KV_SERIALIZE_OPT(binary, true) KV_SERIALIZE_OPT(binary, true)
KV_SERIALIZE_OPT(compress, false)
END_KV_SERIALIZE_MAP() END_KV_SERIALIZE_MAP()
}; };
@ -2236,14 +2272,38 @@ namespace cryptonote
{ {
rpc::output_distribution_data data; rpc::output_distribution_data data;
uint64_t amount; uint64_t amount;
std::string compressed_data;
bool binary; bool binary;
bool compress;
BEGIN_KV_SERIALIZE_MAP() BEGIN_KV_SERIALIZE_MAP()
KV_SERIALIZE(amount) KV_SERIALIZE(amount)
KV_SERIALIZE_N(data.start_height, "start_height") KV_SERIALIZE_N(data.start_height, "start_height")
KV_SERIALIZE(binary) KV_SERIALIZE(binary)
KV_SERIALIZE(compress)
if (this_ref.binary) if (this_ref.binary)
{
if (is_store)
{
if (this_ref.compress)
{
const_cast<std::string&>(this_ref.compressed_data) = compress_integer_array(this_ref.data.distribution);
KV_SERIALIZE(compressed_data)
}
else
KV_SERIALIZE_CONTAINER_POD_AS_BLOB_N(data.distribution, "distribution") KV_SERIALIZE_CONTAINER_POD_AS_BLOB_N(data.distribution, "distribution")
}
else
{
if (this_ref.compress)
{
KV_SERIALIZE(compressed_data)
const_cast<std::vector<uint64_t>&>(this_ref.data.distribution) = decompress_integer_array<uint64_t>(this_ref.compressed_data);
}
else
KV_SERIALIZE_CONTAINER_POD_AS_BLOB_N(data.distribution, "distribution")
}
}
else else
KV_SERIALIZE_N(data.distribution, "distribution") KV_SERIALIZE_N(data.distribution, "distribution")
KV_SERIALIZE_N(data.base, "base") KV_SERIALIZE_N(data.base, "base")

View file

@ -2852,10 +2852,11 @@ bool wallet2::get_rct_distribution(uint64_t &start_height, std::vector<uint64_t>
cryptonote::COMMAND_RPC_GET_OUTPUT_DISTRIBUTION::response res = AUTO_VAL_INIT(res); cryptonote::COMMAND_RPC_GET_OUTPUT_DISTRIBUTION::response res = AUTO_VAL_INIT(res);
req.amounts.push_back(0); req.amounts.push_back(0);
req.from_height = 0; req.from_height = 0;
req.cumulative = true; req.cumulative = false;
req.binary = true; req.binary = true;
req.compress = true;
m_daemon_rpc_mutex.lock(); m_daemon_rpc_mutex.lock();
bool r = net_utils::invoke_http_json_rpc("/json_rpc", "get_output_distribution", req, res, m_http_client, rpc_timeout); bool r = net_utils::invoke_http_bin("/get_output_distribution.bin", req, res, m_http_client, rpc_timeout);
m_daemon_rpc_mutex.unlock(); m_daemon_rpc_mutex.unlock();
if (!r) if (!r)
{ {
@ -2882,6 +2883,8 @@ bool wallet2::get_rct_distribution(uint64_t &start_height, std::vector<uint64_t>
MWARNING("Failed to request output distribution: results are not for amount 0"); MWARNING("Failed to request output distribution: results are not for amount 0");
return false; return false;
} }
for (size_t i = 1; i < res.distributions[0].data.distribution.size(); ++i)
res.distributions[0].data.distribution[i] += res.distributions[0].data.distribution[i-1];
start_height = res.distributions[0].data.start_height; start_height = res.distributions[0].data.start_height;
distribution = std::move(res.distributions[0].data.distribution); distribution = std::move(res.distributions[0].data.distribution);
return true; return true;