add async framework

This commit is contained in:
koe 2024-05-20 17:16:32 -05:00
parent c8214782fb
commit c5822a61b5
18 changed files with 3429 additions and 0 deletions

View file

@ -83,6 +83,7 @@ endfunction ()
include(Version)
monero_add_library(version SOURCES ${CMAKE_BINARY_DIR}/version.cpp DEPENDS genversion)
add_subdirectory(async)
add_subdirectory(common)
add_subdirectory(crypto)
add_subdirectory(ringct)

52
src/async/CMakeLists.txt Normal file
View file

@ -0,0 +1,52 @@
# Copyright (c) 2021, The Monero Project
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification, are
# permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of
# conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice, this list
# of conditions and the following disclaimer in the documentation and/or other
# materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors may be
# used to endorse or promote products derived from this software without specific
# prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
# THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
set(async_sources
sleepy_task_queue.cpp
task_types.cpp
threadpool.cpp
waiter_manager.cpp)
monero_find_all_headers(async_headers, "${CMAKE_CURRENT_SOURCE_DIR}")
monero_add_library(async
${async_sources}
${async_headers})
target_link_libraries(async
PUBLIC
common
epee
PRIVATE
${EXTRA_LIBRARIES})
target_include_directories(async
PUBLIC
"${CMAKE_CURRENT_SOURCE_DIR}"
PRIVATE
${Boost_INCLUDE_DIRS})

87
src/async/misc_utils.h Normal file
View file

@ -0,0 +1,87 @@
// Copyright (c) 2023, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
/// Miscellaneous async utils.
#pragma once
//local headers
#include "common/expect.h"
//third-party headers
//standard headers
#include <chrono>
#include <future>
//forward declarations
namespace async
{
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
template <typename T>
bool future_is_ready(const std::future<T> &future)
{
try
{
if (!future.valid())
return false;
if (future.wait_for(std::chrono::seconds(0)) != std::future_status::ready)
return false;
} catch (...) { return false; }
return true;
}
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
template <typename T>
bool future_is_ready(const std::shared_future<T> &future)
{
try
{
if (!future.valid())
return false;
if (future.wait_for(std::chrono::seconds(0)) != std::future_status::ready)
return false;
} catch (...) { return false; }
return true;
}
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
template <typename R>
expect<R> unwrap_future(std::future<R> &future)
{
if (!future_is_ready(future)) { return std::error_code{}; }
try { return std::move(future.get()); }
catch (std::error_code e) { return e; }
catch (...) { return std::error_code{}; }
}
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
} //namespace async

96
src/async/mutex.h Normal file
View file

@ -0,0 +1,96 @@
// Copyright (c) 2024, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
/// mutex
#pragma once
//local headers
#include "misc_language.h"
#include "misc_log_ex.h"
//third-party headers
//standard headers
#include <atomic>
#include <mutex>
#include <string>
#include <thread>
//forward declarations
// Lock the mutex and then unlock it at the end of the local scope
// - if it fails to unlock (either it's already unlocked or owned by a different thread), the exception is ignored
#define SCOPE_LOCK_MUTEX(mutex) \
mutex.lock(); \
auto scope_exit_handler_##mutex = epee::misc_utils::create_scope_leave_handler([this](){ \
CHECK_AND_ASSERT_THROW_MES(mutex.unlock(), "failed to unlock " + std::string(#mutex)); \
})
namespace async
{
/// mutex
class Mutex final
{
public:
/// disable copy/move
Mutex& operator=(Mutex&&) = delete;
//member functions
/// Lock the mutex and claim ownership
void lock()
{
m_mutex.lock();
m_mutex_owner.store(std::this_thread::get_id(), std::memory_order_relaxed);
}
/// Release ownership and unlock the mutex. If this thread does not own the lock already, returns false.
bool unlock()
{
if (!thread_owns_lock())
return false;
m_mutex_owner.store(std::thread::id{}, std::memory_order_relaxed);
m_mutex.unlock();
return true;
}
/// Check if the given thread owns the mutex
bool thread_owns_lock(const std::thread::id thread_id = std::this_thread::get_id()) const
{
return thread_id == m_mutex_owner.load(std::memory_order_relaxed);
}
private:
//member variables
std::mutex m_mutex;
std::atomic<std::thread::id> m_mutex_owner{std::thread::id{}};
};
} //namespace async

View file

@ -0,0 +1,259 @@
// Copyright (c) 2023, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
/// Sean Parent's reference thread pool
/// ref: https://github.com/stlab/libraries/blob/main/stlab/concurrency/default_executor.hpp
#pragma once
//local headers
//third-party headers
//standard headers
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>
//forward declarations
namespace parent
{
enum class TokenQueueResult : unsigned char
{
SUCCESS,
SHUTTING_DOWN,
QUEUE_EMPTY,
TRY_LOCK_FAIL
};
/// async token queue
template <typename TokenT>
class TokenQueue final
{
struct element_t final
{
std::size_t index;
TokenT token;
template <class T>
element_t(const std::size_t index, T&& new_element) :
index{index},
token{std::forward<T>(new_element)}
{}
bool operator<(const element_t& other) const
{
return this->index < other.index;
};
};
// must be called under lock with non-empty queue
TokenT pop_not_empty()
{
TokenT temp{std::move(m_queue.front()).token};
std::pop_heap(std::begin(m_queue), std::end(m_queue));
m_queue.pop_back();
return temp;
}
public:
//constructors
/// resurrect default constructor
TokenQueue() = default;
//member functions
/// try to add an element to the top
template <typename T>
TokenQueueResult try_push(T &&new_element_in)
{
{
std::unique_lock<std::mutex> lock{m_mutex, std::try_to_lock};
if (!lock.owns_lock())
return TokenQueueResult::TRY_LOCK_FAIL;
m_queue.emplace_back(m_index_counter++, std::forward<T>(new_element_in));
std::push_heap(std::begin(m_queue), std::end(m_queue));
}
m_condvar.notify_one();
return TokenQueueResult::SUCCESS;
}
/// add an element to the top (always succeeds)
template <typename T>
void force_push(T &&new_element_in)
{
{
std::lock_guard<std::mutex> lock{m_mutex};
m_queue.emplace_back(m_index_counter++, std::forward<T>(new_element_in));
std::push_heap(std::begin(m_queue), std::end(m_queue));
}
m_condvar.notify_one();
}
/// try to remove an element from the bottom
TokenQueueResult try_pop(TokenT &token_out)
{
// try to lock the queue, then check if there are any elements
std::unique_lock<std::mutex> lock{m_mutex, std::try_to_lock};
if (!lock.owns_lock())
return TokenQueueResult::TRY_LOCK_FAIL;
if (m_queue.size() == 0)
return TokenQueueResult::QUEUE_EMPTY;
// pop the bottom element
token_out = pop_not_empty();
return TokenQueueResult::SUCCESS;
}
/// wait until you can remove an element from the bottom
TokenQueueResult force_pop(TokenT &token_out)
{
// lock the queue then wait until there is an element or the queue is shutting down
std::unique_lock<std::mutex> lock{m_mutex};
while (m_queue.size() == 0 && !m_shutting_down)
m_condvar.wait(lock);
if (m_queue.size() == 0 && m_shutting_down)
return TokenQueueResult::SHUTTING_DOWN;
// pop the bottom element
token_out = pop_not_empty();
return TokenQueueResult::SUCCESS;
}
void shut_down()
{
{
std::unique_lock<std::mutex> lock{m_mutex};
m_shutting_down = true;
}
m_condvar.notify_all();
}
private:
//member variables
/// queue status
bool m_shutting_down{false};
/// queue context
std::vector<element_t> m_queue;
std::mutex m_mutex;
std::condition_variable m_condvar;
std::size_t m_index_counter{0};
};
/// Sean Parent's reference threadpool
class ThreadPool final
{
using queue_t = TokenQueue<std::function<void()>>;
public:
ThreadPool(const std::uint16_t num_workers) :
m_queues{static_cast<std::size_t>(num_workers > 0 ? num_workers : 1)}
{
m_workers.reserve(m_queues.size());
for (std::uint16_t worker_index{0}; worker_index < m_queues.size(); ++worker_index)
{
try
{
m_workers.emplace_back(
[this, worker_index]() mutable
{
try { this->run(worker_index); } catch (...) { /* can't do anything */ }
}
);
}
catch (...) { /* can't do anything */ }
}
}
~ThreadPool()
{
this->shut_down();
}
void run(const std::uint16_t worker_index)
{
while (true)
{
// cycle all the queues
std::function<void()> task;
for (std::uint16_t i{0}; i < m_queues.size(); ++i)
{
if (m_queues[(i + worker_index) % m_queues.size()].try_pop(task) == TokenQueueResult::SUCCESS)
break;
}
// fallback: force pop
if (!task &&
m_queues[worker_index].force_pop(task) == TokenQueueResult::SHUTTING_DOWN)
break;
// run the task
try { task(); } catch (...) {}
}
}
template <typename F>
void submit(F &&function)
{
std::uint32_t start_counter{m_submit_rotation_counter.fetch_add(1, std::memory_order_relaxed)};
// cycle all the queues
for (std::uint32_t i{0}; i < m_queues.size() * 40; ++i)
{
if (m_queues[(i + start_counter) % m_queues.size()].try_push(std::forward<F>(function)) ==
TokenQueueResult::SUCCESS)
return;
}
// fallback: force push
m_queues[start_counter % m_queues.size()].force_push(std::forward<F>(function));
}
void shut_down()
{
for (queue_t &queue : m_queues)
try { queue.shut_down(); } catch (...) {}
for (std::thread &worker : m_workers)
try { if (worker.joinable()) worker.join(); } catch (...) {}
}
private:
//member variables
std::vector<queue_t> m_queues;
std::vector<std::thread> m_workers;
std::atomic<std::uint16_t> m_submit_rotation_counter{0};
};
} //namespace parent

373
src/async/rw_lock.h Normal file
View file

@ -0,0 +1,373 @@
// Copyright (c) 2023, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
/// Single-writer/multi-reader value containers.
/// - Accessing a moved-from container will throw.
/// - We do not guarantee that accessing the same container from multiple threads is not UB.
/// - The containers use a shared_ptr internally, so misuse WILL cause reference cycles.
///
/// Implementation notes:
/// - We use a shared_mutex for the context mutex since after a write_lock is released multiple waiting readers may
/// concurrently acquire a shared lock on the value.
#pragma once
//local headers
//third-party headers
#include <boost/optional/optional.hpp>
#include <boost/thread/shared_mutex.hpp>
//standard headers
#include <atomic>
#include <condition_variable>
#include <memory>
#include <stdexcept>
#include <type_traits>
//forward declarations
namespace async
{
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
namespace detail
{
/// enable if nonconst
template <typename, typename = void>
struct enable_if_nonconst;
template <typename T>
struct enable_if_nonconst<T, std::enable_if_t<!std::is_const<T>::value>> {};
template <typename T>
struct enable_if_nonconst<T, std::enable_if_t<std::is_const<T>::value>> final { enable_if_nonconst() = delete; };
/// test a rw_lock pointer
[[noreturn]] inline void rw_lock_ptr_access_error() { throw std::runtime_error{"rw_lock invalid ptr access."}; }
inline void test_rw_ptr(const void *ptr) { if (ptr == nullptr) rw_lock_ptr_access_error(); }
/// value context
template <typename value_t>
struct rw_context final
{
value_t value;
boost::shared_mutex value_mutex;
boost::shared_mutex ctx_mutex;
std::condition_variable_any ctx_condvar;
std::atomic<int> num_readers{0};
};
} //namespace detail
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
/// declarations
template <typename>
class read_lock;
template <typename>
class write_lock;
template <typename>
class readable;
template <typename>
class writable;
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
/// READ LOCK (can read the locked value concurrently with other read_locks)
template <typename value_t>
class read_lock final : public detail::enable_if_nonconst<value_t>
{
friend class readable<value_t>;
using ctx_t = detail::rw_context<value_t>;
protected:
//constructors
/// default constructor: disabled
read_lock() = delete;
/// normal constructor: only callable by readable and writable
read_lock(boost::shared_lock<boost::shared_mutex> lock, std::shared_ptr<ctx_t> context) :
m_lock{std::move(lock)},
m_context{std::move(context)}
{}
/// copies: disabled
read_lock(const read_lock<value_t>&) = delete;
read_lock& operator=(const read_lock<value_t>&) = delete;
public:
/// moves: default
read_lock(read_lock<value_t>&&) = default;
read_lock& operator=(read_lock<value_t>&&) = default;
//destructor
~read_lock()
{
if (m_context &&
m_lock.mutex() != nullptr &&
m_lock.owns_lock())
{
{
boost::shared_lock<boost::shared_mutex> ctx_lock{m_context->ctx_mutex};
m_lock.unlock();
}
// if there seem to be no existing readers, notify one waiting writer
// NOTE: this is only an optimization
// - there is a race condition where a new reader is being added/gets added concurrently and the notified
// writer ends up failing to get a lock
// - there is also a race conditon where a writer is added after our .unlock() call above, then a reader gets
// stuck on ctx_condvar, then our notify_one() call here causes that reader to needlessly try to get a lock
if (m_context->num_readers.fetch_sub(1, std::memory_order_relaxed) <= 0)
m_context->ctx_condvar.notify_one(); //notify one waiting writer
}
}
//member functions
/// access the value
const value_t& value() const { detail::test_rw_ptr(m_context.get()); return m_context->value; }
private:
//member variables
boost::shared_lock<boost::shared_mutex> m_lock;
std::shared_ptr<ctx_t> m_context;
};
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
/// WRITE LOCK (can mutate the locked value)
template <typename value_t>
class write_lock final : public detail::enable_if_nonconst<value_t>
{
friend class writable<value_t>;
using ctx_t = detail::rw_context<value_t>;
protected:
//constructors
/// default constructor: disabled
write_lock() = delete;
/// normal constructor: only callable by writable
write_lock(boost::unique_lock<boost::shared_mutex> lock, std::shared_ptr<ctx_t> context) :
m_lock{std::move(lock)},
m_context{std::move(context)}
{}
/// copies: disabled
write_lock(const write_lock<value_t>&) = delete;
write_lock& operator=(const write_lock<value_t>&) = delete;
public:
/// moves: default
write_lock(write_lock<value_t>&&) = default;
write_lock& operator=(write_lock<value_t>&&) = default;
//destructor
~write_lock()
{
if (m_context &&
m_lock.mutex() != nullptr &&
m_lock.owns_lock())
{
{
boost::unique_lock<boost::shared_mutex> ctx_lock{m_context->ctx_mutex};
m_lock.unlock();
}
m_context->ctx_condvar.notify_all(); //notify all waiting
}
}
//member functions
/// access the value
value_t& value() { detail::test_rw_ptr(m_context.get()); return m_context->value; }
private:
//member variables
boost::unique_lock<boost::shared_mutex> m_lock;
std::shared_ptr<ctx_t> m_context;
};
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
/// READ LOCKABLE (can be copied and spawn read_locks)
template <typename value_t>
class readable final : public detail::enable_if_nonconst<value_t>
{
friend class writable<value_t>;
using ctx_t = detail::rw_context<value_t>;
protected:
//constructors
/// default constructor: disabled
readable() = delete;
/// normal constructor: only callable by writable
readable(std::shared_ptr<ctx_t> context) :
m_context{std::move(context)}
{}
public:
/// normal constructor: from value
readable(const value_t &raw_value) :
m_context{std::make_shared<ctx_t>()}
{
m_context->value = raw_value;
}
readable(value_t &&raw_value) :
m_context{std::make_shared<ctx_t>()}
{
m_context->value = std::move(raw_value);
}
/// moves and copies: default
//member functions
/// try to get a write lock
/// FAILS IF THERE IS A CONCURRENT WRITE LOCK
boost::optional<read_lock<value_t>> try_lock()
{
detail::test_rw_ptr(m_context.get());
boost::shared_lock<boost::shared_mutex> lock{m_context->value_mutex, boost::try_to_lock};
if (!lock.owns_lock()) return boost::none;
else
{
m_context->num_readers.fetch_add(1, std::memory_order_relaxed);
return read_lock<value_t>{std::move(lock), m_context};
}
}
/// get a read lock
/// BLOCKS IF THERE IS A CONCURRENT WRITE LOCK
read_lock<value_t> lock()
{
// cheap attempt
boost::optional<read_lock<value_t>> lock;
if ((lock = this->try_lock()))
return std::move(*lock);
// blocking attempt
detail::test_rw_ptr(m_context.get());
boost::shared_lock<boost::shared_mutex> ctx_lock{m_context->ctx_mutex};
m_context->ctx_condvar.wait(ctx_lock,
[&]() -> bool
{
return (lock = this->try_lock()) != boost::none;
}
);
return std::move(*lock);
}
private:
//member variables
std::shared_ptr<ctx_t> m_context;
};
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
/// WRITE LOCKABLE (can spawn readables and write_locks)
template <typename value_t>
class writable final : public detail::enable_if_nonconst<value_t>
{
using ctx_t = detail::rw_context<value_t>;
public:
//constructors
/// default constructor: disabled
writable() = delete;
/// normal constructor: from value
writable(const value_t &raw_value) :
m_context{std::make_shared<ctx_t>()}
{
m_context->value = raw_value;
}
writable(value_t &&raw_value) :
m_context{std::make_shared<ctx_t>()}
{
m_context->value = std::move(raw_value);
}
/// copies: disabled
writable(const writable<value_t>&) = delete;
writable& operator=(const writable<value_t>&) = delete;
/// moves: default
writable(writable<value_t>&&) = default;
writable& operator=(writable<value_t>&&) = default;
//member functions
/// get a readable
readable<value_t> get_readable()
{
detail::test_rw_ptr(m_context.get());
return readable<value_t>{m_context};
}
/// try to get a write lock
/// FAILS IF THERE ARE ANY CONCURRENT WRITE OR READ LOCKS
boost::optional<write_lock<value_t>> try_lock()
{
detail::test_rw_ptr(m_context.get());
boost::unique_lock<boost::shared_mutex> lock{m_context->value_mutex, boost::try_to_lock};
if (!lock.owns_lock()) return boost::none;
else return write_lock<value_t>{std::move(lock), m_context};
}
/// get a write lock
/// BLOCKS IF THERE ARE ANY CONCURRENT WRITE OR READ LOCKS
write_lock<value_t> lock()
{
// cheap attempt
boost::optional<write_lock<value_t>> lock;
if ((lock = this->try_lock()))
return std::move(*lock);
// blocking attempt
detail::test_rw_ptr(m_context.get());
boost::unique_lock<boost::shared_mutex> ctx_lock{m_context->ctx_mutex};
m_context->ctx_condvar.wait(ctx_lock,
[&]() -> bool
{
return (lock = this->try_lock()) != boost::none;
}
);
return std::move(*lock);
}
private:
//member variables
std::shared_ptr<ctx_t> m_context;
};
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
} //namespace async

View file

@ -0,0 +1,95 @@
// Copyright (c) 2023, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
/// Scoped notification: calls a function when destroyed.
#pragma once
//local headers
//third-party headers
//standard headers
#include <functional>
//forward declarations
namespace async
{
/// scoped notification (notifies on destruction)
/// - only use this if you can GUARANTEE the lifetimes of any references in the notification function are longer
/// than the notification's lifetime
class ScopedNotification final
{
public:
//constructors
/// normal constructor
ScopedNotification(std::function<void()> notification_func) :
m_notification_func{std::move(notification_func)}
{}
/// disable copies (this is a scoped manager)
ScopedNotification(const ScopedNotification&) = delete;
ScopedNotification& operator=(const ScopedNotification&) = delete;
/// moved-from notifications need empty notification functions so they are not called in the destructor
ScopedNotification(ScopedNotification &&other)
{
*this = std::move(other);
}
ScopedNotification& operator=(ScopedNotification &&other)
{
this->notify();
this->m_notification_func = std::move(other).m_notification_func;
other.m_notification_func = nullptr; //nullify the moved-from function
return *this;
}
//destructor
~ScopedNotification()
{
this->notify();
}
private:
//member functions
void notify() noexcept
{
if (m_notification_func)
{
try { m_notification_func(); } catch (...) {}
}
}
//member variables
std::function<void()> m_notification_func;
};
} //namespace async

View file

@ -0,0 +1,166 @@
// Copyright (c) 2022, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//paired header
#include "sleepy_task_queue.h"
//local headers
#include "task_types.h"
//third party headers
//standard headers
#include <atomic>
#include <chrono>
#include <mutex>
#undef MONERO_DEFAULT_LOG_CATEGORY
#define MONERO_DEFAULT_LOG_CATEGORY "async"
namespace async
{
//-------------------------------------------------------------------------------------------------------------------
long long time_as_tick_count(const WakeTime &waketime)
{
return wake_time(waketime).time_since_epoch().count();
}
//-------------------------------------------------------------------------------------------------------------------
void SleepyTaskQueue::force_push(SleepyTask &&task)
{
std::lock_guard<std::mutex> lock{m_mutex};;
m_queue.emplace(
time_as_tick_count(task.wake_time),
std::make_unique<SleepingTask>(std::move(task), SleepingTaskStatus::UNCLAIMED)
);
}
//-------------------------------------------------------------------------------------------------------------------
bool SleepyTaskQueue::try_push(SleepyTask &&task)
{
std::unique_lock<std::mutex> lock{m_mutex, std::try_to_lock};
if (!lock.owns_lock())
return false;
m_queue.emplace(
time_as_tick_count(task.wake_time),
std::make_unique<SleepingTask>(std::move(task), SleepingTaskStatus::UNCLAIMED)
);
return true;
}
//-------------------------------------------------------------------------------------------------------------------
bool SleepyTaskQueue::try_swap(const unsigned char max_task_priority, SleepingTask* &task_inout)
{
// initialize the current task's waketime (set to max if there is no task)
auto current_task_waketime_count =
task_inout
? wake_time(task_inout->sleepy_task.wake_time).time_since_epoch().count()
: std::chrono::time_point<std::chrono::steady_clock>::max().time_since_epoch().count();
// lock the queue
std::unique_lock<std::mutex> lock{m_mutex, std::try_to_lock};
if (!lock.owns_lock())
return false;
// try to find an unclaimed task that wakes up sooner than our input task
for (auto &candidate_task : m_queue)
{
const SleepingTaskStatus candidate_status{candidate_task.second->status.load(std::memory_order_acquire)};
// skip reserved and dead tasks
if (candidate_status == SleepingTaskStatus::RESERVED ||
candidate_status == SleepingTaskStatus::DEAD)
continue;
// skip tasks with too-high priority
if (candidate_task.second->sleepy_task.simple_task.priority < max_task_priority)
continue;
// give up: the first unclaimed task does not wake up sooner than our input task
if (current_task_waketime_count <= candidate_task.first)
return false;
// success
// a. release our input task if we have one
if (task_inout)
unclaim_sleeping_task(*task_inout);
// b. acquire this candidate
task_inout = candidate_task.second.get();
reserve_sleeping_task(*task_inout);
return true;
}
return false;
}
//-------------------------------------------------------------------------------------------------------------------
std::list<std::unique_ptr<SleepingTask>> SleepyTaskQueue::try_perform_maintenance(
const std::chrono::time_point<std::chrono::steady_clock> &current_time)
{
// current time
auto now_count = current_time.time_since_epoch().count();
// lock the queue
std::unique_lock<std::mutex> lock{m_mutex, std::try_to_lock};
if (!lock.owns_lock())
return {};
// delete dead tasks and extract awake tasks until the lowest sleeping unclaimed task is encountered
std::list<std::unique_ptr<SleepingTask>> awakened_tasks;
for (auto queue_it = m_queue.begin(); queue_it != m_queue.end();)
{
const SleepingTaskStatus task_status{queue_it->second->status.load(std::memory_order_acquire)};
// skip reserved tasks
if (task_status == SleepingTaskStatus::RESERVED)
{
++queue_it;
continue;
}
// delete dead tasks
if (task_status == SleepingTaskStatus::DEAD)
{
queue_it = m_queue.erase(queue_it);
continue;
}
// extract awake unclaimed tasks
if (queue_it->first <= now_count)
{
awakened_tasks.emplace_back(std::move(queue_it->second));
queue_it = m_queue.erase(queue_it);
continue;
}
// exit when we found an asleep unclaimed task
break;
}
return awakened_tasks;
}
//-------------------------------------------------------------------------------------------------------------------
} //namespace async

View file

@ -0,0 +1,94 @@
// Copyright (c) 2023, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
/// Queue of sleepy tasks.
#pragma once
//local headers
#include "task_types.h"
//third-party headers
#include <boost/container/map.hpp>
//standard headers
#include <chrono>
#include <list>
#include <memory>
#include <mutex>
#include <type_traits>
//forward declarations
namespace async
{
/// SleepyTaskQueue
/// - PRECONDITION: a user of a sleepy task queue with a pointer/reference to a task in that queue should ONLY change
/// the task's status from RESERVED to UNCLAIMED/DEAD (and not any other direction)
/// - once a RESERVED task's status has been changed, the user should assume they no longer have valid access to the
/// task
/// - only change a task's status from RESERVED -> UNCLAIMED if its contents will be left in a valid state after the
/// change (e.g. the internal task shouldn't be in a moved-from state)
class SleepyTaskQueue final
{
public:
//overloaded operators
/// disable copy/move since this may be accessed concurrently from multiple threads
SleepyTaskQueue& operator=(SleepyTaskQueue&&) = delete;
//member functions
/// force push a sleepy task into the queue
void force_push(SleepyTask &&task);
/// try to push a sleepy task into the queue
bool try_push(SleepyTask &&task);
/// try to swap an existing sleepy task with a task that wakes up sooner
/// - this function does not add/remove elements from the queue; instead, it simply adjusts task statuses then
/// swaps pointers
/// - if 'task_inout == nullptr', then we set it to the unclaimed task with the lowest waketime
/// - the cost of this function may be higher than expected if there are many tasks with higher priority than our
/// allowed max priority
bool try_swap(const unsigned char max_task_priority, SleepingTask* &task_inout);
/// try to clean up the queue
/// - remove dead tasks
/// - extract awake unclaimed tasks
std::list<std::unique_ptr<SleepingTask>> try_perform_maintenance(
const std::chrono::time_point<std::chrono::steady_clock> &current_time);
private:
//member variables
/// queue context (sorted by waketime)
boost::container::multimap<std::chrono::time_point<std::chrono::steady_clock>::rep,
std::unique_ptr<SleepingTask>> m_queue;
std::mutex m_mutex;
};
} //namespace async

80
src/async/task_types.cpp Normal file
View file

@ -0,0 +1,80 @@
// Copyright (c) 2022, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//paired header
#include "task_types.h"
//local headers
//third party headers
//standard headers
#include <chrono>
#undef MONERO_DEFAULT_LOG_CATEGORY
#define MONERO_DEFAULT_LOG_CATEGORY "async"
namespace async
{
//-------------------------------------------------------------------------------------------------------------------
std::chrono::time_point<std::chrono::steady_clock> wake_time(const WakeTime waketime)
{
return waketime.start_time + waketime.duration;
}
//-------------------------------------------------------------------------------------------------------------------
bool sleepy_task_is_awake(const SleepyTask &task)
{
return wake_time(task.wake_time) <= std::chrono::steady_clock::now();
}
//-------------------------------------------------------------------------------------------------------------------
bool sleeping_task_is_unclaimed(const SleepingTask &task)
{
return task.status.load(std::memory_order_acquire) == SleepingTaskStatus::UNCLAIMED;
}
//-------------------------------------------------------------------------------------------------------------------
bool sleeping_task_is_dead(const SleepingTask &task)
{
return task.status.load(std::memory_order_acquire) == SleepingTaskStatus::DEAD;
}
//-------------------------------------------------------------------------------------------------------------------
void unclaim_sleeping_task(SleepingTask &sleeping_task_inout)
{
sleeping_task_inout.status.store(SleepingTaskStatus::UNCLAIMED, std::memory_order_release);
}
//-------------------------------------------------------------------------------------------------------------------
void reserve_sleeping_task(SleepingTask &sleeping_task_inout)
{
sleeping_task_inout.status.store(SleepingTaskStatus::RESERVED, std::memory_order_release);
}
//-------------------------------------------------------------------------------------------------------------------
void kill_sleeping_task(SleepingTask &sleeping_task_inout)
{
sleeping_task_inout.status.store(SleepingTaskStatus::DEAD, std::memory_order_release);
}
//-------------------------------------------------------------------------------------------------------------------
} //namespace async

266
src/async/task_types.h Normal file
View file

@ -0,0 +1,266 @@
// Copyright (c) 2023, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
/// Task types for a threadpool
#pragma once
//local headers
#include "common/variant.h"
//third-party headers
#include <boost/none.hpp>
//standard headers
#include <atomic>
#include <chrono>
#include <functional>
#include <future>
#include <type_traits>
//forward declarations
namespace async
{
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
/// waketime
/// - waketime = start time + duration
/// - if 'start time == 0' when a task is received, then the start time will be set to the time at that moment
/// - this allows task-makers to specify either a task's waketime or its sleep duration from the moment it is
/// submitted, e.g. for task continuations that are defined well in advance of when they are submitted
struct WakeTime final
{
std::chrono::time_point<std::chrono::steady_clock> start_time{
std::chrono::time_point<std::chrono::steady_clock>::min()
};
std::chrono::nanoseconds duration{0};
};
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
/// possible statuses of a sleepy task in a sleepy queue
enum class SleepingTaskStatus : unsigned char
{
/// task is waiting for a worker
UNCLAIMED,
/// task is reserved by a worker
RESERVED,
/// task has been consumed by a worker
DEAD
};
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
struct SimpleTask;
struct SleepyTask;
/// task
using TaskVariant = tools::optional_variant<SimpleTask, SleepyTask>;
using task_t = std::function<TaskVariant()>; //tasks auto-return their continuation (or an empty variant)
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
/// pending task
struct SimpleTask final
{
unsigned char priority;
task_t task;
SimpleTask() = default;
SimpleTask(const SimpleTask&) = delete;
SimpleTask(SimpleTask&&) = default;
};
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
/// sleepy task
struct SleepyTask final
{
SimpleTask simple_task;
WakeTime wake_time;
SleepyTask() = default;
SleepyTask(const SleepyTask&) = delete;
SleepyTask(SleepyTask&&) = default;
};
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
/// sleeping task
/// note: we need an extra type for sleeping tasks because SleepyTasks are not copy-constructible, and the atomic status
/// is not move-constructible, which means SleepingTasks are very hard to move around
struct SleepingTask final
{
SleepyTask sleepy_task;
std::atomic<SleepingTaskStatus> status{SleepingTaskStatus::UNCLAIMED};
/// normal constructor (this struct is not movable or copyable, so it needs some help...)
SleepingTask(SleepyTask &&sleepy_task, const SleepingTaskStatus status) :
sleepy_task{std::move(sleepy_task)}, status{status}
{}
};
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
/// copyable tasks
template <
typename F,
typename std::enable_if<
std::is_same<TaskVariant, decltype(std::declval<F>()())>::value
&& std::is_constructible<std::function<TaskVariant()>, F>::value,
bool
>::type = true
>
std::function<TaskVariant()> as_task_function(F &&func)
{
return std::move(func);
}
template <
typename F,
typename std::enable_if<
std::is_same<void, decltype(std::declval<F>()())>::value
&& std::is_constructible<std::function<void()>, F>::value,
bool
>::type = true
>
std::function<TaskVariant()> as_task_function(F &&func)
{
return [l_func = std::move(func)]() -> TaskVariant { l_func(); return boost::none; };
}
/// move-only tasks
//todo: std::packaged_task is inefficient for this use-case but std::move_only_function is C++23
template <
typename F,
typename std::enable_if<
std::is_same<TaskVariant, decltype(std::declval<F>()())>::value
&& !std::is_constructible<std::function<TaskVariant()>, F>::value,
bool
>::type = true
>
std::function<TaskVariant()> as_task_function(F &&func)
{
return
[l_func = std::make_shared<std::packaged_task<TaskVariant()>>(std::move(func))]
() -> TaskVariant
{
if (!l_func) return boost::none;
try { (*l_func)(); return l_func->get_future().get(); } catch (...) {}
return boost::none;
};
}
template <
typename F,
typename std::enable_if<
std::is_same<void, decltype(std::declval<F>()())>::value
&& !std::is_constructible<std::function<void()>, F>::value,
bool
>::type = true
>
std::function<TaskVariant()> as_task_function(F &&func)
{
return
[l_func = std::make_shared<std::packaged_task<void()>>(std::move(func))]
() -> TaskVariant
{
if (!l_func) return boost::none;
try { (*l_func)(); } catch (...) {}
return boost::none;
};
}
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
/// make simple task
template <typename F>
SimpleTask make_simple_task(const unsigned char priority, F &&func)
{
static_assert(std::is_same<decltype(func()), TaskVariant>::value, "tasks must return task variants");
return SimpleTask{
.priority = priority,
.task = as_task_function(std::forward<F>(func))
};
}
/// make sleepy task
template <typename F>
SleepyTask make_sleepy_task(const unsigned char priority, const WakeTime &waketime, F &&func)
{
return {
make_simple_task(priority, std::forward<F>(func)),
waketime
};
}
template <typename F>
SleepyTask make_sleepy_task(const unsigned char priority, const std::chrono::nanoseconds &duration, F &&func)
{
// note: the start time is left undefined/zero until the task gets scheduled
WakeTime waketime{};
waketime.duration = duration;
return {
make_simple_task(priority, std::forward<F>(func)),
waketime
};
}
template <typename F>
SleepyTask make_sleepy_task(const unsigned char priority,
const std::chrono::time_point<std::chrono::steady_clock> &waketime,
F &&func)
{
return {
make_simple_task(priority, std::forward<F>(func)),
WakeTime{ .start_time = waketime, .duration = std::chrono::nanoseconds{0} }
};
}
//todo
std::chrono::time_point<std::chrono::steady_clock> wake_time(const WakeTime waketime);
//todo
bool sleepy_task_is_awake(const SleepyTask &task);
bool sleeping_task_is_unclaimed(const SleepingTask &task);
bool sleeping_task_is_dead(const SleepingTask &task);
void unclaim_sleeping_task(SleepingTask &sleeping_task_inout);
void reserve_sleeping_task(SleepingTask &sleeping_task_inout);
void kill_sleeping_task(SleepingTask &sleeping_task_inout);
} //namespace async

803
src/async/threadpool.cpp Normal file
View file

@ -0,0 +1,803 @@
// Copyright (c) 2022, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//paired header
#include "threadpool.h"
//local headers
#include "task_types.h"
//third party headers
#include <boost/optional/optional.hpp>
//standard headers
#include <cassert>
#include <list>
#include <thread>
#include <vector>
#undef MONERO_DEFAULT_LOG_CATEGORY
#define MONERO_DEFAULT_LOG_CATEGORY "async"
namespace async
{
// start at 1 so each thread's default context id does not match any actual context
static std::atomic<std::uint64_t> s_context_id_counter{1};
static thread_local std::uint64_t tl_context_id{0}; //context this thread is attached to
static thread_local std::uint16_t tl_worker_id{0}; //this thread's id within its context
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
static std::uint64_t initialize_threadpool_owner()
{
assert(tl_worker_id == 0); //only threads with id = 0 may own threadpools
// the first time this function is called, initialize with a unique threadpool id
// - a threadpool owner gets its own unique context id to facilitate owning multiple threadpools with
// overlapping lifetimes
static const std::uint64_t id{
[]()
{
tl_context_id = s_context_id_counter.fetch_add(1, std::memory_order_relaxed);
return tl_context_id;
}()
};
return id;
}
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
static void initialize_threadpool_worker_thread(const std::uint64_t threadpool_id, const std::uint16_t worker_id)
{
assert(tl_context_id == 0); //only threads without a context may become subthreads of a threadpool
assert(worker_id > 0); //id 0 is reserved for pool owners, who have their own unique context id
tl_context_id = threadpool_id;
tl_worker_id = worker_id;
}
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
static std::uint16_t thread_context_id()
{
return tl_context_id;
}
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
static std::uint16_t threadpool_worker_id()
{
return tl_worker_id;
}
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
static bool test_threadpool_member_invariants(const std::uint64_t threadpool_id, const std::uint64_t owner_id)
{
// if this thread owns the threadpool, its worker id should be 0
if (owner_id == thread_context_id())
return threadpool_worker_id() == 0;
// if this thread doesn't own the threadpool, it should be a subthread of the pool
return (threadpool_id == thread_context_id()) && (threadpool_worker_id() > 0);
}
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
static unsigned char clamp_priority(const unsigned char max_priority_level, const unsigned char priority)
{
if (priority > max_priority_level)
return max_priority_level;
return priority;
}
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
static void set_current_time_if_undefined(std::chrono::time_point<std::chrono::steady_clock> &time_inout)
{
// 'undefined' means set to zero
if (time_inout == std::chrono::time_point<std::chrono::steady_clock>::min())
time_inout = std::chrono::steady_clock::now();
}
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
static TaskVariant execute_task(task_t &task) noexcept
{
try
{
//std::future<TaskVariant> result{task.get_future()};
//task();
//return result.get();
return task();
} catch (...) {}
return boost::none;
}
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
// ThreadPool INTERNAL
//-------------------------------------------------------------------------------------------------------------------
void Threadpool::perform_sleepy_queue_maintenance()
{
// don't do maintenance if there are no unclaimed sleepy tasks (this can allow dead sleepy tasks to linger longer,
// but at the benefit of not performing maintenance when it's not needed)
if (m_num_unclaimed_sleepy_tasks.load(std::memory_order_relaxed) == 0)
return;
// cycle through the sleepy queues once, cleaning up each queue as we go
const std::chrono::time_point<std::chrono::steady_clock> current_time{std::chrono::steady_clock::now()};
for (std::uint16_t queue_index{0}; queue_index < m_num_queues; ++queue_index)
{
// perform maintenance on this queue
std::list<std::unique_ptr<SleepingTask>> awakened_tasks{
m_sleepy_task_queues[queue_index].try_perform_maintenance(current_time)
};
// submit the awakened sleepy tasks
// - note: elements at the bottom of the awakened sleepy tasks are assumed to be higher priority, so we submit
// those first
for (std::unique_ptr<SleepingTask> &task : awakened_tasks)
{
if (!task) continue;
this->submit_simple_task(std::move(task->sleepy_task).simple_task);
}
}
}
//-------------------------------------------------------------------------------------------------------------------
// ThreadPool INTERNAL
//-------------------------------------------------------------------------------------------------------------------
void Threadpool::submit_simple_task(SimpleTask &&simple_task)
{
// spin through the simple task queues at our task's priority level
// - start at the task queue one-after the previous start queue as a naive/simple way to spread tasks out evenly
const unsigned char clamped_priority{clamp_priority(m_max_priority_level, simple_task.priority)};
const std::uint16_t start_counter{m_normal_queue_submission_counter.fetch_add(1, std::memory_order_relaxed)};
std::uint32_t queue_index{0};
for (std::uint32_t i{0}; i < m_num_queues * m_num_submit_cycle_attempts; ++i)
{
// try to push into the specified queue
queue_index = (i + start_counter) % m_num_queues;
// leave if submitting the task succeeded
if (m_task_queues[clamped_priority][queue_index].try_push(std::move(simple_task).task) ==
TokenQueueResult::SUCCESS)
{
m_waiter_manager.notify_one();
return;
}
}
// fallback: force insert
queue_index = start_counter % m_num_queues;
m_task_queues[clamped_priority][queue_index].force_push(std::move(simple_task).task);
m_waiter_manager.notify_one();
}
//-------------------------------------------------------------------------------------------------------------------
// ThreadPool INTERNAL
//-------------------------------------------------------------------------------------------------------------------
void Threadpool::submit_sleepy_task(SleepyTask &&sleepy_task)
{
// set the start time of sleepy tasks with undefined start time
set_current_time_if_undefined(sleepy_task.wake_time.start_time);
// if the sleepy task is awake, unwrap its internal simple task
if (sleepy_task_is_awake(sleepy_task))
{
this->submit(std::move(sleepy_task).simple_task);
return;
}
// cycle the sleepy queues
const std::uint16_t start_counter{m_sleepy_queue_submission_counter.fetch_add(1, std::memory_order_relaxed)};
std::uint32_t queue_index{0};
for (std::uint32_t i{0}; i < m_num_queues * m_num_submit_cycle_attempts; ++i)
{
// try to push into a queue
queue_index = (i + start_counter) % m_num_queues;
if (!m_sleepy_task_queues[queue_index].try_push(std::move(sleepy_task)))
continue;
// success
m_num_unclaimed_sleepy_tasks.fetch_add(1, std::memory_order_relaxed);
m_waiter_manager.notify_one();
return;
}
// fallback: force insert
queue_index = start_counter % m_num_queues;
m_sleepy_task_queues[queue_index].force_push(std::move(sleepy_task));
m_num_unclaimed_sleepy_tasks.fetch_add(1, std::memory_order_relaxed);
m_waiter_manager.notify_one();
}
//-------------------------------------------------------------------------------------------------------------------
// ThreadPool INTERNAL
//-------------------------------------------------------------------------------------------------------------------
boost::optional<task_t> Threadpool::try_get_simple_task_to_run(const unsigned char max_task_priority,
const std::uint16_t worker_index)
{
// cycle the simple queues once, from highest to lowest priority (starting at the specified max task priority)
// - note: priority '0' is the highest priority so if the threadpool user adds a priority level, all their highest
// priority tasks will remain highest priority until they manually change them
// - note: we include a 'max task priority' so a worker can choose to only work on low-priority tasks (useful for
// purging the queue when you have multiple contending high-priority self-extending task loops)
task_t new_task;
std::uint16_t queue_index {0};
for (unsigned char priority{clamp_priority(m_max_priority_level, max_task_priority)};
priority <= m_max_priority_level;
++priority)
{
for (std::uint16_t i{0}; i < m_num_queues; ++i)
{
queue_index = (i + worker_index) % m_num_queues;
if (m_task_queues[priority][queue_index].try_pop(new_task) == TokenQueueResult::SUCCESS)
return new_task;
}
}
// failure
return boost::none;
}
//-------------------------------------------------------------------------------------------------------------------
// ThreadPool INTERNAL
//-------------------------------------------------------------------------------------------------------------------
boost::optional<task_t> Threadpool::try_wait_for_sleepy_task_to_run(const unsigned char max_task_priority,
const std::uint16_t worker_index,
const std::function<
WaiterManager::Result(
const std::uint16_t,
const std::chrono::time_point<std::chrono::steady_clock>&,
const WaiterManager::ShutdownPolicy
)
> &custom_wait_until)
{
// wait until we have an awake task while listening to the task notification system
SleepingTask* sleeping_task{nullptr};
boost::optional<task_t> final_task{};
bool found_sleepy_task{false};
std::uint16_t queue_index{0};
while (true)
{
// try to grab a sleepy task with the lowest waketime possible
for (std::uint16_t i{0}; i < m_num_queues; ++i)
{
queue_index = (i + worker_index) % m_num_queues;
m_sleepy_task_queues[queue_index].try_swap(max_task_priority, sleeping_task);
}
// failure: no sleepy task available
if (!sleeping_task)
break;
else if (!found_sleepy_task)
{
// record that there is one fewer unclaimed task in the sleepy queues
m_num_unclaimed_sleepy_tasks.fetch_sub(1, std::memory_order_relaxed);
found_sleepy_task = true;
}
// wait while listening
// - when shutting down, aggressively awaken sleepy tasks (this tends to burn CPU for tasks that really
// do need to wait, but improves shutdown responsiveness)
const WaiterManager::Result wait_result{
custom_wait_until(worker_index,
wake_time(sleeping_task->sleepy_task.wake_time),
WaiterManager::ShutdownPolicy::EXIT_EARLY)
};
// if we stopped waiting due to a wait condition being satisfied, release our sleepy task
if (wait_result == WaiterManager::Result::CONDITION_TRIGGERED)
{
// release our sleepy task
unclaim_sleeping_task(*sleeping_task);
m_num_unclaimed_sleepy_tasks.fetch_add(1, std::memory_order_relaxed);
// notify another worker now that our sleepy task is available again
m_waiter_manager.notify_one();
break;
}
// if our sleepy task is awake then we can extract its internal task
if (sleepy_task_is_awake(sleeping_task->sleepy_task) || wait_result == WaiterManager::Result::SHUTTING_DOWN)
{
// get the task
final_task = std::move(sleeping_task->sleepy_task).simple_task.task;
// kill the sleepy task so it can be cleaned up
kill_sleeping_task(*sleeping_task);
// if we finished waiting due to something other than a timeout, notify another worker
// - if we ended waiting due to a notification, then there is another task in the pool that can be worked
// on, but we are going to work on our awakened sleepy task so we need another worker to grab that new task
// - if we ended waiting due to a shutdown, then we don't want workers to be waiting (unless on a conditional
// wait), so it is fine to aggressively notify in that case
if (wait_result != WaiterManager::Result::TIMEOUT)
m_waiter_manager.notify_one();
break;
}
// try to replace our sleepy task with a simple task
if ((final_task = try_get_simple_task_to_run(max_task_priority, worker_index)))
{
// release our sleepy task
unclaim_sleeping_task(*sleeping_task);
m_num_unclaimed_sleepy_tasks.fetch_add(1, std::memory_order_relaxed);
// notify another worker now that our sleepy task is available again
m_waiter_manager.notify_one();
break;
}
}
return final_task;
}
//-------------------------------------------------------------------------------------------------------------------
// ThreadPool INTERNAL
//-------------------------------------------------------------------------------------------------------------------
boost::optional<task_t> Threadpool::try_get_task_to_run(const unsigned char max_task_priority,
const std::uint16_t worker_index,
const std::function<
WaiterManager::Result(
const std::uint16_t,
const std::chrono::time_point<std::chrono::steady_clock>&,
const WaiterManager::ShutdownPolicy
)
> &custom_wait_until) noexcept
{
assert(test_threadpool_member_invariants(m_threadpool_id, m_threadpool_owner_id));
try
{
// try to find a simple task
if (auto task = try_get_simple_task_to_run(max_task_priority, worker_index))
return task;
// try to wait on a sleepy task
if (auto task = try_wait_for_sleepy_task_to_run(max_task_priority, worker_index, custom_wait_until))
return task;
} catch (...) {}
// failure
return boost::none;
}
//-------------------------------------------------------------------------------------------------------------------
// ThreadPool INTERNAL
//-------------------------------------------------------------------------------------------------------------------
void Threadpool::run_as_worker_DONT_CALL_ME()
{
// only call run_as_worker_DONT_CALL_ME() from worker subthreads of the threadpool or from the owner when shutting down
assert(test_threadpool_member_invariants(m_threadpool_id, m_threadpool_owner_id));
const std::uint16_t worker_id{threadpool_worker_id()};
assert(worker_id < m_num_queues);
assert(worker_id > 0 ||
(thread_context_id() == m_threadpool_owner_id && m_waiter_manager.is_shutting_down()));
// prepare custom wait-until function
std::function<
WaiterManager::Result(
const std::uint16_t,
const std::chrono::time_point<std::chrono::steady_clock>&,
const WaiterManager::ShutdownPolicy
)
> custom_wait_until{
[this]
(
const std::uint16_t worker_id,
const std::chrono::time_point<std::chrono::steady_clock> &timepoint,
const WaiterManager::ShutdownPolicy shutdown_policy
) mutable -> WaiterManager::Result
{
// low priority wait since this will be used for sitting on sleepy tasks
return m_waiter_manager.wait_until(worker_id, timepoint,
shutdown_policy,
WaiterManager::WaitPriority::LOW);
}
};
while (true)
{
// try to get the next task, then run it and immediately submit its continuation
// - note: we don't immediately run task continuations because we want to always be pulling tasks from
// the bottom of the task pile
if (auto task = this->try_get_task_to_run(0, worker_id, custom_wait_until))
{
this->submit(execute_task(*task));
continue;
}
// we failed to get a task, so wait until some other worker submits a task and notifies us
// - we only test the shutdown condition immediately after failing to get a task because we want the pool to
// continue draining tasks until it is completely empty (users should directly/manually cancel in-flight tasks
// if that is needed)
// - due to race conditions in the waiter manager, it is possible for workers to shut down even with tasks in
// the queues; typically, the worker that submits a task will be able to pick up that task and finish it, but
// as a fall-back the thread that destroys the threadpool will purge the pool of all tasks
// - we periodically wake up to check the queues in case of race conditions around task submission (submitted
// tasks will always be executed eventually, but may be excessively delayed if we don't wake up here)
// - this is a high priority wait since we are not sitting on any tasks here
if (m_waiter_manager.is_shutting_down())
break;
m_waiter_manager.wait_for(worker_id,
m_max_wait_duration,
WaiterManager::ShutdownPolicy::EXIT_EARLY,
WaiterManager::WaitPriority::HIGH);
}
}
//-------------------------------------------------------------------------------------------------------------------
// ThreadPool INTERNAL
//-------------------------------------------------------------------------------------------------------------------
void Threadpool::run_as_fanout_worker_DONT_CALL_ME()
{
// only call run_as_fanout_worker_DONT_CALL_ME() from fanout subthreads of the threadpool
assert(test_threadpool_member_invariants(m_threadpool_id, m_threadpool_owner_id));
assert(threadpool_worker_id() >= m_num_queues);
while (true)
{
// try to get a fanout wait condition
// - failure means we are shutting down
FanoutCondition fanout_condition;
if (m_fanout_condition_queue.force_pop(fanout_condition) != TokenQueueResult::SUCCESS)
return;
// set the fanout worker index
// - we need to set this here since it can't be known in advance
// - don't do any work if we got here after the condition was set
if (!fanout_condition.worker_index ||
fanout_condition.worker_index->exchange(threadpool_worker_id(), std::memory_order_acq_rel) != 0)
continue;
// work while waiting for the fanout condition
this->work_while_waiting(fanout_condition.condition, 0);
}
}
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
Threadpool::Threadpool(const unsigned char max_priority_level,
const std::uint16_t num_managed_workers,
const unsigned char num_submit_cycle_attempts,
const std::chrono::nanoseconds max_wait_duration) :
m_threadpool_id{s_context_id_counter.fetch_add(1, std::memory_order_relaxed)},
m_threadpool_owner_id{initialize_threadpool_owner()},
m_max_priority_level{max_priority_level},
m_num_queues{static_cast<std::uint16_t>(num_managed_workers + 1)}, //+1 to include the threadpool owner
m_num_submit_cycle_attempts{num_submit_cycle_attempts},
m_max_wait_duration{max_wait_duration},
m_waiter_manager{static_cast<std::uint16_t>(2*m_num_queues)}
{
// create task queues
m_task_queues = std::vector<std::vector<TokenQueue<task_t>>>{static_cast<std::size_t>(m_max_priority_level + 1)};
for (auto &priority_queues : m_task_queues)
priority_queues = std::vector<TokenQueue<task_t>>{static_cast<std::size_t>(m_num_queues)};
// create sleepy task queues
m_sleepy_task_queues = std::vector<SleepyTaskQueue>{m_num_queues};
// launch workers
// - note: we reserve worker index 0 for the threadpool owner
m_workers.reserve(m_num_queues - 1);
for (std::uint16_t worker_index{1}; worker_index < m_num_queues; ++worker_index)
{
try
{
m_workers.emplace_back(
[this, worker_index]() mutable
{
initialize_threadpool_worker_thread(this->threadpool_id(), worker_index);
try { this->run_as_worker_DONT_CALL_ME(); } catch (...) { /* can't do anything */ }
}
);
}
catch (...) { /* can't do anything */ }
}
// launch fanout workers
// - note: we launch one fanout worker for each main worker (additional worker + threadpool owner)
m_fanout_workers.reserve(m_num_queues);
for (std::uint16_t fanout_worker_index{m_num_queues}; fanout_worker_index < 2*m_num_queues; ++fanout_worker_index)
{
try
{
m_fanout_workers.emplace_back(
[this, fanout_worker_index]() mutable
{
initialize_threadpool_worker_thread(this->threadpool_id(), fanout_worker_index);
try { this->run_as_fanout_worker_DONT_CALL_ME(); } catch (...) { /* can't do anything */ }
}
);
}
catch (...) { /* can't do anything */ }
}
}
//-------------------------------------------------------------------------------------------------------------------
Threadpool::~Threadpool()
{
(void)test_threadpool_member_invariants; //suppress unused warning...
assert(test_threadpool_member_invariants(m_threadpool_id, m_threadpool_owner_id));
assert(thread_context_id() == m_threadpool_owner_id); //only the owner may destroy the object
// shut down the pool
try { this->shut_down(); } catch (...) {}
// join all workers
for (std::thread &worker : m_workers)
try { worker.join(); } catch (...) {}
for (std::thread &fanout_worker : m_fanout_workers)
try { fanout_worker.join(); } catch (...) {}
// clear out any tasks lingering in the pool
try { this->run_as_worker_DONT_CALL_ME(); } catch (...) {}
//todo: if there was an exception above then the threadpool may hang or lead to UB, so maybe it would be best to
// just abort when an exception is detected
}
//-------------------------------------------------------------------------------------------------------------------
bool Threadpool::submit(TaskVariant task) noexcept
{
assert(test_threadpool_member_invariants(m_threadpool_id, m_threadpool_owner_id));
// submit the task
try
{
// case: empty task
if (!task) ; //skip ahead to sleepy queue maintenance
// case: simple task
else if (SimpleTask *simpletask = task.try_unwrap<SimpleTask>())
this->submit_simple_task(std::move(*simpletask));
// case: sleepy task
else if (SleepyTask *sleepytask = task.try_unwrap<SleepyTask>())
this->submit_sleepy_task(std::move(*sleepytask));
// maintain the sleepy queues
this->perform_sleepy_queue_maintenance();
} catch (...) { return false; }
return true;
}
//-------------------------------------------------------------------------------------------------------------------
join_signal_t Threadpool::make_join_signal()
{
return std::make_shared<std::atomic<bool>>();
}
//-------------------------------------------------------------------------------------------------------------------
join_token_t Threadpool::get_join_token(join_signal_t &join_signal_inout)
{
assert(test_threadpool_member_invariants(m_threadpool_id, m_threadpool_owner_id));
return std::make_shared<ScopedNotification>(
[
this,
l_waiter_index = threadpool_worker_id(),
l_threadpool_id = m_threadpool_id,
l_threadpool_owner_id = m_threadpool_owner_id,
l_join_signal = join_signal_inout
]() mutable
{
// set the signal early in case the invariants were violated
if (l_join_signal) { try { l_join_signal->store(true, std::memory_order_relaxed); } catch (...) {} }
// check thread id invariants (to avoid accessing a dangling 'this' pointer)
if (!test_threadpool_member_invariants(l_threadpool_id, l_threadpool_owner_id)) return;
// notify any waiter
m_waiter_manager.notify_conditional_waiter(l_waiter_index, [](){});
}
);
}
//-------------------------------------------------------------------------------------------------------------------
join_condition_t Threadpool::get_join_condition(join_signal_t &&join_signal_in, join_token_t &&join_token_in)
{
// clear the joiner's copy of the join token
join_token_in = nullptr;
// create the join condition
return
[l_join_signal = std::move(join_signal_in)]() -> bool
{
return !l_join_signal || l_join_signal->load(std::memory_order_relaxed);
};
}
//-------------------------------------------------------------------------------------------------------------------
void Threadpool::work_while_waiting(const std::chrono::time_point<std::chrono::steady_clock> &deadline,
const unsigned char max_task_priority)
{
assert(test_threadpool_member_invariants(m_threadpool_id, m_threadpool_owner_id));
const std::uint16_t worker_id{threadpool_worker_id()};
// prepare custom wait-until function
std::function<
WaiterManager::Result(
const std::uint16_t,
const std::chrono::time_point<std::chrono::steady_clock>&,
const WaiterManager::ShutdownPolicy
)
> custom_wait_until{
[this, &deadline]
(
const std::uint16_t worker_id,
const std::chrono::time_point<std::chrono::steady_clock> &timepoint,
const WaiterManager::ShutdownPolicy shutdown_policy
) mutable -> WaiterManager::Result
{
const WaiterManager::Result wait_result{
m_waiter_manager.wait_until(worker_id,
(timepoint < deadline) ? timepoint : deadline, //don't wait longer than the deadline
shutdown_policy,
WaiterManager::WaitPriority::LOW) //low priority wait since we are sitting on a timer
};
// treat the deadline as a condition
if (std::chrono::steady_clock::now() >= deadline)
return WaiterManager::Result::CONDITION_TRIGGERED;
return wait_result;
}
};
// work until the deadline
while (std::chrono::steady_clock::now() < deadline)
{
// try to get the next task, then run it and immediately submit its continuation
if (auto task = this->try_get_task_to_run(max_task_priority, worker_id, custom_wait_until))
{
this->submit(execute_task(*task));
continue;
}
// we failed to get a task, so wait until the deadline
const WaiterManager::Result wait_result{
custom_wait_until(worker_id, deadline, WaiterManager::ShutdownPolicy::WAIT)
};
// exit immediately if the deadline condition was triggered (don't re-test it)
if (wait_result == WaiterManager::Result::CONDITION_TRIGGERED)
break;
}
}
//-------------------------------------------------------------------------------------------------------------------
void Threadpool::work_while_waiting(const std::chrono::nanoseconds &duration, const unsigned char max_task_priority)
{
this->work_while_waiting(std::chrono::steady_clock::now() + duration, max_task_priority);
}
//-------------------------------------------------------------------------------------------------------------------
void Threadpool::work_while_waiting(const std::function<bool()> &wait_condition_func,
const unsigned char max_task_priority)
{
assert(test_threadpool_member_invariants(m_threadpool_id, m_threadpool_owner_id));
const std::uint16_t worker_id{threadpool_worker_id()};
// prepare custom wait-until function
std::function<
WaiterManager::Result(
const std::uint16_t,
const std::chrono::time_point<std::chrono::steady_clock>&,
const WaiterManager::ShutdownPolicy
)
> custom_wait_until{
[this, &wait_condition_func]
(
const std::uint16_t worker_id,
const std::chrono::time_point<std::chrono::steady_clock> &timepoint,
const WaiterManager::ShutdownPolicy shutdown_policy
) mutable -> WaiterManager::Result
{
return m_waiter_manager.conditional_wait_until(worker_id,
wait_condition_func,
timepoint,
shutdown_policy);
}
};
// work until the wait condition is satisfied
while (!wait_condition_func())
{
// try to get the next task, then run it and immediately submit its continuation
if (auto task = this->try_get_task_to_run(max_task_priority, worker_id, custom_wait_until))
{
this->submit(execute_task(*task));
continue;
}
// we failed to get a task, so wait until the condition is satisfied
const WaiterManager::Result wait_result{
custom_wait_until(worker_id,
std::chrono::steady_clock::now() + m_max_wait_duration,
WaiterManager::ShutdownPolicy::WAIT)
};
// exit immediately if the condition was triggered (don't re-test it)
if (wait_result == WaiterManager::Result::CONDITION_TRIGGERED)
break;
}
}
//-------------------------------------------------------------------------------------------------------------------
fanout_token_t Threadpool::launch_temporary_worker()
{
assert(test_threadpool_member_invariants(m_threadpool_id, m_threadpool_owner_id));
// 1. join signal
join_signal_t join_signal{this->make_join_signal()};
// 2. worker index channel for targeted notifications
std::shared_ptr<std::atomic<std::uint64_t>> worker_index_channel{std::make_shared<std::atomic<std::uint64_t>>(0)};
// 3. fanout condition
// - when condition is satisfied, return one fanout worker to the fanout pool
m_fanout_condition_queue.force_push(
FanoutCondition{
.worker_index = worker_index_channel,
.condition =
[l_join_signal = join_signal]() -> bool
{
return !l_join_signal || l_join_signal->load(std::memory_order_relaxed);
}
}
);
// 4. fanout token
// - when token is destroyed, the fanout condition will be triggered
return std::make_unique<ScopedNotification>(
[
this,
l_threadpool_id = m_threadpool_id,
l_threadpool_owner_id = m_threadpool_owner_id,
l_join_signal = std::move(join_signal),
l_worker_index_channel = std::move(worker_index_channel)
]() mutable
{
// leave early if we got here before the worker id became available
if (!l_worker_index_channel) return;
const std::uint64_t worker_id{
l_worker_index_channel->exchange(static_cast<std::uint64_t>(-1), std::memory_order_acq_rel)
};
if (worker_id == 0) return;
// set the signal early in case the invariants were violated
if (l_join_signal) { try { l_join_signal->store(true, std::memory_order_relaxed); } catch (...) {} }
// check thread id invariants (to avoid accessing a dangling 'this' pointer)
if (!test_threadpool_member_invariants(l_threadpool_id, l_threadpool_owner_id)) return;
// notify the waiting fanout worker
m_waiter_manager.notify_conditional_waiter(worker_id, [](){});
}
);
}
//-------------------------------------------------------------------------------------------------------------------
void Threadpool::shut_down() noexcept
{
// shut down the fanout queue
m_fanout_condition_queue.shut_down();
// shut down the waiter manager, which should notify any waiting workers
m_waiter_manager.shut_down();
}
//-------------------------------------------------------------------------------------------------------------------
} //namespace async

218
src/async/threadpool.h Normal file
View file

@ -0,0 +1,218 @@
// Copyright (c) 2023, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
/// threadpool
#pragma once
//local headers
#include "scoped_notification.h"
#include "sleepy_task_queue.h"
#include "task_types.h"
#include "token_queue.h"
#include "waiter_manager.h"
//third-party headers
#include <boost/container/map.hpp>
#include <boost/optional/optional.hpp>
//standard headers
#include <atomic>
#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
//forward declarations
namespace async
{
/// join signal/token/condition helper types
using join_signal_t = std::shared_ptr<std::atomic<bool>>;
using join_token_t = std::shared_ptr<ScopedNotification>;
using join_condition_t = std::function<bool()>;
/// fanout token helper type
using fanout_token_t = std::unique_ptr<ScopedNotification>;
struct FanoutCondition final
{
std::shared_ptr<std::atomic<std::uint64_t>> worker_index;
std::function<bool()> condition;
};
/// thread pool
class Threadpool final
{
/// clean up pass on the sleepy queues
void perform_sleepy_queue_maintenance();
/// submit task types
void submit_simple_task(SimpleTask &&simple_task);
void submit_sleepy_task(SleepyTask &&sleepy_task);
/// get a task to run
boost::optional<task_t> try_get_simple_task_to_run(const unsigned char max_task_priority,
const std::uint16_t worker_index);
boost::optional<task_t> try_wait_for_sleepy_task_to_run(const unsigned char max_task_priority,
const std::uint16_t worker_index,
const std::function<
WaiterManager::Result(
const std::uint16_t,
const std::chrono::time_point<std::chrono::steady_clock>&,
const WaiterManager::ShutdownPolicy
)
> &custom_wait_until);
boost::optional<task_t> try_get_task_to_run(const unsigned char max_task_priority,
const std::uint16_t worker_index,
const std::function<
WaiterManager::Result(
const std::uint16_t,
const std::chrono::time_point<std::chrono::steady_clock>&,
const WaiterManager::ShutdownPolicy
)
> &custom_wait_until) noexcept;
public:
/// run as a pool worker
/// - this function is only invoked when launching pool workers and from within the pool destructor
void run_as_worker_DONT_CALL_ME();
/// run as a pool fanout worker
/// - this function is only invoked when launching pool fanout workers
void run_as_fanout_worker_DONT_CALL_ME();
//constructors
/// default constructor: disabled
Threadpool() = delete;
/// normal constructor: from config
Threadpool(const unsigned char max_priority_level,
const std::uint16_t num_managed_workers,
const unsigned char num_submit_cycle_attempts,
const std::chrono::nanoseconds max_wait_duration);
/// disable copy/moves so references to this object can't be invalidated until this object's lifetime ends
Threadpool& operator=(Threadpool&&) = delete;
//destructor
/// destroy the threadpool
/// 1) shuts down the pool
/// 2) joins all worker threads
/// 3) clears out any remaining tasks
/// - note that this ensures any ScopedNotifications attached to tasks will be executed before the pool dies,
/// which ensures references in those notifications
~Threadpool();
//member functions
/// submit a task
/// - note: if submit() returns true, then it is guaranteed the submission succeeded; otherwise it is unspecified
/// what happened to the task (it may have been submitted, or an exception may have caused it to be dropped)
bool submit(TaskVariant task) noexcept;
/// toolkit for manually joining on a set of tasks
/// - how to use this:
/// 1) make a new join signal in the thread that will be joining on a set of tasks yet to be launched
/// 2) [get_join_token()]: get a new join token using the join signal
/// 3) save a copy of the token in the lambda capture of each task in the set of tasks that you want to join on
/// 4) [get_join_condition()]: consume the joining thread's copy of the join token and the join signal to get the
/// join condition
/// 5) call ThreadPool::work_while_waiting() from the joining thread, using that join condition
///
/// - PRECONDITION: the thread that joins on a join token must be the same thread that created that token
/// - PRECONDITION: there must be NO living copies of a join token after the corresponding threadpool has died
static join_signal_t make_join_signal();
join_token_t get_join_token(join_signal_t &join_signal_inout);
static join_condition_t get_join_condition(join_signal_t &&join_signal_in, join_token_t &&join_token_in);
void work_while_waiting(const std::chrono::time_point<std::chrono::steady_clock> &deadline,
const unsigned char max_task_priority = 0);
void work_while_waiting(const std::chrono::nanoseconds &duration, const unsigned char max_task_priority = 0);
void work_while_waiting(const std::function<bool()> &wait_condition_func, const unsigned char max_task_priority = 0);
/// launch a temporary worker
fanout_token_t launch_temporary_worker();
/// shut down the threadpool
void shut_down() noexcept;
/// id of this threadpool
std::uint64_t threadpool_id() const { return m_threadpool_id; }
std::uint64_t threadpool_owner_id() const { return m_threadpool_owner_id; }
private:
//member variables
/// config
const std::uint64_t m_threadpool_id; //unique identifier for this threadpool
const std::uint64_t m_threadpool_owner_id; //unique identifier for the thread that owns this threadpool
const unsigned char m_max_priority_level; //note: priority 0 is the 'highest' priority
const std::uint16_t m_num_queues; //num workers + 1 for the threadpool owner
const unsigned char m_num_submit_cycle_attempts;
const std::chrono::nanoseconds m_max_wait_duration;
/// worker context
std::vector<std::thread> m_workers;
std::vector<std::thread> m_fanout_workers; //extra workers that can be manually activated
/// queues
std::vector<std::vector<TokenQueue<task_t>>> m_task_queues; //outer vector: priorities, inner vector: workers
std::vector<SleepyTaskQueue> m_sleepy_task_queues; //vector: workers
TokenQueue<FanoutCondition> m_fanout_condition_queue;
std::atomic<std::uint16_t> m_normal_queue_submission_counter{0};
std::atomic<std::uint16_t> m_sleepy_queue_submission_counter{0};
std::atomic<std::uint64_t> m_num_unclaimed_sleepy_tasks{0};
// waiter manager
WaiterManager m_waiter_manager;
};
/// default priorities
enum DefaultPriorityLevels : unsigned char
{
MAX = 0,
MEDIUM,
LOW,
MIN = LOW
};
/// default threadpool
inline Threadpool& get_default_threadpool()
{
static Threadpool default_threadpool{
static_cast<unsigned char>(DefaultPriorityLevels::MIN),
static_cast<uint16_t>(std::max(2u, std::thread::hardware_concurrency()) - 1),
20,
std::chrono::milliseconds(500)
};
return default_threadpool;
}
} //namespace async

169
src/async/token_queue.h Normal file
View file

@ -0,0 +1,169 @@
// Copyright (c) 2023, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
/// Simple token queue.
#pragma once
//local headers
//third-party headers
//standard headers
#include <atomic>
#include <condition_variable>
#include <list>
#include <mutex>
//forward declarations
namespace async
{
enum class TokenQueueResult : unsigned char
{
SUCCESS,
QUEUE_EMPTY,
TRY_LOCK_FAIL,
SHUTTING_DOWN,
QUEUE_NOT_EMPTY
};
/// async token queue
template <typename TokenT>
class TokenQueue final
{
public:
//member functions
/// try to add an element to the top
template <typename T>
TokenQueueResult try_push(T &&new_element_in)
{
{
std::unique_lock<std::mutex> lock{m_mutex, std::try_to_lock};
if (!lock.owns_lock())
return TokenQueueResult::TRY_LOCK_FAIL;
m_queue.emplace_back(std::forward<T>(new_element_in));
}
m_condvar.notify_one();
return TokenQueueResult::SUCCESS;
}
/// add an element to the top (always succeeds)
template <typename T>
void force_push(T &&new_element_in)
{
{
std::lock_guard<std::mutex> lock{m_mutex};
m_queue.emplace_back(std::forward<T>(new_element_in));
}
m_condvar.notify_one();
}
/// try to remove an element from the bottom
TokenQueueResult try_pop(TokenT &token_out)
{
// try to lock the queue, then check if there are any elements
std::unique_lock<std::mutex> lock{m_mutex, std::try_to_lock};
if (!lock.owns_lock())
return TokenQueueResult::TRY_LOCK_FAIL;
if (m_queue.size() == 0)
return TokenQueueResult::QUEUE_EMPTY;
// pop the bottom element
token_out = std::move(m_queue.front());
m_queue.pop_front();
return TokenQueueResult::SUCCESS;
}
/// remove an element from the bottom (always succeeds)
TokenQueueResult force_pop(TokenT &token_out)
{
// lock and wait until the queue is not empty or the queue is shutting down
std::unique_lock<std::mutex> lock{m_mutex};
m_condvar.wait(lock, [this]() -> bool { return m_queue.size() > 0 || m_is_shutting_down; });
if (m_queue.size() == 0 && m_is_shutting_down)
return TokenQueueResult::SHUTTING_DOWN;
else if (m_queue.size() == 0)
return TokenQueueResult::QUEUE_EMPTY;
// pop the bottom element
token_out = std::move(m_queue.front());
m_queue.pop_front();
return TokenQueueResult::SUCCESS;
}
/// try to remove the minimum element
TokenQueueResult try_remove_min(TokenT &token_out)
{
// try to lock the queue, then check if there are any elements
std::unique_lock<std::mutex> lock{m_mutex, std::try_to_lock};
if (!lock.owns_lock())
return TokenQueueResult::TRY_LOCK_FAIL;
if (m_queue.size() == 0)
return TokenQueueResult::QUEUE_EMPTY;
// find the min element
auto min_elem = m_queue.begin();
for (auto it = m_queue.begin(); it != m_queue.end(); ++it)
{
if (*it < *min_elem)
min_elem = it;
}
token_out = std::move(*min_elem);
m_queue.erase(min_elem);
return TokenQueueResult::SUCCESS;
}
/// shut down the queue
void shut_down()
{
{
std::lock_guard<std::mutex> lock{m_mutex};
m_is_shutting_down = true;
}
m_condvar.notify_all();
}
/// reset the queue (queue must already be empty)
TokenQueueResult reset()
{
std::lock_guard<std::mutex> lock{m_mutex};
if (!m_queue.empty())
return TokenQueueResult::QUEUE_NOT_EMPTY;
m_is_shutting_down = false;
return TokenQueueResult::SUCCESS;
}
private:
//member variables
/// queue context
std::list<TokenT> m_queue;
std::mutex m_mutex;
std::condition_variable m_condvar;
bool m_is_shutting_down{false};
};
} //namespace async

View file

@ -0,0 +1,279 @@
// Copyright (c) 2022, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//paired header
#include "waiter_manager.h"
//local headers
//third party headers
//standard headers
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#undef MONERO_DEFAULT_LOG_CATEGORY
#define MONERO_DEFAULT_LOG_CATEGORY "async"
namespace async
{
//-------------------------------------------------------------------------------------------------------------------
// WaiterManager INTERNAL
//-------------------------------------------------------------------------------------------------------------------
std::uint16_t WaiterManager::clamp_waiter_index(const std::uint16_t nominal_index) noexcept
{
assert(m_num_managed_waiters > 0);
if (nominal_index >= m_num_managed_waiters)
return m_num_managed_waiters - 1;
return nominal_index;
}
//-------------------------------------------------------------------------------------------------------------------
// WaiterManager INTERNAL
// - note: the order of result checks is intentional based on their assumed importance to the caller
//-------------------------------------------------------------------------------------------------------------------
WaiterManager::Result WaiterManager::wait_impl(std::mutex &mutex_inout,
std::condition_variable_any &condvar_inout,
std::atomic<std::int32_t> &counter_inout,
const std::function<bool()> &condition_checker_func,
const std::function<std::cv_status(std::condition_variable_any&, std::unique_lock<std::mutex>&)> &wait_func,
const WaiterManager::ShutdownPolicy shutdown_policy) noexcept
{
try
{
std::unique_lock<std::mutex> lock{mutex_inout};
// pre-wait checks
if (condition_checker_func)
{
try { if (condition_checker_func()) return Result::CONDITION_TRIGGERED; }
catch (...) { return Result::CONDITION_TRIGGERED; }
}
if (shutdown_policy == WaiterManager::ShutdownPolicy::EXIT_EARLY && this->is_shutting_down())
return Result::SHUTTING_DOWN;
// wait
// note: using a signed int for counters means underflow due to reordering of the decrement won't yield a value > 0
counter_inout.fetch_add(1, std::memory_order_relaxed);
const std::cv_status wait_status{wait_func(condvar_inout, lock)};
counter_inout.fetch_sub(1, std::memory_order_relaxed);
// post-wait checks
if (condition_checker_func)
{
try { if (condition_checker_func()) return Result::CONDITION_TRIGGERED; }
catch (...) { return Result::CONDITION_TRIGGERED; }
}
if (this->is_shutting_down()) return Result::SHUTTING_DOWN;
if (wait_status == std::cv_status::timeout) return Result::TIMEOUT;
return Result::DONE_WAITING;
}
catch (...) { return Result::CRITICAL_EXCEPTION; }
}
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
//-------------------------------------------------------------------------------------------------------------------
WaiterManager::WaiterManager(const std::uint16_t num_managed_waiters) :
//we always want at least one waiter slot to avoid UB
m_num_managed_waiters{static_cast<uint16_t>(num_managed_waiters > 0 ? num_managed_waiters : 1)}
{
m_waiter_mutexes = std::vector<std::mutex>{m_num_managed_waiters};
m_conditional_waiters = std::vector<ConditionalWaiterContext>{m_num_managed_waiters};
}
//-------------------------------------------------------------------------------------------------------------------
WaiterManager::Result WaiterManager::wait(const std::uint16_t waiter_index,
const WaiterManager::ShutdownPolicy shutdown_policy,
const WaitPriority wait_priority) noexcept
{
return this->wait_impl(m_waiter_mutexes[this->clamp_waiter_index(waiter_index)],
(wait_priority == WaitPriority::HIGH) ? m_primary_shared_cond_var : m_secondary_shared_cond_var,
(wait_priority == WaitPriority::HIGH) ? m_num_primary_waiters : m_num_secondary_waiters,
nullptr,
[](std::condition_variable_any &cv_inout, std::unique_lock<std::mutex> &lock) -> std::cv_status
{
cv_inout.wait(lock);
return std::cv_status::no_timeout;
},
shutdown_policy
);
}
//-------------------------------------------------------------------------------------------------------------------
WaiterManager::Result WaiterManager::wait_for(const std::uint16_t waiter_index,
const std::chrono::nanoseconds &duration,
const WaiterManager::ShutdownPolicy shutdown_policy,
const WaitPriority wait_priority) noexcept
{
return this->wait_impl(m_waiter_mutexes[this->clamp_waiter_index(waiter_index)],
(wait_priority == WaitPriority::HIGH) ? m_primary_shared_cond_var : m_secondary_shared_cond_var,
(wait_priority == WaitPriority::HIGH) ? m_num_primary_waiters : m_num_secondary_waiters,
nullptr,
[&duration](std::condition_variable_any &cv_inout, std::unique_lock<std::mutex> &lock_inout) -> std::cv_status
{
return cv_inout.wait_for(lock_inout, duration);
},
shutdown_policy
);
}
//-------------------------------------------------------------------------------------------------------------------
WaiterManager::Result WaiterManager::wait_until(const std::uint16_t waiter_index,
const std::chrono::time_point<std::chrono::steady_clock> &timepoint,
const WaiterManager::ShutdownPolicy shutdown_policy,
const WaitPriority wait_priority) noexcept
{
return this->wait_impl(m_waiter_mutexes[this->clamp_waiter_index(waiter_index)],
(wait_priority == WaitPriority::HIGH) ? m_primary_shared_cond_var : m_secondary_shared_cond_var,
(wait_priority == WaitPriority::HIGH) ? m_num_primary_waiters : m_num_secondary_waiters,
nullptr,
[&timepoint](std::condition_variable_any &cv_inout, std::unique_lock<std::mutex> &lock_inout) -> std::cv_status
{
return cv_inout.wait_until(lock_inout, timepoint);
},
shutdown_policy
);
}
//-------------------------------------------------------------------------------------------------------------------
WaiterManager::Result WaiterManager::conditional_wait(const std::uint16_t waiter_index,
const std::function<bool()> &condition_checker_func,
const WaiterManager::ShutdownPolicy shutdown_policy) noexcept
{
const std::uint16_t clamped_waiter_index{this->clamp_waiter_index(waiter_index)};
return this->wait_impl(m_waiter_mutexes[clamped_waiter_index],
m_conditional_waiters[clamped_waiter_index].cond_var,
m_conditional_waiters[clamped_waiter_index].num_waiting,
condition_checker_func,
[](std::condition_variable_any &cv_inout, std::unique_lock<std::mutex> &lock_inout) -> std::cv_status
{
cv_inout.wait(lock_inout);
return std::cv_status::no_timeout;
},
shutdown_policy
);
}
//-------------------------------------------------------------------------------------------------------------------
WaiterManager::Result WaiterManager::conditional_wait_for(const std::uint16_t waiter_index,
const std::function<bool()> &condition_checker_func,
const std::chrono::nanoseconds &duration,
const WaiterManager::ShutdownPolicy shutdown_policy) noexcept
{
const std::uint16_t clamped_waiter_index{this->clamp_waiter_index(waiter_index)};
return this->wait_impl(m_waiter_mutexes[clamped_waiter_index],
m_conditional_waiters[clamped_waiter_index].cond_var,
m_conditional_waiters[clamped_waiter_index].num_waiting,
condition_checker_func,
[&duration](std::condition_variable_any &cv_inout, std::unique_lock<std::mutex> &lock_inout) -> std::cv_status
{
return cv_inout.wait_for(lock_inout, duration);
},
shutdown_policy
);
}
//-------------------------------------------------------------------------------------------------------------------
WaiterManager::Result WaiterManager::conditional_wait_until(const std::uint16_t waiter_index,
const std::function<bool()> &condition_checker_func,
const std::chrono::time_point<std::chrono::steady_clock> &timepoint,
const WaiterManager::ShutdownPolicy shutdown_policy) noexcept
{
const std::uint16_t clamped_waiter_index{this->clamp_waiter_index(waiter_index)};
return this->wait_impl(m_waiter_mutexes[clamped_waiter_index],
m_conditional_waiters[clamped_waiter_index].cond_var,
m_conditional_waiters[clamped_waiter_index].num_waiting,
condition_checker_func,
[&timepoint](std::condition_variable_any &cv_inout, std::unique_lock<std::mutex> &lock_inout)
-> std::cv_status
{
return cv_inout.wait_until(lock_inout, timepoint);
},
shutdown_policy
);
}
//-------------------------------------------------------------------------------------------------------------------
void WaiterManager::notify_one() noexcept
{
// try to notify a normal waiter
if (m_num_primary_waiters.load(std::memory_order_relaxed) > 0)
{
m_primary_shared_cond_var.notify_one();
return;
}
// try to notify a sleepy waiter
if (m_num_secondary_waiters.load(std::memory_order_relaxed) > 0)
{
m_secondary_shared_cond_var.notify_one();
return;
}
// find a conditional waiter to notify
for (ConditionalWaiterContext &conditional_waiter : m_conditional_waiters)
{
if (conditional_waiter.num_waiting.load(std::memory_order_relaxed) > 0)
{
conditional_waiter.cond_var.notify_one();
break;
}
}
}
//-------------------------------------------------------------------------------------------------------------------
void WaiterManager::notify_all() noexcept
{
m_primary_shared_cond_var.notify_all();
m_secondary_shared_cond_var.notify_all();
for (ConditionalWaiterContext &conditional_waiter : m_conditional_waiters)
conditional_waiter.cond_var.notify_all();
}
//-------------------------------------------------------------------------------------------------------------------
void WaiterManager::notify_conditional_waiter(const std::uint16_t waiter_index,
std::function<void()> condition_setter_func) noexcept
{
const std::uint16_t clamped_waiter_index{this->clamp_waiter_index(waiter_index)};
if (condition_setter_func) try { condition_setter_func(); } catch (...) {}
// tap the mutex here to synchronize with conditional waiters
{ const std::lock_guard<std::mutex> lock{m_waiter_mutexes[clamped_waiter_index]}; };
// notify all because if there are multiple threads waiting on this index (not recommended, but possible),
// we don't know which one actually cares about this condition function
m_conditional_waiters[clamped_waiter_index].cond_var.notify_all();
}
//-------------------------------------------------------------------------------------------------------------------
void WaiterManager::shut_down() noexcept
{
// shut down
m_shutting_down.store(true, std::memory_order_relaxed);
// tap all the mutexes to synchronize with waiters
for (std::mutex &mutex : m_waiter_mutexes)
try { const std::lock_guard<std::mutex> lock{mutex}; } catch (...) {}
// notify all waiters
this->notify_all();
}
//-------------------------------------------------------------------------------------------------------------------
} //namespace async

169
src/async/waiter_manager.h Normal file
View file

@ -0,0 +1,169 @@
// Copyright (c) 2023, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
/// Coordinates async wait/notify for a threadpool.
#pragma once
//local headers
//third-party headers
//standard headers
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <vector>
//forward declarations
namespace async
{
/// WaiterManager
/// - performance will decrease significantly if multiple threads try to claim the same waiter index
/// - notify_one() prioritizes: primary waiters > secondary waiters > conditional waiters
/// - this function has several race conditions that can mean no worker gets notified even if there are several actually
/// waiting (these are non-critical race conditions that marginally reduce throughput under low to moderate load)
/// - there is also a race condition where a conditional waiter gets notified but ends up detecting its condition was
/// triggered, implying it will go down some custom upstream control path instead of the normal path that
/// 'notify_one()' is aimed at (e.g. 'go find a task to work on'); (this marginally reduces throughput under moderate
/// to high load)
/// - conditional waiting is designed so a conditional waiter will never wait after its condition is set if a conditional
/// notify is used to set the condition
/// - COST: the condition setting/checking is protected by a unique lock, so any real system WILL waste time fighting
/// over those locks (to maximize throughput, consider using large task graphs to avoid manual joining and other
/// mechanisms that use conditional waits)
/// - 'shutting down' the manager means
/// A) existing waiters will all be woken up
/// B) future waiters using 'ShutdownPolicy::EXIT_EARLY' will simply exit without waiting
class WaiterManager final
{
public:
enum class ShutdownPolicy : unsigned char
{
WAIT,
EXIT_EARLY
};
enum class WaitPriority : unsigned char
{
HIGH,
LOW
};
enum class Result : unsigned char
{
CONDITION_TRIGGERED,
SHUTTING_DOWN,
TIMEOUT,
DONE_WAITING,
CRITICAL_EXCEPTION
};
private:
struct ConditionalWaiterContext final
{
std::atomic<std::int32_t> num_waiting;
std::condition_variable_any cond_var;
};
std::uint16_t clamp_waiter_index(const std::uint16_t nominal_index) noexcept;
/// wait
Result wait_impl(std::mutex &mutex_inout,
std::condition_variable_any &condvar_inout,
std::atomic<std::int32_t> &counter_inout,
const std::function<bool()> &condition_checker_func,
const std::function<std::cv_status(std::condition_variable_any&, std::unique_lock<std::mutex>&)> &wait_func,
const ShutdownPolicy shutdown_policy) noexcept;
public:
//constructors
WaiterManager(const std::uint16_t num_managed_waiters);
//overloaded operators
/// disable copy/moves so references to this object can't be invalidated until this object's lifetime ends
WaiterManager& operator=(WaiterManager&&) = delete;
//member functions
Result wait(const std::uint16_t waiter_index,
const ShutdownPolicy shutdown_policy,
const WaitPriority wait_priority) noexcept;
Result wait_for(const std::uint16_t waiter_index,
const std::chrono::nanoseconds &duration,
const ShutdownPolicy shutdown_policy,
const WaitPriority wait_priority) noexcept;
Result wait_until(const std::uint16_t waiter_index,
const std::chrono::time_point<std::chrono::steady_clock> &timepoint,
const ShutdownPolicy shutdown_policy,
const WaitPriority wait_priority) noexcept;
Result conditional_wait(const std::uint16_t waiter_index,
const std::function<bool()> &condition_checker_func,
const ShutdownPolicy shutdown_policy) noexcept;
Result conditional_wait_for(const std::uint16_t waiter_index,
const std::function<bool()> &condition_checker_func,
const std::chrono::nanoseconds &duration,
const ShutdownPolicy shutdown_policy) noexcept;
Result conditional_wait_until(const std::uint16_t waiter_index,
const std::function<bool()> &condition_checker_func,
const std::chrono::time_point<std::chrono::steady_clock> &timepoint,
const ShutdownPolicy shutdown_policy) noexcept;
void notify_one() noexcept;
void notify_all() noexcept;
void notify_conditional_waiter(const std::uint16_t waiter_index,
std::function<void()> condition_setter_func) noexcept;
void shut_down() noexcept;
bool is_shutting_down() const noexcept { return m_shutting_down.load(std::memory_order_relaxed); }
private:
//member variables
/// config
const std::uint16_t m_num_managed_waiters;
/// manager status
std::atomic<std::int32_t> m_num_primary_waiters{0};
std::atomic<std::int32_t> m_num_secondary_waiters{0};
std::atomic<bool> m_shutting_down{false};
/// synchronization
std::vector<std::mutex> m_waiter_mutexes;
std::condition_variable_any m_primary_shared_cond_var;
std::condition_variable_any m_secondary_shared_cond_var;
/// conditional waiters
std::vector<ConditionalWaiterContext> m_conditional_waiters;
};
} //namespace async

View file

@ -30,6 +30,7 @@ set(unit_tests_sources
account.cpp
apply_permutation.cpp
address_from_url.cpp
async.cpp
base58.cpp
blockchain_db.cpp
block_queue.cpp
@ -113,6 +114,7 @@ monero_add_minimal_executable(unit_tests
${unit_tests_headers})
target_link_libraries(unit_tests
PRIVATE
async
ringct
cryptonote_protocol
cryptonote_core

220
tests/unit_tests/async.cpp Normal file
View file

@ -0,0 +1,220 @@
// Copyright (c) 2023, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "async/task_types.h"
#include "async/threadpool.h"
#include <gtest/gtest.h>
#include <atomic>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <thread>
//-------------------------------------------------------------------------------------------------------------------
TEST(async, hello_world)
{
async::Threadpool pool{1, 0, 40, std::chrono::seconds{1}};
pool.submit(async::make_simple_task(0,
[]() -> async::TaskVariant
{
std::cout << "hello, world!\n";
return boost::none;
}
));
}
//-------------------------------------------------------------------------------------------------------------------
TEST(async, basic_join)
{
async::Threadpool pool{1, 0, 40, std::chrono::seconds{1}};
// 1. make join signal
auto join_signal = pool.make_join_signal();
// 2. get join token
auto join_token = pool.get_join_token(join_signal);
// 3. submit tasks to join on
pool.submit(async::make_simple_task(0,
[join_token]() -> async::TaskVariant
{
std::cout << "A\n";
return boost::none;
}
));
pool.submit(async::make_simple_task(0,
[join_token]() -> async::TaskVariant
{
std::cout << "B\n";
return boost::none;
}
));
// 4. get join condition
auto join_condition = pool.get_join_condition(std::move(join_signal), std::move(join_token));
// 5. join the tasks
pool.work_while_waiting(std::move(join_condition));
std::cout << "joining done!\n";
}
//-------------------------------------------------------------------------------------------------------------------
TEST(async, basic_fanout)
{
async::Threadpool pool{1, 0, 40, std::chrono::seconds{1}};
// launch task in the middle of a fanout
{
async::fanout_token_t fanout_token{pool.launch_temporary_worker()};
pool.submit(async::make_simple_task(0,
[]() -> async::TaskVariant
{
std::cout << "A\n";
return boost::none;
}
));
std::this_thread::sleep_for(std::chrono::milliseconds{500});
}
std::cout << "fanout closed!\n";
}
//-------------------------------------------------------------------------------------------------------------------
TEST(async, basic_multithreaded)
{
async::Threadpool pool{1, 2, 40, std::chrono::seconds{1}};
// 1. submit tasks
pool.submit(async::make_simple_task(0,
[]() -> async::TaskVariant
{
std::cout << "A\n";
return boost::none;
}
));
pool.submit(async::make_simple_task(0,
[]() -> async::TaskVariant
{
std::cout << "B\n";
return boost::none;
}
));
pool.submit(async::make_simple_task(0,
[]() -> async::TaskVariant
{
std::cout << "C\n";
return boost::none;
}
));
// 2. sleep the main thread
std::this_thread::sleep_for(std::chrono::milliseconds{500});
// 3. main thread marker
std::cout << "tasks submitted\n";
}
//-------------------------------------------------------------------------------------------------------------------
TEST(async, multithreaded_only_wait_for_first)
{
async::Threadpool &pool{async::get_default_threadpool()};
// 1. make join signals
auto join_signal_a = pool.make_join_signal();
auto join_signal_b = pool.make_join_signal();
auto join_signal_c = pool.make_join_signal();
// 2. get join tokens
auto join_token_a = pool.get_join_token(join_signal_a);
auto join_token_b = pool.get_join_token(join_signal_b);
auto join_token_c = pool.get_join_token(join_signal_c);
bool first_complete = false;
bool a_complete = false, b_complete = false, c_complete = false;
// 3. submit tasks
pool.submit(async::make_simple_task(0,
[&first_complete, &a_complete, l_join_token = join_token_a]() mutable -> async::TaskVariant
{
std::cout << std::this_thread::get_id() <<": A\n";
std::this_thread::sleep_for(std::chrono::seconds{5});
std::cout << "A\n";
a_complete = true;
first_complete = true;
l_join_token = nullptr;
return boost::none;
}
));
pool.submit(async::make_simple_task(0,
[&first_complete, &b_complete, l_join_token = join_token_b]() mutable -> async::TaskVariant
{
std::cout << std::this_thread::get_id() <<": B\n";
std::this_thread::sleep_for(std::chrono::seconds{5});
std::cout << "B\n";
b_complete = true;
first_complete = true;
l_join_token = nullptr;
return boost::none;
}
));
pool.submit(async::make_simple_task(0,
[&first_complete, &c_complete, l_join_token = join_token_c]() mutable -> async::TaskVariant
{
std::cout << std::this_thread::get_id() <<": C\n";
std::this_thread::sleep_for(std::chrono::milliseconds{100});
std::cout << "C\n";
c_complete = true;
first_complete = true;
l_join_token = nullptr;
return boost::none;
}
));
// 4. get join conditions
auto join_condition_a = pool.get_join_condition(std::move(join_signal_a), std::move(join_token_a));
auto join_condition_b = pool.get_join_condition(std::move(join_signal_b), std::move(join_token_b));
auto join_condition_c = pool.get_join_condition(std::move(join_signal_c), std::move(join_token_c));
auto check_any = [&join_condition_a, &join_condition_b, &join_condition_c]
{
return join_condition_a() || join_condition_b() || join_condition_c();
};
// 5. join the tasks
std::this_thread::sleep_for(std::chrono::milliseconds{100});
pool.work_while_waiting(std::move(check_any));
ASSERT_TRUE(first_complete);
ASSERT_TRUE(c_complete);
ASSERT_FALSE(a_complete);
ASSERT_FALSE(b_complete);
}
//-------------------------------------------------------------------------------------------------------------------