Merge pull request #2446

6d0ca7d1 Tweak concurrency limits (Howard Chu)
510d0d47 Use a threadpool (Howard Chu)
This commit is contained in:
Riccardo Spagni 2017-09-18 13:19:26 +02:00
commit 1a73843cec
No known key found for this signature in database
GPG key ID: 55432DF31CCD4FCD
15 changed files with 306 additions and 963 deletions

View file

@ -37,8 +37,7 @@ set(common_sources
i18n.cpp i18n.cpp
password.cpp password.cpp
perf_timer.cpp perf_timer.cpp
task_region.cpp threadpool.cpp
thread_group.cpp
updates.cpp) updates.cpp)
if (STACK_TRACE) if (STACK_TRACE)
@ -66,8 +65,7 @@ set(common_private_headers
password.h password.h
perf_timer.h perf_timer.h
stack_trace.h stack_trace.h
task_region.h threadpool.h
thread_group.h
updates.h) updates.h)
monero_private_headers(common monero_private_headers(common

View file

@ -36,6 +36,5 @@ namespace tools
struct login; struct login;
class password_container; class password_container;
class t_http_connection; class t_http_connection;
class task_region; class threadpool;
class thread_group;
} }

View file

@ -1,94 +0,0 @@
// Copyright (c) 2014-2017, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "common/task_region.h"
#include <boost/thread/locks.hpp>
#include <cassert>
/* `mark_completed` and `wait` can throw in the lock call, but its difficult to
recover from either. An exception in `wait` means the post condition of joining
all threads cannot be achieved, and an exception in `mark_completed` means
certain deadlock. `noexcept` qualifier will force a call to `std::terminate` if
locking throws an exception, which should only happen if a recursive lock
attempt is made (which is not possible since no external function is called
while holding the lock). */
namespace tools
{
void task_region_handle::state::mark_completed(id task_id) noexcept {
assert(task_id != 0 && (task_id & (task_id - 1)) == 0); // power of 2 check
if (pending.fetch_and(~task_id) == task_id) {
// synchronize with wait call, but do not need to hold
boost::unique_lock<boost::mutex>{sync_on_complete};
all_complete.notify_all();
}
}
void task_region_handle::state::abort() noexcept {
state* current = this;
while (current) {
current->ready = 0;
current = current->next.get();
}
}
void task_region_handle::state::wait() noexcept {
state* current = this;
while (current) {
{
boost::unique_lock<boost::mutex> lock{current->sync_on_complete};
current->all_complete.wait(lock, [current] { return current->pending == 0; });
}
current = current->next.get();
}
}
void task_region_handle::state::wait(thread_group& threads) noexcept {
state* current = this;
while (current) {
while (current->pending != 0) {
if (!threads.try_run_one()) {
current->wait();
return;
}
}
current = current->next.get();
}
}
void task_region_handle::create_state() {
st = std::make_shared<state>(std::move(st));
next_id = 1;
}
void task_region_handle::do_wait() noexcept {
assert(st);
const std::shared_ptr<state> temp = std::move(st);
temp->wait(threads);
}
}

View file

@ -1,223 +0,0 @@
// Copyright (c) 2014-2017, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <atomic>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>
#include <memory>
#include <type_traits>
#include <utility>
#include "common/thread_group.h"
namespace tools
{
/*! A model of the fork-join concept. `run(...)` "forks" (i.e. spawns new
tasks), and `~task_region_handle()` or `wait()` "joins" the spawned tasks.
`wait` will block until all tasks have completed, while `~task_region_handle()`
blocks until all tasks have completed or aborted.
Do _NOT_ give this object to separate thread of execution (which includes
`task_region_handle::run(...)`) because joining on a different thread is
undesireable (potential deadlock).
This class cannot be constructed directly, use the function
`task_region(...)` instead.
*/
class task_region_handle
{
struct state
{
using id = unsigned;
explicit state(std::shared_ptr<state> next_src) noexcept
: next(std::move(next_src))
, ready(0)
, pending(0)
, sync_on_complete()
, all_complete() {
}
state(const state&) = default;
state(state&&) = default;
~state() = default;
state& operator=(const state&) = default;
state& operator=(state&&) = default;
void track_id(id task_id) noexcept {
pending |= task_id;
ready |= task_id;
}
//! \return True only once whether a given id can execute
bool can_run(id task_id) noexcept {
return (ready.fetch_and(~task_id) & task_id);
}
//! Mark id as completed, and synchronize with waiting threads
void mark_completed(id task_id) noexcept;
//! Tell all unstarted functions in region to return immediately
void abort() noexcept;
//! Blocks until all functions in region have aborted or completed.
void wait() noexcept;
//! Same as `wait()`, except `this_thread` runs tasks while waiting.
void wait(thread_group& threads) noexcept;
private:
/* This implementation is a bit pessimistic, it ensures that all copies
of a wrapped task can only be executed once. `thread_group` should never
do this, but some variable needs to track whether an abort should be done
anyway... */
std::shared_ptr<state> next;
std::atomic<id> ready; //!< Tracks whether a task has been invoked
std::atomic<id> pending; //!< Tracks when a task has completed or aborted
boost::mutex sync_on_complete;
boost::condition_variable all_complete;
};
template<typename F>
struct wrapper
{
wrapper(state::id id_src, std::shared_ptr<state> st_src, F f_src)
: task_id(id_src), st(std::move(st_src)), f(std::move(f_src)) {
}
wrapper(const wrapper&) = default;
wrapper(wrapper&&) = default;
wrapper& operator=(const wrapper&) = default;
wrapper& operator=(wrapper&&) = default;
void operator()() {
if (st) {
if (st->can_run(task_id)) {
f();
}
st->mark_completed(task_id);
}
}
private:
const state::id task_id;
std::shared_ptr<state> st;
F f;
};
public:
friend struct task_region_;
task_region_handle() = delete;
task_region_handle(const task_region_handle&) = delete;
task_region_handle(task_region_handle&&) = delete;
//! Cancels unstarted pending tasks, and waits for them to respond.
~task_region_handle() noexcept {
if (st) {
st->abort();
st->wait(threads);
}
}
task_region_handle& operator=(const task_region_handle&) = delete;
task_region_handle& operator=(task_region_handle&&) = delete;
/*! If the group has no threads, `f` is immediately run before returning.
Otherwise, `f` is dispatched to the thread_group associated with `this`
region. If `f` is dispatched to another thread, and it throws, the process
will immediately terminate. See std::packaged_task for getting exceptions on
functions executed on other threads. */
template<typename F>
void run(F&& f) {
if (threads.count() == 0) {
f();
} else {
if (!st || next_id == 0) {
create_state();
}
const state::id this_id = next_id;
next_id <<= 1;
st->track_id(this_id);
threads.dispatch(wrapper<F>{this_id, st, std::move(f)});
}
}
//! Wait until all functions provided to `run` have completed.
void wait() noexcept {
if (st) {
do_wait();
}
}
private:
explicit task_region_handle(thread_group& threads_src)
: st(nullptr), threads(threads_src), next_id(0) {
}
void create_state();
void do_wait() noexcept;
std::shared_ptr<state> st;
thread_group& threads;
state::id next_id;
};
/*! Function for creating a `task_region_handle`, which automatically calls
`task_region_handle::wait()` before returning. If a `thread_group` is not
provided, one is created with an optimal number of threads. The callback `f`
must have the signature `void(task_region_handle&)`. */
struct task_region_ {
template<typename F>
void operator()(thread_group& threads, F&& f) const {
static_assert(
std::is_same<void, typename std::result_of<F(task_region_handle&)>::type>::value,
"f cannot have a return value"
);
task_region_handle region{threads};
f(region);
region.wait();
}
template<typename F>
void operator()(thread_group&& threads, F&& f) const {
(*this)(threads, std::forward<F>(f));
}
template<typename F>
void operator()(F&& f) const {
thread_group threads;
(*this)(threads, std::forward<F>(f));
}
};
constexpr const task_region_ task_region{};
}

View file

@ -1,153 +0,0 @@
// Copyright (c) 2014-2017, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "common/thread_group.h"
#include <boost/thread/locks.hpp>
#include <cassert>
#include <limits>
#include <stdexcept>
#include "cryptonote_config.h"
#include "common/util.h"
namespace tools
{
std::size_t thread_group::optimal() {
static_assert(
std::numeric_limits<unsigned>::max() <= std::numeric_limits<std::size_t>::max(),
"unexpected truncation"
);
const std::size_t hardware = get_max_concurrency();
return hardware ? (hardware - 1) : 0;
}
std::size_t thread_group::optimal_with_max(std::size_t count) {
return count ? std::min(count - 1, optimal()) : 0;
}
thread_group::thread_group(std::size_t count) : internal() {
if (count) {
internal.emplace(count);
}
}
thread_group::data::data(std::size_t count)
: threads()
, head{nullptr}
, last(std::addressof(head))
, mutex()
, has_work()
, stop(false) {
threads.reserve(count);
boost::thread::attributes attrs;
attrs.set_stack_size(THREAD_STACK_SIZE);
while (count--) {
threads.push_back(boost::thread(attrs, boost::bind(&thread_group::data::run, this)));
}
}
thread_group::data::~data() noexcept {
{
const boost::unique_lock<boost::mutex> lock(mutex);
stop = true;
}
has_work.notify_all();
for (auto& worker : threads) {
try {
worker.join();
}
catch(...) {}
}
}
std::unique_ptr<thread_group::data::work> thread_group::data::get_next() noexcept {
std::unique_ptr<work> rc = std::move(head.ptr);
if (rc != nullptr) {
head.ptr = std::move(rc->next.ptr);
if (head.ptr == nullptr) {
last = std::addressof(head);
}
}
return rc;
}
bool thread_group::data::try_run_one() noexcept {
/* This function and `run()` can both throw when acquiring the lock, or in
dispatched function. It is tough to recover from either, particularly the
lock case. These functions are marked as noexcept so that if either call
throws, the entire process is terminated. Users of the `dispatch` call are
expected to make their functions noexcept, or use std::packaged_task to copy
exceptions so that the process will continue in all but the most pessimistic
cases (std::bad_alloc). This was the existing behavior;
`asio::io_service::run` propogates errors from dispatched calls, and uncaught
exceptions on threads result in process termination. */
std::unique_ptr<work> next = nullptr;
{
const boost::unique_lock<boost::mutex> lock(mutex);
next = get_next();
}
if (next) {
assert(next->f);
next->f();
return true;
}
return false;
}
void thread_group::data::run() noexcept {
// see `try_run_one()` source for additional information
while (true) {
std::unique_ptr<work> next = nullptr;
{
boost::unique_lock<boost::mutex> lock(mutex);
has_work.wait(lock, [this] { return head.ptr != nullptr || stop; });
if (stop) {
return;
}
next = get_next();
}
assert(next != nullptr);
assert(next->f);
next->f();
}
}
void thread_group::data::dispatch(std::function<void()> f) {
std::unique_ptr<work> latest(new work{std::move(f), node{nullptr}});
node* const latest_node = std::addressof(latest->next);
{
const boost::unique_lock<boost::mutex> lock(mutex);
assert(last != nullptr);
assert(last->ptr == nullptr);
last->ptr = std::move(latest);
last = latest_node;
}
has_work.notify_one();
}
}

View file

@ -1,143 +0,0 @@
// Copyright (c) 2014-2017, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <boost/optional/optional.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
#include <cstddef>
#include <functional>
#include <thread>
#include <utility>
#include <vector>
namespace tools
{
//! Manages zero or more threads for work dispatching.
class thread_group
{
public:
//! \return `get_max_concurrency() ? get_max_concurrency() - 1 : 0`
static std::size_t optimal();
//! \return `count ? min(count - 1, optimal()) : 0`
static std::size_t optimal_with_max(std::size_t count);
//! Create an optimal number of threads.
explicit thread_group() : thread_group(optimal()) {}
//! Create exactly `count` threads.
explicit thread_group(std::size_t count);
thread_group(thread_group const&) = delete;
thread_group(thread_group&&) = delete;
//! Joins threads, but does not necessarily run all dispatched functions.
~thread_group() = default;
thread_group& operator=(thread_group const&) = delete;
thread_group& operator=(thread_group&&) = delete;
//! \return Number of threads owned by `this` group.
std::size_t count() const noexcept {
if (internal) {
return internal->count();
}
return 0;
}
//! \return True iff a function was available and executed (on `this_thread`).
bool try_run_one() noexcept {
if (internal) {
return internal->try_run_one();
}
return false;
}
/*! `f` is invoked immediately if `count() == 0`, otherwise execution of `f`
is queued for next available thread. If `f` is queued, any exception leaving
that function will result in process termination. Use std::packaged_task if
exceptions need to be handled. */
template<typename F>
void dispatch(F&& f) {
if (internal) {
internal->dispatch(std::forward<F>(f));
}
else {
f();
}
}
private:
class data {
public:
data(std::size_t count);
~data() noexcept;
std::size_t count() const noexcept {
return threads.size();
}
bool try_run_one() noexcept;
void dispatch(std::function<void()> f);
private:
struct work;
struct node {
std::unique_ptr<work> ptr;
};
struct work {
std::function<void()> f;
node next;
};
//! Requires lock on `mutex`.
std::unique_ptr<work> get_next() noexcept;
//! Blocks until destructor is invoked, only call from thread.
void run() noexcept;
private:
std::vector<boost::thread> threads;
node head;
node* last;
boost::condition_variable has_work;
boost::mutex mutex;
bool stop;
};
private:
// optionally construct elements, without separate heap allocation
boost::optional<data> internal;
};
}

117
src/common/threadpool.cpp Normal file
View file

@ -0,0 +1,117 @@
// Copyright (c) 2017, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "common/threadpool.h"
#include <cassert>
#include <limits>
#include <stdexcept>
#include "cryptonote_config.h"
#include "common/util.h"
namespace tools
{
threadpool::threadpool() : running(true), active(0) {
boost::thread::attributes attrs;
attrs.set_stack_size(THREAD_STACK_SIZE);
max = tools::get_max_concurrency() * 2;
size_t i = max;
while(i--) {
threads.push_back(boost::thread(attrs, boost::bind(&threadpool::run, this)));
}
}
threadpool::~threadpool() {
{
const boost::unique_lock<boost::mutex> lock(mutex);
running = false;
has_work.notify_all();
}
for (size_t i = 0; i<threads.size(); i++) {
threads[i].join();
}
}
void threadpool::submit(waiter *obj, std::function<void()> f) {
entry e = {obj, f};
boost::unique_lock<boost::mutex> lock(mutex);
if (active == max && !queue.empty()) {
// if all available threads are already running
// and there's work waiting, just run in current thread
lock.unlock();
f();
} else {
if (obj)
obj->inc();
queue.push_back(e);
has_work.notify_one();
}
}
int threadpool::get_max_concurrency() {
return max / 2;
}
void threadpool::waiter::wait() {
boost::unique_lock<boost::mutex> lock(mt);
while(num) cv.wait(lock);
}
void threadpool::waiter::inc() {
const boost::unique_lock<boost::mutex> lock(mt);
num++;
}
void threadpool::waiter::dec() {
const boost::unique_lock<boost::mutex> lock(mt);
num--;
if (!num)
cv.notify_one();
}
void threadpool::run() {
boost::unique_lock<boost::mutex> lock(mutex);
while (running) {
entry e;
while(queue.empty() && running)
has_work.wait(lock);
if (!running) break;
active++;
e = queue.front();
queue.pop_front();
lock.unlock();
e.f();
if (e.wo)
e.wo->dec();
lock.lock();
active--;
}
}
}

87
src/common/threadpool.h Normal file
View file

@ -0,0 +1,87 @@
// Copyright (c) 2017, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>
namespace tools
{
//! A global thread pool
class threadpool
{
public:
static threadpool& getInstance() {
static threadpool instance;
return instance;
}
// The waiter lets the caller know when all of its
// tasks are completed.
class waiter {
boost::mutex mt;
boost::condition_variable cv;
int num;
public:
void inc();
void dec();
void wait(); //! Wait for a set of tasks to finish.
waiter() : num(0){}
~waiter() { wait(); }
};
// Submit a task to the pool. The waiter pointer may be
// NULL if the caller doesn't care to wait for the
// task to finish.
void submit(waiter *waiter, std::function<void()> f);
int get_max_concurrency();
private:
threadpool();
~threadpool();
typedef struct entry {
waiter *wo;
std::function<void()> f;
} entry;
std::deque<entry> queue;
boost::condition_variable has_work;
boost::mutex mutex;
std::vector<boost::thread> threads;
int active;
int max;
bool running;
void run();
};
}

View file

@ -45,6 +45,7 @@
#include "profile_tools.h" #include "profile_tools.h"
#include "file_io_utils.h" #include "file_io_utils.h"
#include "common/int-util.h" #include "common/int-util.h"
#include "common/threadpool.h"
#include "common/boost_serialization_helper.h" #include "common/boost_serialization_helper.h"
#include "warnings.h" #include "warnings.h"
#include "crypto/hash.h" #include "crypto/hash.h"
@ -2563,33 +2564,9 @@ bool Blockchain::check_tx_inputs(transaction& tx, tx_verification_context &tvc,
std::vector < uint64_t > results; std::vector < uint64_t > results;
results.resize(tx.vin.size(), 0); results.resize(tx.vin.size(), 0);
int threads = tools::get_max_concurrency(); tools::threadpool& tpool = tools::threadpool::getInstance();
tools::threadpool::waiter waiter;
boost::asio::io_service ioservice; int threads = tpool.get_max_concurrency();
boost::thread_group threadpool;
bool ioservice_active = false;
std::unique_ptr < boost::asio::io_service::work > work(new boost::asio::io_service::work(ioservice));
if(threads > 1)
{
for (int i = 0; i < threads; i++)
{
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice));
}
ioservice_active = true;
}
#define KILL_IOSERVICE() \
if(ioservice_active) \
{ \
work.reset(); \
while (!ioservice.stopped()) ioservice.poll(); \
threadpool.join_all(); \
ioservice.stop(); \
ioservice_active = false; \
}
epee::misc_utils::auto_scope_leave_caller ioservice_killer = epee::misc_utils::create_scope_leave_handler([&]() { KILL_IOSERVICE(); });
for (const auto& txin : tx.vin) for (const auto& txin : tx.vin)
{ {
@ -2650,7 +2627,7 @@ bool Blockchain::check_tx_inputs(transaction& tx, tx_verification_context &tvc,
{ {
// ND: Speedup // ND: Speedup
// 1. Thread ring signature verification if possible. // 1. Thread ring signature verification if possible.
ioservice.dispatch(boost::bind(&Blockchain::check_ring_signature, this, std::cref(tx_prefix_hash), std::cref(in_to_key.k_image), std::cref(pubkeys[sig_index]), std::cref(tx.signatures[sig_index]), std::ref(results[sig_index]))); tpool.submit(&waiter, boost::bind(&Blockchain::check_ring_signature, this, std::cref(tx_prefix_hash), std::cref(in_to_key.k_image), std::cref(pubkeys[sig_index]), std::cref(tx.signatures[sig_index]), std::ref(results[sig_index])));
} }
else else
{ {
@ -2673,8 +2650,8 @@ bool Blockchain::check_tx_inputs(transaction& tx, tx_verification_context &tvc,
sig_index++; sig_index++;
} }
if (tx.version == 1 && threads > 1)
KILL_IOSERVICE(); waiter.wait();
if (tx.version == 1) if (tx.version == 1)
{ {
@ -3749,7 +3726,8 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::list<block_complete_e
return true; return true;
bool blocks_exist = false; bool blocks_exist = false;
uint64_t threads = tools::get_max_concurrency(); tools::threadpool& tpool = tools::threadpool::getInstance();
uint64_t threads = tpool.get_max_concurrency();
if (blocks_entry.size() > 1 && threads > 1 && m_max_prepare_blocks_threads > 1) if (blocks_entry.size() > 1 && threads > 1 && m_max_prepare_blocks_threads > 1)
{ {
@ -3758,15 +3736,12 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::list<block_complete_e
threads = m_max_prepare_blocks_threads; threads = m_max_prepare_blocks_threads;
uint64_t height = m_db->height(); uint64_t height = m_db->height();
std::vector<boost::thread *> thread_list;
int batches = blocks_entry.size() / threads; int batches = blocks_entry.size() / threads;
int extra = blocks_entry.size() % threads; int extra = blocks_entry.size() % threads;
MDEBUG("block_batches: " << batches); MDEBUG("block_batches: " << batches);
std::vector<std::unordered_map<crypto::hash, crypto::hash>> maps(threads); std::vector<std::unordered_map<crypto::hash, crypto::hash>> maps(threads);
std::vector < std::vector < block >> blocks(threads); std::vector < std::vector < block >> blocks(threads);
auto it = blocks_entry.begin(); auto it = blocks_entry.begin();
boost::thread::attributes attrs;
attrs.set_stack_size(THREAD_STACK_SIZE);
for (uint64_t i = 0; i < threads; i++) for (uint64_t i = 0; i < threads; i++)
{ {
@ -3825,19 +3800,14 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::list<block_complete_e
{ {
m_blocks_longhash_table.clear(); m_blocks_longhash_table.clear();
uint64_t thread_height = height; uint64_t thread_height = height;
tools::threadpool::waiter waiter;
for (uint64_t i = 0; i < threads; i++) for (uint64_t i = 0; i < threads; i++)
{ {
thread_list.push_back(new boost::thread(attrs, boost::bind(&Blockchain::block_longhash_worker, this, thread_height, std::cref(blocks[i]), std::ref(maps[i])))); tpool.submit(&waiter, boost::bind(&Blockchain::block_longhash_worker, this, thread_height, std::cref(blocks[i]), std::ref(maps[i])));
thread_height += blocks[i].size(); thread_height += blocks[i].size();
} }
for (size_t j = 0; j < thread_list.size(); j++) waiter.wait();
{
thread_list[j]->join();
delete thread_list[j];
}
thread_list.clear();
if (m_cancel) if (m_cancel)
return false; return false;
@ -3961,30 +3931,20 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::list<block_complete_e
// [output] stores all transactions for each tx_out_index::hash found // [output] stores all transactions for each tx_out_index::hash found
std::vector<std::unordered_map<crypto::hash, cryptonote::transaction>> transactions(amounts.size()); std::vector<std::unordered_map<crypto::hash, cryptonote::transaction>> transactions(amounts.size());
threads = tools::get_max_concurrency(); threads = tpool.get_max_concurrency();
if (!m_db->can_thread_bulk_indices()) if (!m_db->can_thread_bulk_indices())
threads = 1; threads = 1;
if (threads > 1) if (threads > 1)
{ {
boost::asio::io_service ioservice; tools::threadpool::waiter waiter;
boost::thread_group threadpool;
std::unique_ptr < boost::asio::io_service::work > work(new boost::asio::io_service::work(ioservice));
for (uint64_t i = 0; i < threads; i++)
{
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice));
}
for (size_t i = 0; i < amounts.size(); i++) for (size_t i = 0; i < amounts.size(); i++)
{ {
uint64_t amount = amounts[i]; uint64_t amount = amounts[i];
ioservice.dispatch(boost::bind(&Blockchain::output_scan_worker, this, amount, std::cref(offset_map[amount]), std::ref(tx_map[amount]), std::ref(transactions[i]))); tpool.submit(&waiter, boost::bind(&Blockchain::output_scan_worker, this, amount, std::cref(offset_map[amount]), std::ref(tx_map[amount]), std::ref(transactions[i])));
} }
waiter.wait();
work.reset();
threadpool.join_all();
ioservice.stop();
} }
else else
{ {

View file

@ -37,7 +37,7 @@ using namespace epee;
#include "common/util.h" #include "common/util.h"
#include "common/updates.h" #include "common/updates.h"
#include "common/download.h" #include "common/download.h"
#include "common/task_region.h" #include "common/threadpool.h"
#include "warnings.h" #include "warnings.h"
#include "crypto/crypto.h" #include "crypto/crypto.h"
#include "cryptonote_config.h" #include "cryptonote_config.h"
@ -74,7 +74,7 @@ namespace cryptonote
m_last_dns_checkpoints_update(0), m_last_dns_checkpoints_update(0),
m_last_json_checkpoints_update(0), m_last_json_checkpoints_update(0),
m_disable_dns_checkpoints(false), m_disable_dns_checkpoints(false),
m_threadpool(tools::thread_group::optimal()), m_threadpool(tools::threadpool::getInstance()),
m_update_download(0) m_update_download(0)
{ {
m_checkpoints_updating.clear(); m_checkpoints_updating.clear();
@ -591,10 +591,10 @@ namespace cryptonote
std::vector<result> results(tx_blobs.size()); std::vector<result> results(tx_blobs.size());
tvc.resize(tx_blobs.size()); tvc.resize(tx_blobs.size());
tools::task_region(m_threadpool, [&] (tools::task_region_handle& region) { tools::threadpool::waiter waiter;
std::list<blobdata>::const_iterator it = tx_blobs.begin(); std::list<blobdata>::const_iterator it = tx_blobs.begin();
for (size_t i = 0; i < tx_blobs.size(); i++, ++it) { for (size_t i = 0; i < tx_blobs.size(); i++, ++it) {
region.run([&, i, it] { m_threadpool.submit(&waiter, [&, i, it] {
try try
{ {
results[i].res = handle_incoming_tx_pre(*it, tvc[i], results[i].tx, results[i].hash, results[i].prefix_hash, keeped_by_block, relayed, do_not_relay); results[i].res = handle_incoming_tx_pre(*it, tvc[i], results[i].tx, results[i].hash, results[i].prefix_hash, keeped_by_block, relayed, do_not_relay);
@ -606,9 +606,8 @@ namespace cryptonote
} }
}); });
} }
}); waiter.wait();
tools::task_region(m_threadpool, [&] (tools::task_region_handle& region) { it = tx_blobs.begin();
std::list<blobdata>::const_iterator it = tx_blobs.begin();
for (size_t i = 0; i < tx_blobs.size(); i++, ++it) { for (size_t i = 0; i < tx_blobs.size(); i++, ++it) {
if (!results[i].res) if (!results[i].res)
continue; continue;
@ -622,7 +621,7 @@ namespace cryptonote
} }
else else
{ {
region.run([&, i, it] { m_threadpool.submit(&waiter, [&, i, it] {
try try
{ {
results[i].res = handle_incoming_tx_post(*it, tvc[i], results[i].tx, results[i].hash, results[i].prefix_hash, keeped_by_block, relayed, do_not_relay); results[i].res = handle_incoming_tx_post(*it, tvc[i], results[i].tx, results[i].hash, results[i].prefix_hash, keeped_by_block, relayed, do_not_relay);
@ -635,10 +634,10 @@ namespace cryptonote
}); });
} }
} }
}); waiter.wait();
bool ok = true; bool ok = true;
std::list<blobdata>::const_iterator it = tx_blobs.begin(); it = tx_blobs.begin();
for (size_t i = 0; i < tx_blobs.size(); i++, ++it) { for (size_t i = 0; i < tx_blobs.size(); i++, ++it) {
if (!results[i].res) if (!results[i].res)
{ {

View file

@ -40,7 +40,7 @@
#include "cryptonote_protocol/cryptonote_protocol_handler_common.h" #include "cryptonote_protocol/cryptonote_protocol_handler_common.h"
#include "storages/portable_storage_template_helper.h" #include "storages/portable_storage_template_helper.h"
#include "common/download.h" #include "common/download.h"
#include "common/thread_group.h" #include "common/threadpool.h"
#include "tx_pool.h" #include "tx_pool.h"
#include "blockchain.h" #include "blockchain.h"
#include "cryptonote_basic/miner.h" #include "cryptonote_basic/miner.h"
@ -957,7 +957,7 @@ namespace cryptonote
std::unordered_set<crypto::hash> bad_semantics_txes[2]; std::unordered_set<crypto::hash> bad_semantics_txes[2];
boost::mutex bad_semantics_txes_lock; boost::mutex bad_semantics_txes_lock;
tools::thread_group m_threadpool; tools::threadpool& m_threadpool;
enum { enum {
UPDATES_DISABLED, UPDATES_DISABLED,

View file

@ -30,8 +30,7 @@
#include "misc_log_ex.h" #include "misc_log_ex.h"
#include "common/perf_timer.h" #include "common/perf_timer.h"
#include "common/task_region.h" #include "common/threadpool.h"
#include "common/thread_group.h"
#include "common/util.h" #include "common/util.h"
#include "rctSigs.h" #include "rctSigs.h"
#include "cryptonote_basic/cryptonote_format_utils.h" #include "cryptonote_basic/cryptonote_format_utils.h"
@ -731,17 +730,16 @@ namespace rct {
try try
{ {
if (semantics) { if (semantics) {
tools::threadpool& tpool = tools::threadpool::getInstance();
tools::threadpool::waiter waiter;
std::deque<bool> results(rv.outPk.size(), false); std::deque<bool> results(rv.outPk.size(), false);
tools::thread_group threadpool(tools::thread_group::optimal_with_max(rv.outPk.size()));
tools::task_region(threadpool, [&] (tools::task_region_handle& region) {
DP("range proofs verified?"); DP("range proofs verified?");
for (size_t i = 0; i < rv.outPk.size(); i++) { for (size_t i = 0; i < rv.outPk.size(); i++) {
region.run([&, i] { tpool.submit(&waiter, [&, i] {
results[i] = verRange(rv.outPk[i].mask, rv.p.rangeSigs[i]); results[i] = verRange(rv.outPk[i].mask, rv.p.rangeSigs[i]);
}); });
} }
}); waiter.wait();
for (size_t i = 0; i < rv.outPk.size(); ++i) { for (size_t i = 0; i < rv.outPk.size(); ++i) {
if (!results[i]) { if (!results[i]) {
@ -794,7 +792,8 @@ namespace rct {
const size_t threads = std::max(rv.outPk.size(), rv.mixRing.size()); const size_t threads = std::max(rv.outPk.size(), rv.mixRing.size());
std::deque<bool> results(threads); std::deque<bool> results(threads);
tools::thread_group threadpool(tools::thread_group::optimal_with_max(threads)); tools::threadpool& tpool = tools::threadpool::getInstance();
tools::threadpool::waiter waiter;
if (semantics) { if (semantics) {
key sumOutpks = identity(); key sumOutpks = identity();
@ -819,13 +818,12 @@ namespace rct {
results.clear(); results.clear();
results.resize(rv.outPk.size()); results.resize(rv.outPk.size());
tools::task_region(threadpool, [&] (tools::task_region_handle& region) {
for (size_t i = 0; i < rv.outPk.size(); i++) { for (size_t i = 0; i < rv.outPk.size(); i++) {
region.run([&, i] { tpool.submit(&waiter, [&, i] {
results[i] = verRange(rv.outPk[i].mask, rv.p.rangeSigs[i]); results[i] = verRange(rv.outPk[i].mask, rv.p.rangeSigs[i]);
}); });
} }
}); waiter.wait();
for (size_t i = 0; i < results.size(); ++i) { for (size_t i = 0; i < results.size(); ++i) {
if (!results[i]) { if (!results[i]) {
@ -839,13 +837,12 @@ namespace rct {
results.clear(); results.clear();
results.resize(rv.mixRing.size()); results.resize(rv.mixRing.size());
tools::task_region(threadpool, [&] (tools::task_region_handle& region) {
for (size_t i = 0 ; i < rv.mixRing.size() ; i++) { for (size_t i = 0 ; i < rv.mixRing.size() ; i++) {
region.run([&, i] { tpool.submit(&waiter, [&, i] {
results[i] = verRctMGSimple(message, rv.p.MGs[i], rv.mixRing[i], rv.pseudoOuts[i]); results[i] = verRctMGSimple(message, rv.p.MGs[i], rv.mixRing[i], rv.pseudoOuts[i]);
}); });
} }
}); waiter.wait();
for (size_t i = 0; i < results.size(); ++i) { for (size_t i = 0; i < results.size(); ++i) {
if (!results[i]) { if (!results[i]) {

View file

@ -45,6 +45,7 @@ using namespace epee;
#include "cryptonote_basic/cryptonote_basic_impl.h" #include "cryptonote_basic/cryptonote_basic_impl.h"
#include "common/boost_serialization_helper.h" #include "common/boost_serialization_helper.h"
#include "common/command_line.h" #include "common/command_line.h"
#include "common/threadpool.h"
#include "profile_tools.h" #include "profile_tools.h"
#include "crypto/crypto.h" #include "crypto/crypto.h"
#include "serialization/binary_utils.h" #include "serialization/binary_utils.h"
@ -89,14 +90,6 @@ using namespace cryptonote;
#define SECOND_OUTPUT_RELATEDNESS_THRESHOLD 0.0f #define SECOND_OUTPUT_RELATEDNESS_THRESHOLD 0.0f
#define KILL_IOSERVICE() \
do { \
work.reset(); \
while (!ioservice.stopped()) ioservice.poll(); \
threadpool.join_all(); \
ioservice.stop(); \
} while(0)
#define KEY_IMAGE_EXPORT_FILE_MAGIC "Monero key image export\002" #define KEY_IMAGE_EXPORT_FILE_MAGIC "Monero key image export\002"
namespace namespace
@ -684,7 +677,9 @@ void wallet2::process_new_transaction(const crypto::hash &txid, const cryptonote
std::deque<crypto::key_image> ki(tx.vout.size()); std::deque<crypto::key_image> ki(tx.vout.size());
std::deque<uint64_t> amount(tx.vout.size()); std::deque<uint64_t> amount(tx.vout.size());
std::deque<rct::key> mask(tx.vout.size()); std::deque<rct::key> mask(tx.vout.size());
int threads = tools::get_max_concurrency(); tools::threadpool& tpool = tools::threadpool::getInstance();
tools::threadpool::waiter waiter;
int threads = tpool.get_max_concurrency();
const cryptonote::account_keys& keys = m_account.get_keys(); const cryptonote::account_keys& keys = m_account.get_keys();
crypto::key_derivation derivation; crypto::key_derivation derivation;
generate_key_derivation(tx_pub_key, keys.m_view_secret_key, derivation); generate_key_derivation(tx_pub_key, keys.m_view_secret_key, derivation);
@ -720,13 +715,6 @@ void wallet2::process_new_transaction(const crypto::hash &txid, const cryptonote
++num_vouts_received; ++num_vouts_received;
// process the other outs from that tx // process the other outs from that tx
boost::asio::io_service ioservice;
boost::thread_group threadpool;
std::unique_ptr < boost::asio::io_service::work > work(new boost::asio::io_service::work(ioservice));
for (int i = 0; i < threads; i++)
{
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice));
}
std::vector<uint64_t> money_transfered(tx.vout.size()); std::vector<uint64_t> money_transfered(tx.vout.size());
std::deque<bool> error(tx.vout.size()); std::deque<bool> error(tx.vout.size());
@ -734,10 +722,10 @@ void wallet2::process_new_transaction(const crypto::hash &txid, const cryptonote
// the first one was already checked // the first one was already checked
for (size_t i = 1; i < tx.vout.size(); ++i) for (size_t i = 1; i < tx.vout.size(); ++i)
{ {
ioservice.dispatch(boost::bind(&wallet2::check_acc_out_precomp, this, std::cref(keys.m_account_address.m_spend_public_key), std::cref(tx.vout[i]), std::cref(derivation), i, tpool.submit(&waiter, boost::bind(&wallet2::check_acc_out_precomp, this, std::cref(keys.m_account_address.m_spend_public_key), std::cref(tx.vout[i]), std::cref(derivation), i,
std::ref(received[i]), std::ref(money_transfered[i]), std::ref(error[i]))); std::ref(received[i]), std::ref(money_transfered[i]), std::ref(error[i])));
} }
KILL_IOSERVICE(); waiter.wait();
for (size_t i = 1; i < tx.vout.size(); ++i) for (size_t i = 1; i < tx.vout.size(); ++i)
{ {
if (error[i]) if (error[i])
@ -766,23 +754,18 @@ void wallet2::process_new_transaction(const crypto::hash &txid, const cryptonote
} }
else if (tx.vout.size() > 1 && threads > 1) else if (tx.vout.size() > 1 && threads > 1)
{ {
boost::asio::io_service ioservice; tools::threadpool& tpool = tools::threadpool::getInstance();
boost::thread_group threadpool; tools::threadpool::waiter waiter;
std::unique_ptr < boost::asio::io_service::work > work(new boost::asio::io_service::work(ioservice));
for (int i = 0; i < threads; i++)
{
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice));
}
std::vector<uint64_t> money_transfered(tx.vout.size()); std::vector<uint64_t> money_transfered(tx.vout.size());
std::deque<bool> error(tx.vout.size()); std::deque<bool> error(tx.vout.size());
std::deque<bool> received(tx.vout.size()); std::deque<bool> received(tx.vout.size());
for (size_t i = 0; i < tx.vout.size(); ++i) for (size_t i = 0; i < tx.vout.size(); ++i)
{ {
ioservice.dispatch(boost::bind(&wallet2::check_acc_out_precomp, this, std::cref(keys.m_account_address.m_spend_public_key), std::cref(tx.vout[i]), std::cref(derivation), i, tpool.submit(&waiter, boost::bind(&wallet2::check_acc_out_precomp, this, std::cref(keys.m_account_address.m_spend_public_key), std::cref(tx.vout[i]), std::cref(derivation), i,
std::ref(received[i]), std::ref(money_transfered[i]), std::ref(error[i]))); std::ref(received[i]), std::ref(money_transfered[i]), std::ref(error[i])));
} }
KILL_IOSERVICE(); waiter.wait();
tx_money_got_in_outs = 0; tx_money_got_in_outs = 0;
for (size_t i = 0; i < tx.vout.size(); ++i) for (size_t i = 0; i < tx.vout.size(); ++i)
{ {
@ -1251,7 +1234,8 @@ void wallet2::process_blocks(uint64_t start_height, const std::list<cryptonote::
THROW_WALLET_EXCEPTION_IF(blocks.size() != o_indices.size(), error::wallet_internal_error, "size mismatch"); THROW_WALLET_EXCEPTION_IF(blocks.size() != o_indices.size(), error::wallet_internal_error, "size mismatch");
int threads = tools::get_max_concurrency(); tools::threadpool& tpool = tools::threadpool::getInstance();
int threads = tpool.get_max_concurrency();
if (threads > 1) if (threads > 1)
{ {
std::vector<crypto::hash> round_block_hashes(threads); std::vector<crypto::hash> round_block_hashes(threads);
@ -1262,23 +1246,16 @@ void wallet2::process_blocks(uint64_t start_height, const std::list<cryptonote::
for (size_t b = 0; b < blocks_size; b += threads) for (size_t b = 0; b < blocks_size; b += threads)
{ {
size_t round_size = std::min((size_t)threads, blocks_size - b); size_t round_size = std::min((size_t)threads, blocks_size - b);
tools::threadpool::waiter waiter;
boost::asio::io_service ioservice;
boost::thread_group threadpool;
std::unique_ptr < boost::asio::io_service::work > work(new boost::asio::io_service::work(ioservice));
for (size_t i = 0; i < round_size; i++)
{
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice));
}
std::list<block_complete_entry>::const_iterator tmpblocki = blocki; std::list<block_complete_entry>::const_iterator tmpblocki = blocki;
for (size_t i = 0; i < round_size; ++i) for (size_t i = 0; i < round_size; ++i)
{ {
ioservice.dispatch(boost::bind(&wallet2::parse_block_round, this, std::cref(tmpblocki->block), tpool.submit(&waiter, boost::bind(&wallet2::parse_block_round, this, std::cref(tmpblocki->block),
std::ref(round_blocks[i]), std::ref(round_block_hashes[i]), std::ref(error[i]))); std::ref(round_blocks[i]), std::ref(round_block_hashes[i]), std::ref(error[i])));
++tmpblocki; ++tmpblocki;
} }
KILL_IOSERVICE(); waiter.wait();
tmpblocki = blocki; tmpblocki = blocki;
for (size_t i = 0; i < round_size; ++i) for (size_t i = 0; i < round_size; ++i)
{ {
@ -1698,7 +1675,8 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re
size_t try_count = 0; size_t try_count = 0;
crypto::hash last_tx_hash_id = m_transfers.size() ? m_transfers.back().m_txid : null_hash; crypto::hash last_tx_hash_id = m_transfers.size() ? m_transfers.back().m_txid : null_hash;
std::list<crypto::hash> short_chain_history; std::list<crypto::hash> short_chain_history;
boost::thread pull_thread; tools::threadpool& tpool = tools::threadpool::getInstance();
tools::threadpool::waiter waiter;
uint64_t blocks_start_height; uint64_t blocks_start_height;
std::list<cryptonote::block_complete_entry> blocks; std::list<cryptonote::block_complete_entry> blocks;
std::vector<COMMAND_RPC_GET_BLOCKS_FAST::block_output_indices> o_indices; std::vector<COMMAND_RPC_GET_BLOCKS_FAST::block_output_indices> o_indices;
@ -1736,11 +1714,11 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re
std::list<cryptonote::block_complete_entry> next_blocks; std::list<cryptonote::block_complete_entry> next_blocks;
std::vector<cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::block_output_indices> next_o_indices; std::vector<cryptonote::COMMAND_RPC_GET_BLOCKS_FAST::block_output_indices> next_o_indices;
bool error = false; bool error = false;
pull_thread = boost::thread([&]{pull_next_blocks(start_height, next_blocks_start_height, short_chain_history, blocks, next_blocks, next_o_indices, error);}); tpool.submit(&waiter, [&]{pull_next_blocks(start_height, next_blocks_start_height, short_chain_history, blocks, next_blocks, next_o_indices, error);});
process_blocks(blocks_start_height, blocks, o_indices, added_blocks); process_blocks(blocks_start_height, blocks, o_indices, added_blocks);
blocks_fetched += added_blocks; blocks_fetched += added_blocks;
pull_thread.join(); waiter.wait();
if(blocks_start_height == next_blocks_start_height) if(blocks_start_height == next_blocks_start_height)
{ {
m_node_rpc_proxy.set_height(m_blockchain.size()); m_node_rpc_proxy.set_height(m_blockchain.size());
@ -1762,8 +1740,7 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re
catch (const std::exception&) catch (const std::exception&)
{ {
blocks_fetched += added_blocks; blocks_fetched += added_blocks;
if (pull_thread.joinable()) waiter.wait();
pull_thread.join();
if(try_count < 3) if(try_count < 3)
{ {
LOG_PRINT_L1("Another try pull_blocks (try_count=" << try_count << ")..."); LOG_PRINT_L1("Another try pull_blocks (try_count=" << try_count << ")...");

View file

@ -55,7 +55,6 @@ set(unit_tests_sources
test_tx_utils.cpp test_tx_utils.cpp
test_peerlist.cpp test_peerlist.cpp
test_protocol_pack.cpp test_protocol_pack.cpp
thread_group.cpp
hardfork.cpp hardfork.cpp
unbound.cpp unbound.cpp
uri.cpp uri.cpp

View file

@ -1,177 +0,0 @@
// Copyright (c) 2014-2017, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "gtest/gtest.h"
#include <atomic>
#include "common/task_region.h"
#include "common/thread_group.h"
TEST(ThreadGroup, NoThreads)
{
tools::task_region(tools::thread_group(0), [] (tools::task_region_handle& region) {
std::atomic<bool> completed{false};
region.run([&] { completed = true; });
EXPECT_TRUE(completed);
});
{
tools::thread_group group(0);
std::atomic<bool> completed{false};
group.dispatch([&] { completed = true; });
EXPECT_TRUE(completed);
}
}
TEST(ThreadGroup, OneThread)
{
tools::thread_group group(1);
for (unsigned i = 0; i < 3; ++i) {
std::atomic<bool> completed{false};
tools::task_region(group, [&] (tools::task_region_handle& region) {
region.run([&] { completed = true; });
});
EXPECT_TRUE(completed);
}
}
TEST(ThreadGroup, UseActiveThreadOnSync)
{
tools::thread_group group(1);
for (unsigned i = 0; i < 3; ++i) {
std::atomic<bool> completed{false};
tools::task_region(group, [&] (tools::task_region_handle& region) {
region.run([&] { while (!completed); });
region.run([&] { completed = true; });
});
EXPECT_TRUE(completed);
}
}
TEST(ThreadGroup, InOrder)
{
tools::thread_group group(1);
for (unsigned i = 0; i < 3; ++i) {
std::atomic<unsigned> count{0};
std::atomic<bool> completed{false};
tools::task_region(group, [&] (tools::task_region_handle& region) {
region.run([&] { while (!completed); });
region.run([&] { if (count == 0) completed = true; });
region.run([&] { ++count; });
});
EXPECT_TRUE(completed);
EXPECT_EQ(1u, count);
}
}
TEST(ThreadGroup, TwoThreads)
{
tools::thread_group group(2);
for (unsigned i = 0; i < 3; ++i) {
std::atomic<bool> completed{false};
tools::task_region(group, [&] (tools::task_region_handle& region) {
region.run([&] { while (!completed); });
region.run([&] { while (!completed); });
region.run([&] { completed = true; });
});
EXPECT_TRUE(completed);
}
}
TEST(ThreadGroup, Nested) {
struct fib {
unsigned operator()(tools::thread_group& group, unsigned value) const {
if (value == 0 || value == 1) {
return value;
}
unsigned left = 0;
unsigned right = 0;
tools::task_region(group, [&, value] (tools::task_region_handle& region) {
region.run([&, value] { left = fib{}(group, value - 1); });
region.run([&, value] { right = fib{}(group, value - 2); } );
});
return left + right;
}
unsigned operator()(tools::thread_group&& group, unsigned value) const {
return (*this)(group, value);
}
};
// be careful of depth on asynchronous version
EXPECT_EQ(6765, fib{}(tools::thread_group(0), 20));
EXPECT_EQ(377, fib{}(tools::thread_group(1), 14));
}
TEST(ThreadGroup, Many)
{
tools::thread_group group(1);
for (unsigned i = 0; i < 3; ++i) {
std::atomic<unsigned> count{0};
tools::task_region(group, [&] (tools::task_region_handle& region) {
for (unsigned tasks = 0; tasks < 1000; ++tasks) {
region.run([&] { ++count; });
}
});
EXPECT_EQ(1000u, count);
}
}
TEST(ThreadGroup, ThrowInTaskRegion)
{
class test_exception final : std::exception {
public:
explicit test_exception() : std::exception() {}
virtual const char* what() const noexcept override {
return "test_exception";
}
};
tools::thread_group group(1);
for (unsigned i = 0; i < 3; ++i) {
std::atomic<unsigned> count{0};
EXPECT_THROW(
[&] {
tools::task_region(group, [&] (tools::task_region_handle& region) {
for (unsigned tasks = 0; tasks < 1000; ++tasks) {
region.run([&] { ++count; });
}
throw test_exception();
});
}(),
test_exception
);
EXPECT_GE(1000u, count);
}
}