Do not exit when monerod returns json-rpc error for block fetching (#78)

This commit is contained in:
Lee *!* Clagett 2023-08-09 10:30:36 -04:00 committed by Lee *!* Clagett
parent 4ce21686d7
commit 524e26e1a4
4 changed files with 69 additions and 25 deletions

View file

@ -85,6 +85,8 @@ namespace lws
return "Unspecified error when retrieving exchange rates";
case error::http_server:
return "HTTP server failed";
case error::json_rpc:
return "Error returned by JSON-RPC server";
case error::exchange_rates_old:
return "Exchange rates are older than cache interval";
case error::not_enough_mixin:

View file

@ -57,6 +57,7 @@ namespace lws
exchange_rates_fetch, //!< Exchange rates fetching failed
exchange_rates_old, //!< Exchange rates are older than cache interval
http_server, //!< HTTP server failure (init or run)
json_rpc, //!< Error returned by JSON-RPC server
not_enough_mixin, //!< Not enough outputs to meet mixin count
signal_abort_process, //!< In process ZMQ PUB to abort the process was received
signal_abort_scan, //!< In process ZMQ PUB to abort the scan was received

View file

@ -26,6 +26,7 @@
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "wire/json.h"
#include "wire/wrapper/variant.h"
namespace lws
{
@ -65,6 +66,20 @@ namespace rpc
wire::object(dest, WIRE_FIELD_COPY(id), WIRE_FIELD_COPY(jsonrpc), WIRE_FIELD_COPY(method), WIRE_FIELD(params));
}
struct json_error
{
json_error()
: code(0), message()
{}
std::int32_t code;
std::string message;
};
inline void read_bytes(wire::json_reader& source, json_error& self)
{
wire::object(source, WIRE_FIELD(code), WIRE_FIELD(message));
}
//! \tparam R implements the READ concept
template<typename R>
@ -73,13 +88,18 @@ namespace rpc
json_response() = delete;
unsigned id;
R result;
boost::variant<json_error, R> state;
};
template<typename R>
inline void read_bytes(wire::json_reader& source, json_response<R>& self)
{
wire::object(source, WIRE_FIELD(id), WIRE_FIELD(result));
auto state = wire::variant(std::ref(self.state));
wire::object(source,
WIRE_FIELD(id),
WIRE_OPTION("result", R, state),
WIRE_OPTION("error", json_error, state)
);
}
@ -92,5 +112,26 @@ namespace rpc
using request = json_request<typename M::request, M>;
using response = json_response<typename M::response>;
};
//! \tparam M must implement the METHOD concept.
template<typename M, typename R = typename M::response>
inline expect<R> parse_json_response(std::string&& source)
{
json_response<R> out{};
std::error_code error = wire::json::from_bytes(std::move(source), out);
if (error)
return error;
json_error const* const rpc_error = boost::get<json_error>(std::addressof(out.state));
if (rpc_error)
{
MERROR("JSON-RPC server sent error code " << rpc_error->code << " with message: " << rpc_error->message);
return {error::json_rpc};
}
return {boost::get<R>(std::move(out.state))};
}
} // rpc
} // lws

View file

@ -362,11 +362,10 @@ namespace lws
return false;
}
rpc::json<rpc::get_transaction_pool>::response txpool{};
const std::error_code err = wire::json::from_bytes(std::move(*resp), txpool);
if (err)
MONERO_THROW(err, "Invalid json-rpc");
for (auto& tx : txpool.result.transactions)
auto txpool = rpc::parse_json_response<rpc::get_transaction_pool>(std::move(*resp));
if (!txpool)
MONERO_THROW(txpool.error(), "Failed fetching transaction pool");
for (auto& tx : txpool->transactions)
txpool_.emplace(get_transaction_prefix_hash(tx.tx), tx.tx_hash);
}
@ -651,26 +650,27 @@ namespace lws
MONERO_THROW(resp.error(), "Failed to retrieve blocks from daemon");
}
rpc::json<rpc::get_blocks_fast>::response fetched{};
auto fetched = rpc::parse_json_response<rpc::get_blocks_fast>(std::move(*resp));
if (!fetched)
{
const std::error_code error = wire::json::from_bytes(std::move(*resp), fetched);
if (error)
throw std::system_error{error};
MERROR("Failed to retrieve next blocks: " << fetched.error().message() << ". Resetting state and trying again");
return;
}
if (fetched.result.blocks.empty())
if (fetched->blocks.empty())
throw std::runtime_error{"Daemon unexpectedly returned zero blocks"};
if (fetched.result.start_height != req.start_height)
if (fetched->start_height != req.start_height)
{
MWARNING("Daemon sent wrong blocks, resetting state");
return;
}
// prep for next blocks retrieval
req.start_height = fetched.result.start_height + fetched.result.blocks.size() - 1;
req.start_height = fetched->start_height + fetched->blocks.size() - 1;
block_request = rpc::client::make_message("get_blocks_fast", req);
if (fetched.result.blocks.size() <= 1)
if (fetched->blocks.size() <= 1)
{
// synced to top of chain, wait for next blocks
for (bool wait_for_block = true; wait_for_block; )
@ -712,26 +712,26 @@ namespace lws
if (!send(client, block_request.clone()))
return;
if (fetched.result.blocks.size() != fetched.result.output_indices.size())
if (fetched->blocks.size() != fetched->output_indices.size())
throw std::runtime_error{"Bad daemon response - need same number of blocks and indices"};
blockchain.push_back(cryptonote::get_block_hash(fetched.result.blocks.front().block));
blockchain.push_back(cryptonote::get_block_hash(fetched->blocks.front().block));
auto blocks = epee::to_span(fetched.result.blocks);
auto indices = epee::to_span(fetched.result.output_indices);
auto blocks = epee::to_span(fetched->blocks);
auto indices = epee::to_span(fetched->output_indices);
if (fetched.result.start_height != 1)
if (fetched->start_height != 1)
{
// skip overlap block
blocks.remove_prefix(1);
indices.remove_prefix(1);
}
else
fetched.result.start_height = 0;
fetched->start_height = 0;
for (auto block_data : boost::combine(blocks, indices))
{
++(fetched.result.start_height);
++(fetched->start_height);
cryptonote::block const& block = boost::get<0>(block_data).block;
auto const& txes = boost::get<0>(block_data).transactions;
@ -749,7 +749,7 @@ namespace lws
scan_transaction(
epee::to_mut_span(users),
db::block_id(fetched.result.start_height),
db::block_id(fetched->start_height),
block.timestamp,
miner_tx_hash,
block.miner_tx,
@ -764,7 +764,7 @@ namespace lws
{
scan_transaction(
epee::to_mut_span(users),
db::block_id(fetched.result.start_height),
db::block_id(fetched->start_height),
block.timestamp,
boost::get<0>(tx_data),
boost::get<1>(tx_data),
@ -798,7 +798,7 @@ namespace lws
}
for (account& user : users)
user.updated(db::block_id(fetched.result.start_height));
user.updated(db::block_id(fetched->start_height));
}
}
catch (std::exception const& e)