mirror of
https://github.com/vtnerd/monero-lws.git
synced 2025-01-10 20:54:35 +00:00
:RMQ not yet working
This commit is contained in:
parent
246c905e37
commit
680c9ab304
8 changed files with 248 additions and 6 deletions
|
@ -34,6 +34,11 @@ set(CMAKE_CXX_STANDARD 14)
|
||||||
set(CMAKE_CXX_STANDARD_REQUIRED ON)
|
set(CMAKE_CXX_STANDARD_REQUIRED ON)
|
||||||
|
|
||||||
option(BUILD_TESTS "Build Tests" OFF)
|
option(BUILD_TESTS "Build Tests" OFF)
|
||||||
|
option(WITH_RMQ "Build with RMQ publish support" OFF)
|
||||||
|
if (WITH_RMQ)
|
||||||
|
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DMLWS_RMQ_ENABLED")
|
||||||
|
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DMLWS_RMQ_ENABLED")
|
||||||
|
endif()
|
||||||
|
|
||||||
set(MONERO_LIBRARIES
|
set(MONERO_LIBRARIES
|
||||||
daemon_messages
|
daemon_messages
|
||||||
|
@ -160,6 +165,13 @@ if (NOT (Boost_THREAD_LIBRARY STREQUAL monero_Boost_THREAD_LIBRARY_RELEASE))
|
||||||
message(FATAL_ERROR "Boost libraries for monero build differs from this project")
|
message(FATAL_ERROR "Boost libraries for monero build differs from this project")
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
|
if (WITH_RMQ)
|
||||||
|
find_path(RMQ_INCLUDE_DIR "amqp.h")
|
||||||
|
find_library(RMQ_LIBRARY rabbitmq REQUIRED)
|
||||||
|
else()
|
||||||
|
set(RMQ_INCLUDE_DIR "")
|
||||||
|
set(RMQ_LIBRARY "")
|
||||||
|
endif()
|
||||||
|
|
||||||
set(LMDB_INCLUDE "${monero_LMDB_INCLUDE}")
|
set(LMDB_INCLUDE "${monero_LMDB_INCLUDE}")
|
||||||
set(LMDB_LIB_PATH "monero::lmdb")
|
set(LMDB_LIB_PATH "monero::lmdb")
|
||||||
|
@ -212,6 +224,7 @@ if(APPLE)
|
||||||
list(APPEND IMPORTED_MONERO_LIBRARIES ${IOKIT_LIBRARY})
|
list(APPEND IMPORTED_MONERO_LIBRARIES ${IOKIT_LIBRARY})
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
|
|
||||||
add_library(monero::libraries INTERFACE IMPORTED)
|
add_library(monero::libraries INTERFACE IMPORTED)
|
||||||
set_property(TARGET monero::libraries PROPERTY
|
set_property(TARGET monero::libraries PROPERTY
|
||||||
INTERFACE_INCLUDE_DIRECTORIES
|
INTERFACE_INCLUDE_DIRECTORIES
|
||||||
|
|
|
@ -97,6 +97,8 @@ namespace lws
|
||||||
return "An unknown in-process message was received";
|
return "An unknown in-process message was received";
|
||||||
case error::system_clock_invalid_range:
|
case error::system_clock_invalid_range:
|
||||||
return "System clock is out of range for account storage format";
|
return "System clock is out of range for account storage format";
|
||||||
|
case error::rmq_failure:
|
||||||
|
return "Failure within the RMQ library";
|
||||||
case error::tx_relay_failed:
|
case error::tx_relay_failed:
|
||||||
return "The daemon failed to relay transaction from REST client";
|
return "The daemon failed to relay transaction from REST client";
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -62,6 +62,7 @@ namespace lws
|
||||||
signal_abort_scan, //!< In process ZMQ PUB to abort the scan was received
|
signal_abort_scan, //!< In process ZMQ PUB to abort the scan was received
|
||||||
signal_unknown, //!< An unknown in process ZMQ PUB was received
|
signal_unknown, //!< An unknown in process ZMQ PUB was received
|
||||||
system_clock_invalid_range, //!< System clock is out of range for storage format
|
system_clock_invalid_range, //!< System clock is out of range for storage format
|
||||||
|
rmq_failure, //!< Error within RMQ library
|
||||||
tx_relay_failed //!< Daemon failed to relayed tx from REST client
|
tx_relay_failed //!< Daemon failed to relayed tx from REST client
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -30,4 +30,5 @@ set(monero-lws-rpc_sources admin.cpp client.cpp daemon_pub.cpp daemon_zmq.cpp li
|
||||||
set(monero-lws-rpc_headers admin.h client.h daemon_pub.h daemon_zmq.h fwd.h json.h light_wallet.h rates.h)
|
set(monero-lws-rpc_headers admin.h client.h daemon_pub.h daemon_zmq.h fwd.h json.h light_wallet.h rates.h)
|
||||||
|
|
||||||
add_library(monero-lws-rpc ${monero-lws-rpc_sources} ${monero-lws-rpc_headers})
|
add_library(monero-lws-rpc ${monero-lws-rpc_sources} ${monero-lws-rpc_headers})
|
||||||
target_link_libraries(monero-lws-rpc monero::libraries monero-lws-wire-json monero-lws-wire-wrapper)
|
target_include_directories(monero-lws-rpc PRIVATE ${RMQ_INCLUDE_DIR})
|
||||||
|
target_link_libraries(monero-lws-rpc monero::libraries monero-lws-wire-json monero-lws-wire-wrapper ${RMQ_LIBRARY})
|
||||||
|
|
|
@ -27,6 +27,7 @@
|
||||||
|
|
||||||
#include "client.h"
|
#include "client.h"
|
||||||
|
|
||||||
|
#include <boost/algorithm/string/predicate.hpp>
|
||||||
#include <boost/thread/mutex.hpp>
|
#include <boost/thread/mutex.hpp>
|
||||||
#include <boost/utility/string_ref.hpp>
|
#include <boost/utility/string_ref.hpp>
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
|
@ -38,6 +39,10 @@
|
||||||
#include "net/http_client.h" // monero/contrib/epee/include
|
#include "net/http_client.h" // monero/contrib/epee/include
|
||||||
#include "net/zmq.h" // monero/src
|
#include "net/zmq.h" // monero/src
|
||||||
#include "serialization/json_object.h" // monero/src
|
#include "serialization/json_object.h" // monero/src
|
||||||
|
#if MLWS_RMQ_ENABLED
|
||||||
|
#include <amqp.h>
|
||||||
|
#include <amqp_tcp_socket.h>
|
||||||
|
#endif
|
||||||
|
|
||||||
namespace lws
|
namespace lws
|
||||||
{
|
{
|
||||||
|
@ -72,6 +77,38 @@ namespace rpc
|
||||||
};
|
};
|
||||||
using zcontext = std::unique_ptr<void, terminate>;
|
using zcontext = std::unique_ptr<void, terminate>;
|
||||||
|
|
||||||
|
#ifdef MLWS_RMQ_ENABLED
|
||||||
|
constexpr const unsigned rmq_channel = 1;
|
||||||
|
struct rdestroy
|
||||||
|
{
|
||||||
|
void operator()(amqp_connection_state_t ptr) const noexcept
|
||||||
|
{
|
||||||
|
if (ptr)
|
||||||
|
{
|
||||||
|
amqp_channel_close(ptr, rmq_channel, AMQP_REPLY_SUCCESS);
|
||||||
|
amqp_connection_close(ptr, AMQP_REPLY_SUCCESS);
|
||||||
|
amqp_destroy_connection(ptr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
struct rcontext
|
||||||
|
{
|
||||||
|
using connection = std::unique_ptr<amqp_connection_state_t_, rdestroy>;
|
||||||
|
|
||||||
|
bool is_available() const noexcept { return conn != nullptr; }
|
||||||
|
|
||||||
|
connection conn;
|
||||||
|
std::string exchange;
|
||||||
|
std::string routing;
|
||||||
|
};
|
||||||
|
#else // !MLWS_RMQ_ENABLED
|
||||||
|
|
||||||
|
struct rcontext
|
||||||
|
{
|
||||||
|
static constexpr bool is_available() noexcept { return false; }
|
||||||
|
};
|
||||||
|
#endif
|
||||||
|
|
||||||
expect<void> do_wait(void* daemon, void* signal_sub, short events, std::chrono::milliseconds timeout) noexcept
|
expect<void> do_wait(void* daemon, void* signal_sub, short events, std::chrono::milliseconds timeout) noexcept
|
||||||
{
|
{
|
||||||
if (timeout <= std::chrono::seconds{0})
|
if (timeout <= std::chrono::seconds{0})
|
||||||
|
@ -135,9 +172,11 @@ namespace rpc
|
||||||
{
|
{
|
||||||
struct context
|
struct context
|
||||||
{
|
{
|
||||||
explicit context(zcontext comm, socket signal_pub, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval)
|
explicit context(zcontext comm, socket signal_pub, socket external_pub, rcontext rmq, std::string daemon_addr, std::string sub_addr, std::chrono::minutes interval)
|
||||||
: comm(std::move(comm))
|
: comm(std::move(comm))
|
||||||
, signal_pub(std::move(signal_pub))
|
, signal_pub(std::move(signal_pub))
|
||||||
|
, external_pub(std::move(external_pub))
|
||||||
|
, rmq(std::move(rmq))
|
||||||
, daemon_addr(std::move(daemon_addr))
|
, daemon_addr(std::move(daemon_addr))
|
||||||
, sub_addr(std::move(sub_addr))
|
, sub_addr(std::move(sub_addr))
|
||||||
, rates_conn()
|
, rates_conn()
|
||||||
|
@ -152,6 +191,8 @@ namespace rpc
|
||||||
|
|
||||||
zcontext comm;
|
zcontext comm;
|
||||||
socket signal_pub;
|
socket signal_pub;
|
||||||
|
socket external_pub;
|
||||||
|
rcontext rmq;
|
||||||
const std::string daemon_addr;
|
const std::string daemon_addr;
|
||||||
const std::string sub_addr;
|
const std::string sub_addr;
|
||||||
http::http_simple_client rates_conn;
|
http::http_simple_client rates_conn;
|
||||||
|
@ -243,6 +284,11 @@ namespace rpc
|
||||||
client::~client() noexcept
|
client::~client() noexcept
|
||||||
{}
|
{}
|
||||||
|
|
||||||
|
bool client::has_publish() const noexcept
|
||||||
|
{
|
||||||
|
return ctx && (ctx->external_pub || ctx->rmq.is_available());
|
||||||
|
}
|
||||||
|
|
||||||
expect<void> client::watch_scan_signals() noexcept
|
expect<void> client::watch_scan_signals() noexcept
|
||||||
{
|
{
|
||||||
MONERO_PRECOND(ctx != nullptr);
|
MONERO_PRECOND(ctx != nullptr);
|
||||||
|
@ -330,6 +376,39 @@ namespace rpc
|
||||||
return success();
|
return success();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
expect<void> client::publish(epee::byte_slice payload)
|
||||||
|
{
|
||||||
|
MONERO_PRECOND(ctx != nullptr);
|
||||||
|
assert(daemon != nullptr);
|
||||||
|
if (ctx->external_pub == nullptr && !ctx->rmq.is_available())
|
||||||
|
return success();
|
||||||
|
|
||||||
|
expect<void> rc = success();
|
||||||
|
const boost::unique_lock<boost::mutex> guard{ctx->sync_pub};
|
||||||
|
if (ctx->external_pub)
|
||||||
|
rc = net::zmq::send(payload.clone(), ctx->external_pub.get(), 0);
|
||||||
|
|
||||||
|
#ifdef MLWS_RMQ_ENABLED
|
||||||
|
if (ctx->rmq.is_available() && boost::algorithm::starts_with(payload, boost::string_ref{"json"}))
|
||||||
|
{
|
||||||
|
const auto topic = reinterpret_cast<const std::uint8_t*>(std::memchr(payload.data(), ':', payload.size()));
|
||||||
|
if (topic && topic >= payload.data())
|
||||||
|
payload.remove_prefix(topic - payload.data() + 1);
|
||||||
|
|
||||||
|
amqp_bytes_t message{};
|
||||||
|
message.len = payload.size();
|
||||||
|
message.bytes = const_cast<std::uint8_t*>(payload.data());
|
||||||
|
const int rmq_rc = amqp_basic_publish(ctx->rmq.conn.get(), rmq_channel, amqp_cstring_bytes(ctx->rmq.exchange.c_str()), amqp_cstring_bytes(ctx->rmq.routing.c_str()), 1, 1, nullptr, message);
|
||||||
|
if (rmq_rc != 0)
|
||||||
|
{
|
||||||
|
MERROR("Failed RMQ Publish with return code: " << rmq_rc);
|
||||||
|
return {error::rmq_failure};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
expect<rates> client::get_rates() const
|
expect<rates> client::get_rates() const
|
||||||
{
|
{
|
||||||
MONERO_PRECOND(ctx != nullptr);
|
MONERO_PRECOND(ctx != nullptr);
|
||||||
|
@ -343,7 +422,7 @@ namespace rpc
|
||||||
return ctx->cached;
|
return ctx->cached;
|
||||||
}
|
}
|
||||||
|
|
||||||
context context::make(std::string daemon_addr, std::string sub_addr, std::chrono::minutes rates_interval)
|
context context::make(std::string daemon_addr, std::string sub_addr, std::string pub_addr, rmq_details rmq_info, std::chrono::minutes rates_interval)
|
||||||
{
|
{
|
||||||
zcontext comm{zmq_init(1)};
|
zcontext comm{zmq_init(1)};
|
||||||
if (comm == nullptr)
|
if (comm == nullptr)
|
||||||
|
@ -355,9 +434,68 @@ namespace rpc
|
||||||
if (zmq_bind(pub.get(), signal_endpoint) < 0)
|
if (zmq_bind(pub.get(), signal_endpoint) < 0)
|
||||||
MONERO_THROW(net::zmq::get_error_code(), "zmq_bind");
|
MONERO_THROW(net::zmq::get_error_code(), "zmq_bind");
|
||||||
|
|
||||||
|
detail::socket external_pub = nullptr;
|
||||||
|
if (!pub_addr.empty())
|
||||||
|
{
|
||||||
|
external_pub = detail::socket{zmq_socket(comm.get(), ZMQ_PUB)};
|
||||||
|
if (external_pub == nullptr)
|
||||||
|
MONERO_THROW(net::zmq::get_error_code(), "zmq_socket");
|
||||||
|
if (zmq_bind(external_pub.get(), pub_addr.c_str()) < 0)
|
||||||
|
MONERO_THROW(net::zmq::get_error_code(), "zmq_bind");
|
||||||
|
}
|
||||||
|
|
||||||
|
rcontext rmq{};
|
||||||
|
#ifdef MLWS_RMQ_ENABLED
|
||||||
|
if (!rmq_info.address.empty())
|
||||||
|
{
|
||||||
|
rmq.exchange = std::move(rmq_info.exchange);
|
||||||
|
rmq.routing = std::move(rmq_info.routing);
|
||||||
|
epee::net_utils::http::url_content url{};
|
||||||
|
if (!epee::net_utils::parse_url(rmq_info.address, url))
|
||||||
|
MONERO_THROW(error::configuration, "Invalid URL spec given for RMQ");
|
||||||
|
if (url.port == 0)
|
||||||
|
MONERO_THROW(error::configuration, "No port specified for RMQ");
|
||||||
|
if (url.uri.empty())
|
||||||
|
url.uri = "/";
|
||||||
|
|
||||||
|
std::string user;
|
||||||
|
std::string pass;
|
||||||
|
boost::regex expression{"^\\w+:\\w+$"};
|
||||||
|
boost::smatch matcher;
|
||||||
|
if (boost::regex_search(url.host, matcher, expression))
|
||||||
|
{
|
||||||
|
user = matcher[0];
|
||||||
|
pass = matcher[1];
|
||||||
|
}
|
||||||
|
|
||||||
|
rmq.conn.reset(amqp_new_connection());
|
||||||
|
const auto socket = amqp_tcp_socket_new(rmq.conn.get());
|
||||||
|
if (!socket)
|
||||||
|
MONERO_THROW(error::configuration, "Unable to create RMQ socket");
|
||||||
|
|
||||||
|
int status = amqp_socket_open(socket, url.host.c_str(), url.port);
|
||||||
|
if (status != 0)
|
||||||
|
{
|
||||||
|
MERROR("Unable to open RMQ socket: " << status);
|
||||||
|
MONERO_THROW(error::rmq_failure, "Unable to open RMQ socket");
|
||||||
|
}
|
||||||
|
if (!user.empty() || !pass.empty())
|
||||||
|
{
|
||||||
|
if (amqp_login(rmq.conn.get(), url.uri.c_str(), 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user.c_str(), pass.c_str()).reply_type != AMQP_RESPONSE_NORMAL)
|
||||||
|
MONERO_THROW(error::rmq_failure, "Failure to login RMQ socket");
|
||||||
|
}
|
||||||
|
if (amqp_channel_open(rmq.conn.get(), rmq_channel) != nullptr)
|
||||||
|
MONERO_THROW(error::rmq_failure, "Unabe to open RMQ channel");
|
||||||
|
MINFO("Connected to RMQ server " << url.host << ":" << url.port);
|
||||||
|
}
|
||||||
|
#else // !MLWS_RMQ_ENABLED
|
||||||
|
if (!rmq_info.address.empty() || !rmq_info.exchange.empty() || !rmq_info.routing.empty())
|
||||||
|
MONERO_THROW(error::configuration, "RabbitMQ support not enabled");
|
||||||
|
#endif
|
||||||
|
|
||||||
return context{
|
return context{
|
||||||
std::make_shared<detail::context>(
|
std::make_shared<detail::context>(
|
||||||
std::move(comm), std::move(pub), std::move(daemon_addr), std::move(sub_addr), rates_interval
|
std::move(comm), std::move(pub), std::move(external_pub), std::move(rmq), std::move(daemon_addr), std::move(sub_addr), rates_interval
|
||||||
)
|
)
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,6 +59,14 @@ namespace rpc
|
||||||
struct context;
|
struct context;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct rmq_details
|
||||||
|
{
|
||||||
|
std::string address;
|
||||||
|
std::string credentials;
|
||||||
|
std::string exchange;
|
||||||
|
std::string routing;
|
||||||
|
};
|
||||||
|
|
||||||
//! Abstraction for ZMQ RPC client. Only `get_rates()` thread-safe; use `clone()`.
|
//! Abstraction for ZMQ RPC client. Only `get_rates()` thread-safe; use `clone()`.
|
||||||
class client
|
class client
|
||||||
{
|
{
|
||||||
|
@ -112,6 +120,9 @@ namespace rpc
|
||||||
return ctx != nullptr;
|
return ctx != nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//! True if an external pub/sub was setup
|
||||||
|
bool has_publish() const noexcept;
|
||||||
|
|
||||||
//! `wait`, `send`, and `receive` will watch for `raise_abort_scan()`.
|
//! `wait`, `send`, and `receive` will watch for `raise_abort_scan()`.
|
||||||
expect<void> watch_scan_signals() noexcept;
|
expect<void> watch_scan_signals() noexcept;
|
||||||
|
|
||||||
|
@ -171,10 +182,12 @@ namespace rpc
|
||||||
\note All errors are exceptions; no recovery can occur.
|
\note All errors are exceptions; no recovery can occur.
|
||||||
|
|
||||||
\param daemon_addr Location of ZMQ enabled `monerod` RPC.
|
\param daemon_addr Location of ZMQ enabled `monerod` RPC.
|
||||||
|
\param pub_addr Bind location for publishing ZMQ events.
|
||||||
|
\param rmq_info Required information for RMQ publishing (if enabled)
|
||||||
\param rates_interval Frequency to retrieve exchange rates. Set value to
|
\param rates_interval Frequency to retrieve exchange rates. Set value to
|
||||||
`<= 0` to disable exchange rate retrieval.
|
`<= 0` to disable exchange rate retrieval.
|
||||||
*/
|
*/
|
||||||
static context make(std::string daemon_addr, std::string sub_addr, std::chrono::minutes rates_interval);
|
static context make(std::string daemon_addr, std::string sub_addr, std::string pub_addr, rmq_details rmq_info, std::chrono::minutes rates_interval);
|
||||||
|
|
||||||
context(context&&) = default;
|
context(context&&) = default;
|
||||||
context(context const&) = delete;
|
context(context const&) = delete;
|
||||||
|
|
|
@ -243,6 +243,46 @@ namespace lws
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct zmq_index
|
||||||
|
{
|
||||||
|
const std::uint64_t index;
|
||||||
|
const epee::span<const db::webhook_tx_confirmation> events;
|
||||||
|
};
|
||||||
|
|
||||||
|
void write_bytes(wire::writer& dest, const zmq_index& self)
|
||||||
|
{
|
||||||
|
wire::object(dest, WIRE_FIELD(index), WIRE_FIELD(events));
|
||||||
|
}
|
||||||
|
|
||||||
|
void send_via_zmq(rpc::client& client, const epee::span<const db::webhook_tx_confirmation> events)
|
||||||
|
{
|
||||||
|
struct zmq_order
|
||||||
|
{
|
||||||
|
std::uint64_t current;
|
||||||
|
boost::mutex sync;
|
||||||
|
|
||||||
|
zmq_order()
|
||||||
|
: current(0), sync()
|
||||||
|
{}
|
||||||
|
};
|
||||||
|
|
||||||
|
static zmq_order ordering{};
|
||||||
|
|
||||||
|
//! \TODO monitor XPUB to cull the serialization
|
||||||
|
if (!events.empty() && client.has_publish())
|
||||||
|
{
|
||||||
|
// make sure the event is queued to zmq in order.
|
||||||
|
const boost::unique_lock<boost::mutex> guard{ordering.sync};
|
||||||
|
const zmq_index index{ordering.current++, events};
|
||||||
|
MINFO("Sending ZMQ/RMQ PUB topics json-full-hooks and msgpack-full-hooks");
|
||||||
|
expect<void> result = success();
|
||||||
|
if (!(result = client.publish<wire::json>("json-full-hooks:", index)))
|
||||||
|
MERROR("Failed to serialize+send json-full-hooks: " << result.error().message());
|
||||||
|
if (!(result = client.publish<wire::msgpack>("msgpack-full-hooks:", index)))
|
||||||
|
MERROR("Failed to serialize+send msgpack-full-hooks: " << result.error().message());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct by_height
|
struct by_height
|
||||||
{
|
{
|
||||||
bool operator()(account const& left, account const& right) const noexcept
|
bool operator()(account const& left, account const& right) const noexcept
|
||||||
|
|
|
@ -56,6 +56,13 @@ namespace
|
||||||
{
|
{
|
||||||
const command_line::arg_descriptor<std::string> daemon_rpc;
|
const command_line::arg_descriptor<std::string> daemon_rpc;
|
||||||
const command_line::arg_descriptor<std::string> daemon_sub;
|
const command_line::arg_descriptor<std::string> daemon_sub;
|
||||||
|
const command_line::arg_descriptor<std::string> zmq_pub;
|
||||||
|
#ifdef MLWS_RMQ_ENABLED
|
||||||
|
const command_line::arg_descriptor<std::string> rmq_address;
|
||||||
|
const command_line::arg_descriptor<std::string> rmq_credentials;
|
||||||
|
const command_line::arg_descriptor<std::string> rmq_exchange;
|
||||||
|
const command_line::arg_descriptor<std::string> rmq_routing;
|
||||||
|
#endif
|
||||||
const command_line::arg_descriptor<std::vector<std::string>> rest_servers;
|
const command_line::arg_descriptor<std::vector<std::string>> rest_servers;
|
||||||
const command_line::arg_descriptor<std::vector<std::string>> admin_rest_servers;
|
const command_line::arg_descriptor<std::vector<std::string>> admin_rest_servers;
|
||||||
const command_line::arg_descriptor<std::string> rest_ssl_key;
|
const command_line::arg_descriptor<std::string> rest_ssl_key;
|
||||||
|
@ -90,6 +97,13 @@ namespace
|
||||||
: lws::options()
|
: lws::options()
|
||||||
, daemon_rpc{"daemon", "<protocol>://<address>:<port> of a monerod ZMQ RPC", get_default_zmq()}
|
, daemon_rpc{"daemon", "<protocol>://<address>:<port> of a monerod ZMQ RPC", get_default_zmq()}
|
||||||
, daemon_sub{"sub", "tcp://address:port or ipc://path of a monerod ZMQ Pub", ""}
|
, daemon_sub{"sub", "tcp://address:port or ipc://path of a monerod ZMQ Pub", ""}
|
||||||
|
, zmq_pub{"zmq-pub", "tcp://address:port or ipc://path of a bind location for ZMQ pub events", ""}
|
||||||
|
#ifdef MLWS_RMQ_ENABLED
|
||||||
|
, rmq_address{"rmq-address", "tcp://<host>/[vhost]"}
|
||||||
|
, rmq_credentials{"rmq-credentials", "<user>:<pass>"}
|
||||||
|
, rmq_exchange{"rmq-exchange", "Name of the RMQ exchange"}
|
||||||
|
, rmq_routing{"rmq-routing", "Routing for the specified exchange"}
|
||||||
|
#endif
|
||||||
, rest_servers{"rest-server", "[(https|http)://<address>:]<port>[/<prefix>] for incoming connections, multiple declarations allowed"}
|
, rest_servers{"rest-server", "[(https|http)://<address>:]<port>[/<prefix>] for incoming connections, multiple declarations allowed"}
|
||||||
, admin_rest_servers{"admin-rest-server", "[(https|http])://<address>:]<port>[/<prefix>] for incoming admin connections, multiple declarations allowed"}
|
, admin_rest_servers{"admin-rest-server", "[(https|http])://<address>:]<port>[/<prefix>] for incoming admin connections, multiple declarations allowed"}
|
||||||
, rest_ssl_key{"rest-ssl-key", "<path> to PEM formatted SSL key for https REST server", ""}
|
, rest_ssl_key{"rest-ssl-key", "<path> to PEM formatted SSL key for https REST server", ""}
|
||||||
|
@ -112,6 +126,13 @@ namespace
|
||||||
lws::options::prepare(description);
|
lws::options::prepare(description);
|
||||||
command_line::add_arg(description, daemon_rpc);
|
command_line::add_arg(description, daemon_rpc);
|
||||||
command_line::add_arg(description, daemon_sub);
|
command_line::add_arg(description, daemon_sub);
|
||||||
|
command_line::add_arg(description, zmq_pub);
|
||||||
|
#ifdef MLWS_RMQ_ENABLED
|
||||||
|
command_line::add_arg(description, rmq_address);
|
||||||
|
command_line::add_arg(description, rmq_credentials);
|
||||||
|
command_line::add_arg(description, rmq_exchange);
|
||||||
|
command_line::add_arg(description, rmq_routing);
|
||||||
|
#endif
|
||||||
description.add_options()(rest_servers.name, boost::program_options::value<std::vector<std::string>>()->default_value({rest_default}, rest_default), rest_servers.description);
|
description.add_options()(rest_servers.name, boost::program_options::value<std::vector<std::string>>()->default_value({rest_default}, rest_default), rest_servers.description);
|
||||||
command_line::add_arg(description, admin_rest_servers);
|
command_line::add_arg(description, admin_rest_servers);
|
||||||
command_line::add_arg(description, rest_ssl_key);
|
command_line::add_arg(description, rest_ssl_key);
|
||||||
|
@ -136,6 +157,8 @@ namespace
|
||||||
lws::rest_server::configuration rest_config;
|
lws::rest_server::configuration rest_config;
|
||||||
std::string daemon_rpc;
|
std::string daemon_rpc;
|
||||||
std::string daemon_sub;
|
std::string daemon_sub;
|
||||||
|
std::string zmq_pub;
|
||||||
|
lws::rpc::rmq_details rmq;
|
||||||
std::string webhook_ssl_verification;
|
std::string webhook_ssl_verification;
|
||||||
std::chrono::minutes rates_interval;
|
std::chrono::minutes rates_interval;
|
||||||
std::size_t scan_threads;
|
std::size_t scan_threads;
|
||||||
|
@ -189,6 +212,17 @@ namespace
|
||||||
},
|
},
|
||||||
command_line::get_arg(args, opts.daemon_rpc),
|
command_line::get_arg(args, opts.daemon_rpc),
|
||||||
command_line::get_arg(args, opts.daemon_sub),
|
command_line::get_arg(args, opts.daemon_sub),
|
||||||
|
command_line::get_arg(args, opts.zmq_pub),
|
||||||
|
#ifdef MLWS_RMQ_ENABLED
|
||||||
|
lws::rpc::rmq_details{
|
||||||
|
command_line::get_arg(args, opts.rmq_address),
|
||||||
|
command_line::get_arg(args, opts.rmq_credentials),
|
||||||
|
command_line::get_arg(args, opts.rmq_exchange),
|
||||||
|
command_line::get_arg(args, opts.rmq_routing)
|
||||||
|
},
|
||||||
|
#else
|
||||||
|
lws::rpc::rmq_details{},
|
||||||
|
#endif
|
||||||
command_line::get_arg(args, opts.webhook_ssl_verification),
|
command_line::get_arg(args, opts.webhook_ssl_verification),
|
||||||
std::chrono::minutes{command_line::get_arg(args, opts.rates_interval)},
|
std::chrono::minutes{command_line::get_arg(args, opts.rates_interval)},
|
||||||
command_line::get_arg(args, opts.scan_threads),
|
command_line::get_arg(args, opts.scan_threads),
|
||||||
|
@ -210,7 +244,7 @@ namespace
|
||||||
|
|
||||||
boost::filesystem::create_directories(prog.db_path);
|
boost::filesystem::create_directories(prog.db_path);
|
||||||
auto disk = lws::db::storage::open(prog.db_path.c_str(), prog.create_queue_max);
|
auto disk = lws::db::storage::open(prog.db_path.c_str(), prog.create_queue_max);
|
||||||
auto ctx = lws::rpc::context::make(std::move(prog.daemon_rpc), std::move(prog.daemon_sub), prog.rates_interval);
|
auto ctx = lws::rpc::context::make(std::move(prog.daemon_rpc), std::move(prog.daemon_sub), std::move(prog.zmq_pub), std::move(prog.rmq), prog.rates_interval);
|
||||||
|
|
||||||
MINFO("Using monerod ZMQ RPC at " << ctx.daemon_address());
|
MINFO("Using monerod ZMQ RPC at " << ctx.daemon_address());
|
||||||
auto client = lws::scanner::sync(disk.clone(), ctx.connect().value()).value();
|
auto client = lws::scanner::sync(disk.clone(), ctx.connect().value()).value();
|
||||||
|
|
Loading…
Reference in a new issue