mirror of
https://github.com/monero-project/monero.git
synced 2025-01-25 20:15:58 +00:00
Fix tx flush callback queueing
This commit is contained in:
parent
850edfe419
commit
dff1d8067c
1 changed files with 12 additions and 23 deletions
|
@ -283,9 +283,10 @@ namespace levin
|
||||||
strand(io_service),
|
strand(io_service),
|
||||||
map(),
|
map(),
|
||||||
channels(),
|
channels(),
|
||||||
flush_time(std::chrono::steady_clock::time_point::max()),
|
|
||||||
connection_count(0),
|
connection_count(0),
|
||||||
|
flush_callbacks(0),
|
||||||
nzone(zone),
|
nzone(zone),
|
||||||
|
is_public(is_public),
|
||||||
pad_txs(pad_txs),
|
pad_txs(pad_txs),
|
||||||
fluffing(false)
|
fluffing(false)
|
||||||
{
|
{
|
||||||
|
@ -300,9 +301,10 @@ namespace levin
|
||||||
boost::asio::io_service::strand strand;
|
boost::asio::io_service::strand strand;
|
||||||
net::dandelionpp::connection_map map;//!< Tracks outgoing uuid's for noise channels or Dandelion++ stems
|
net::dandelionpp::connection_map map;//!< Tracks outgoing uuid's for noise channels or Dandelion++ stems
|
||||||
std::deque<noise_channel> channels; //!< Never touch after init; only update elements on `noise_channel.strand`
|
std::deque<noise_channel> channels; //!< Never touch after init; only update elements on `noise_channel.strand`
|
||||||
std::chrono::steady_clock::time_point flush_time; //!< Next expected Dandelion++ fluff flush
|
|
||||||
std::atomic<std::size_t> connection_count; //!< Only update in strand, can be read at any time
|
std::atomic<std::size_t> connection_count; //!< Only update in strand, can be read at any time
|
||||||
|
std::uint32_t flush_callbacks; //!< Number of active fluff flush callbacks queued
|
||||||
const epee::net_utils::zone nzone; //!< Zone is public ipv4/ipv6 connections, or i2p or tor
|
const epee::net_utils::zone nzone; //!< Zone is public ipv4/ipv6 connections, or i2p or tor
|
||||||
|
const bool is_public; //!< Zone is public ipv4/ipv6 connections
|
||||||
const bool pad_txs; //!< Pad txs to the next boundary for privacy
|
const bool pad_txs; //!< Pad txs to the next boundary for privacy
|
||||||
bool fluffing; //!< Zone is in Dandelion++ fluff epoch
|
bool fluffing; //!< Zone is in Dandelion++ fluff epoch
|
||||||
};
|
};
|
||||||
|
@ -348,7 +350,6 @@ namespace levin
|
||||||
struct fluff_flush
|
struct fluff_flush
|
||||||
{
|
{
|
||||||
std::shared_ptr<detail::zone> zone_;
|
std::shared_ptr<detail::zone> zone_;
|
||||||
std::chrono::steady_clock::time_point flush_time_;
|
|
||||||
|
|
||||||
static void queue(std::shared_ptr<detail::zone> zone, const std::chrono::steady_clock::time_point flush_time)
|
static void queue(std::shared_ptr<detail::zone> zone, const std::chrono::steady_clock::time_point flush_time)
|
||||||
{
|
{
|
||||||
|
@ -356,29 +357,22 @@ namespace levin
|
||||||
assert(zone->strand.running_in_this_thread());
|
assert(zone->strand.running_in_this_thread());
|
||||||
|
|
||||||
detail::zone& this_zone = *zone;
|
detail::zone& this_zone = *zone;
|
||||||
this_zone.flush_time = flush_time;
|
++this_zone.flush_callbacks;
|
||||||
this_zone.flush_txs.expires_at(flush_time);
|
this_zone.flush_txs.expires_at(flush_time);
|
||||||
this_zone.flush_txs.async_wait(this_zone.strand.wrap(fluff_flush{std::move(zone), flush_time}));
|
this_zone.flush_txs.async_wait(this_zone.strand.wrap(fluff_flush{std::move(zone)}));
|
||||||
}
|
}
|
||||||
|
|
||||||
void operator()(const boost::system::error_code error)
|
void operator()(const boost::system::error_code error)
|
||||||
{
|
{
|
||||||
if (!zone_ || !zone_->p2p)
|
if (!zone_ || !zone_->flush_callbacks || --zone_->flush_callbacks || !zone_->p2p)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
assert(zone_->strand.running_in_this_thread());
|
assert(zone_->strand.running_in_this_thread());
|
||||||
|
|
||||||
const bool timer_error = bool(error);
|
const bool timer_error = bool(error);
|
||||||
if (timer_error)
|
if (timer_error && error != boost::system::errc::operation_canceled)
|
||||||
{
|
|
||||||
if (error != boost::system::errc::operation_canceled)
|
|
||||||
throw boost::system::system_error{error, "fluff_flush timer failed"};
|
throw boost::system::system_error{error, "fluff_flush timer failed"};
|
||||||
|
|
||||||
// new timer canceled this one set in future
|
|
||||||
if (zone_->flush_time < flush_time_)
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const auto now = std::chrono::steady_clock::now();
|
const auto now = std::chrono::steady_clock::now();
|
||||||
auto next_flush = std::chrono::steady_clock::time_point::max();
|
auto next_flush = std::chrono::steady_clock::time_point::max();
|
||||||
std::vector<std::pair<std::vector<blobdata>, boost::uuids::uuid>> connections{};
|
std::vector<std::pair<std::vector<blobdata>, boost::uuids::uuid>> connections{};
|
||||||
|
@ -413,8 +407,6 @@ namespace levin
|
||||||
|
|
||||||
if (next_flush != std::chrono::steady_clock::time_point::max())
|
if (next_flush != std::chrono::steady_clock::time_point::max())
|
||||||
fluff_flush::queue(std::move(zone_), next_flush);
|
fluff_flush::queue(std::move(zone_), next_flush);
|
||||||
else
|
|
||||||
zone_->flush_time = next_flush; // signal that no timer is set
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -449,13 +441,11 @@ namespace levin
|
||||||
|
|
||||||
MDEBUG("Queueing " << txs.size() << " transaction(s) for Dandelion++ fluffing");
|
MDEBUG("Queueing " << txs.size() << " transaction(s) for Dandelion++ fluffing");
|
||||||
|
|
||||||
bool available = false;
|
zone->p2p->foreach_connection([txs, now, &zone, &source, &in_duration, &out_duration, &next_flush] (detail::p2p_context& context)
|
||||||
zone->p2p->foreach_connection([txs, now, &zone, &source, &in_duration, &out_duration, &next_flush, &available] (detail::p2p_context& context)
|
|
||||||
{
|
{
|
||||||
// When i2p/tor, only fluff to outbound connections
|
// When i2p/tor, only fluff to outbound connections
|
||||||
if (source != context.m_connection_id && (zone->nzone == epee::net_utils::zone::public_ || !context.m_is_income))
|
if (source != context.m_connection_id && (zone->nzone == epee::net_utils::zone::public_ || !context.m_is_income))
|
||||||
{
|
{
|
||||||
available = true;
|
|
||||||
if (context.fluff_txs.empty())
|
if (context.fluff_txs.empty())
|
||||||
context.flush_time = now + (context.m_is_income ? in_duration() : out_duration());
|
context.flush_time = now + (context.m_is_income ? in_duration() : out_duration());
|
||||||
|
|
||||||
|
@ -467,10 +457,9 @@ namespace levin
|
||||||
return true;
|
return true;
|
||||||
});
|
});
|
||||||
|
|
||||||
if (!available)
|
if (next_flush == std::chrono::steady_clock::time_point::max())
|
||||||
MWARNING("Unable to send transaction(s), no available connections");
|
MWARNING("Unable to send transaction(s), no available connections");
|
||||||
|
else if (!zone->flush_callbacks || next_flush < zone->flush_txs.expires_at())
|
||||||
if (next_flush < zone->flush_time)
|
|
||||||
fluff_flush::queue(std::move(zone), next_flush);
|
fluff_flush::queue(std::move(zone), next_flush);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in a new issue