mirror of
https://github.com/SChernykh/p2pool.git
synced 2025-01-10 12:44:31 +00:00
Fixed data races when using uv_async_send
This commit is contained in:
parent
cfddaf1508
commit
2c5cfb6442
5 changed files with 54 additions and 53 deletions
|
@ -91,6 +91,7 @@ P2PServer::P2PServer(p2pool* pool)
|
||||||
uv_mutex_init_checked(&m_broadcastLock);
|
uv_mutex_init_checked(&m_broadcastLock);
|
||||||
uv_rwlock_init_checked(&m_cachedBlocksLock);
|
uv_rwlock_init_checked(&m_cachedBlocksLock);
|
||||||
uv_mutex_init_checked(&m_connectToPeersLock);
|
uv_mutex_init_checked(&m_connectToPeersLock);
|
||||||
|
uv_mutex_init_checked(&m_showPeersLock);
|
||||||
|
|
||||||
int err = uv_async_init(&m_loop, &m_broadcastAsync, on_broadcast);
|
int err = uv_async_init(&m_loop, &m_broadcastAsync, on_broadcast);
|
||||||
if (err) {
|
if (err) {
|
||||||
|
@ -149,6 +150,7 @@ P2PServer::~P2PServer()
|
||||||
uv_rwlock_destroy(&m_cachedBlocksLock);
|
uv_rwlock_destroy(&m_cachedBlocksLock);
|
||||||
|
|
||||||
uv_mutex_destroy(&m_connectToPeersLock);
|
uv_mutex_destroy(&m_connectToPeersLock);
|
||||||
|
uv_mutex_destroy(&m_showPeersLock);
|
||||||
|
|
||||||
delete m_block;
|
delete m_block;
|
||||||
delete m_cache;
|
delete m_cache;
|
||||||
|
@ -204,14 +206,13 @@ void P2PServer::store_in_cache(const PoolBlock& block)
|
||||||
}
|
}
|
||||||
|
|
||||||
void P2PServer::connect_to_peers_async(const char* peer_list)
|
void P2PServer::connect_to_peers_async(const char* peer_list)
|
||||||
{
|
|
||||||
{
|
{
|
||||||
MutexLock lock(m_connectToPeersLock);
|
MutexLock lock(m_connectToPeersLock);
|
||||||
|
|
||||||
if (!m_connectToPeersData.empty()) {
|
if (!m_connectToPeersData.empty()) {
|
||||||
m_connectToPeersData.append(1, ',');
|
m_connectToPeersData.append(1, ',');
|
||||||
}
|
}
|
||||||
m_connectToPeersData.append(peer_list);
|
m_connectToPeersData.append(peer_list);
|
||||||
}
|
|
||||||
|
|
||||||
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_connectToPeersAsync))) {
|
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_connectToPeersAsync))) {
|
||||||
uv_async_send(&m_connectToPeersAsync);
|
uv_async_send(&m_connectToPeersAsync);
|
||||||
|
@ -827,35 +828,23 @@ void P2PServer::broadcast(const PoolBlock& block, const PoolBlock* parent)
|
||||||
|
|
||||||
LOGINFO(5, "Broadcasting block " << block.m_sidechainId << " (height " << block.m_sidechainHeight << "): " << data->compact_blob.size() << '/' << data->pruned_blob.size() << '/' << data->blob.size() << " bytes (compact/pruned/full)");
|
LOGINFO(5, "Broadcasting block " << block.m_sidechainId << " (height " << block.m_sidechainHeight << "): " << data->compact_blob.size() << '/' << data->pruned_blob.size() << '/' << data->blob.size() << " bytes (compact/pruned/full)");
|
||||||
|
|
||||||
{
|
|
||||||
MutexLock lock(m_broadcastLock);
|
MutexLock lock(m_broadcastLock);
|
||||||
m_broadcastQueue.push_back(data);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_broadcastAsync))) {
|
if (uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_broadcastAsync))) {
|
||||||
|
delete data;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m_broadcastQueue.push_back(data);
|
||||||
|
|
||||||
const int err = uv_async_send(&m_broadcastAsync);
|
const int err = uv_async_send(&m_broadcastAsync);
|
||||||
if (err) {
|
if (err) {
|
||||||
LOGERR(1, "uv_async_send failed, error " << uv_err_name(err));
|
LOGERR(1, "uv_async_send failed, error " << uv_err_name(err));
|
||||||
|
|
||||||
bool found = false;
|
m_broadcastQueue.pop_back();
|
||||||
{
|
|
||||||
MutexLock lock(m_broadcastLock);
|
|
||||||
|
|
||||||
auto it = std::find(m_broadcastQueue.begin(), m_broadcastQueue.end(), data);
|
|
||||||
if (it != m_broadcastQueue.end()) {
|
|
||||||
found = true;
|
|
||||||
m_broadcastQueue.erase(it);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (found) {
|
|
||||||
delete data;
|
delete data;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
void P2PServer::on_broadcast()
|
void P2PServer::on_broadcast()
|
||||||
{
|
{
|
||||||
|
@ -988,6 +977,8 @@ void P2PServer::print_status()
|
||||||
|
|
||||||
void P2PServer::show_peers_async()
|
void P2PServer::show_peers_async()
|
||||||
{
|
{
|
||||||
|
MutexLock lock(m_showPeersLock);
|
||||||
|
|
||||||
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_showPeersAsync))) {
|
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_showPeersAsync))) {
|
||||||
uv_async_send(&m_showPeersAsync);
|
uv_async_send(&m_showPeersAsync);
|
||||||
}
|
}
|
||||||
|
@ -1283,10 +1274,19 @@ void P2PServer::on_shutdown()
|
||||||
|
|
||||||
uv_timer_stop(&m_timer);
|
uv_timer_stop(&m_timer);
|
||||||
uv_close(reinterpret_cast<uv_handle_t*>(&m_timer), nullptr);
|
uv_close(reinterpret_cast<uv_handle_t*>(&m_timer), nullptr);
|
||||||
|
{
|
||||||
|
MutexLock lock(m_broadcastLock);
|
||||||
uv_close(reinterpret_cast<uv_handle_t*>(&m_broadcastAsync), nullptr);
|
uv_close(reinterpret_cast<uv_handle_t*>(&m_broadcastAsync), nullptr);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
MutexLock lock(m_connectToPeersLock);
|
||||||
uv_close(reinterpret_cast<uv_handle_t*>(&m_connectToPeersAsync), nullptr);
|
uv_close(reinterpret_cast<uv_handle_t*>(&m_connectToPeersAsync), nullptr);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
MutexLock lock(m_showPeersLock);
|
||||||
uv_close(reinterpret_cast<uv_handle_t*>(&m_showPeersAsync), nullptr);
|
uv_close(reinterpret_cast<uv_handle_t*>(&m_showPeersAsync), nullptr);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void P2PServer::api_update_local_stats()
|
void P2PServer::api_update_local_stats()
|
||||||
{
|
{
|
||||||
|
|
|
@ -267,6 +267,7 @@ private:
|
||||||
|
|
||||||
static void on_connect_to_peers(uv_async_t* handle);
|
static void on_connect_to_peers(uv_async_t* handle);
|
||||||
|
|
||||||
|
uv_mutex_t m_showPeersLock;
|
||||||
uv_async_t m_showPeersAsync;
|
uv_async_t m_showPeersAsync;
|
||||||
|
|
||||||
static void on_show_peers(uv_async_t* handle) { reinterpret_cast<P2PServer*>(handle->data)->show_peers(); }
|
static void on_show_peers(uv_async_t* handle) { reinterpret_cast<P2PServer*>(handle->data)->show_peers(); }
|
||||||
|
|
|
@ -100,6 +100,7 @@ void p2pool_api::create_dir(const std::string& path)
|
||||||
|
|
||||||
void p2pool_api::on_stop()
|
void p2pool_api::on_stop()
|
||||||
{
|
{
|
||||||
|
MutexLock lock(m_dumpDataLock);
|
||||||
uv_close(reinterpret_cast<uv_handle_t*>(&m_dumpToFileAsync), nullptr);
|
uv_close(reinterpret_cast<uv_handle_t*>(&m_dumpToFileAsync), nullptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -119,10 +120,8 @@ void p2pool_api::dump_to_file_async_internal(Category category, const char* file
|
||||||
case Category::LOCAL: path = m_localPath + filename; break;
|
case Category::LOCAL: path = m_localPath + filename; break;
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
|
||||||
MutexLock lock(m_dumpDataLock);
|
MutexLock lock(m_dumpDataLock);
|
||||||
m_dumpData[path] = std::move(buf);
|
m_dumpData[path] = std::move(buf);
|
||||||
}
|
|
||||||
|
|
||||||
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_dumpToFileAsync))) {
|
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_dumpToFileAsync))) {
|
||||||
uv_async_send(&m_dumpToFileAsync);
|
uv_async_send(&m_dumpToFileAsync);
|
||||||
|
|
|
@ -63,6 +63,7 @@ StratumServer::StratumServer(p2pool* pool)
|
||||||
m_hashrateData[0] = { seconds_since_epoch(), 0 };
|
m_hashrateData[0] = { seconds_since_epoch(), 0 };
|
||||||
|
|
||||||
uv_mutex_init_checked(&m_blobsQueueLock);
|
uv_mutex_init_checked(&m_blobsQueueLock);
|
||||||
|
uv_mutex_init_checked(&m_showWorkersLock);
|
||||||
uv_mutex_init_checked(&m_rngLock);
|
uv_mutex_init_checked(&m_rngLock);
|
||||||
uv_rwlock_init_checked(&m_hashrateDataLock);
|
uv_rwlock_init_checked(&m_hashrateDataLock);
|
||||||
|
|
||||||
|
@ -91,6 +92,7 @@ StratumServer::~StratumServer()
|
||||||
shutdown_tcp();
|
shutdown_tcp();
|
||||||
|
|
||||||
uv_mutex_destroy(&m_blobsQueueLock);
|
uv_mutex_destroy(&m_blobsQueueLock);
|
||||||
|
uv_mutex_destroy(&m_showWorkersLock);
|
||||||
uv_mutex_destroy(&m_rngLock);
|
uv_mutex_destroy(&m_rngLock);
|
||||||
uv_rwlock_destroy(&m_hashrateDataLock);
|
uv_rwlock_destroy(&m_hashrateDataLock);
|
||||||
|
|
||||||
|
@ -161,29 +163,19 @@ void StratumServer::on_block(const BlockTemplate& block)
|
||||||
|
|
||||||
{
|
{
|
||||||
MutexLock lock(m_blobsQueueLock);
|
MutexLock lock(m_blobsQueueLock);
|
||||||
m_blobsQueue.push_back(blobs_data);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_blobsAsync))) {
|
if (uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_blobsAsync))) {
|
||||||
|
delete blobs_data;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m_blobsQueue.push_back(blobs_data);
|
||||||
|
|
||||||
const int err = uv_async_send(&m_blobsAsync);
|
const int err = uv_async_send(&m_blobsAsync);
|
||||||
if (err) {
|
if (err) {
|
||||||
LOGERR(1, "uv_async_send failed, error " << uv_err_name(err));
|
LOGERR(1, "uv_async_send failed, error " << uv_err_name(err));
|
||||||
|
|
||||||
bool found = false;
|
m_blobsQueue.pop_back();
|
||||||
{
|
|
||||||
MutexLock lock(m_blobsQueueLock);
|
|
||||||
|
|
||||||
auto it = std::find(m_blobsQueue.begin(), m_blobsQueue.end(), blobs_data);
|
|
||||||
if (it != m_blobsQueue.end()) {
|
|
||||||
found = true;
|
|
||||||
m_blobsQueue.erase(it);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (found) {
|
|
||||||
delete blobs_data;
|
delete blobs_data;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -487,6 +479,8 @@ void StratumServer::print_status()
|
||||||
|
|
||||||
void StratumServer::show_workers_async()
|
void StratumServer::show_workers_async()
|
||||||
{
|
{
|
||||||
|
MutexLock lock(m_showWorkersLock);
|
||||||
|
|
||||||
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_showWorkersAsync))) {
|
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(&m_showWorkersAsync))) {
|
||||||
uv_async_send(&m_showWorkersAsync);
|
uv_async_send(&m_showWorkersAsync);
|
||||||
}
|
}
|
||||||
|
@ -1031,9 +1025,15 @@ void StratumServer::on_after_share_found(uv_work_t* req, int /*status*/)
|
||||||
|
|
||||||
void StratumServer::on_shutdown()
|
void StratumServer::on_shutdown()
|
||||||
{
|
{
|
||||||
|
{
|
||||||
|
MutexLock lock(m_blobsQueueLock);
|
||||||
uv_close(reinterpret_cast<uv_handle_t*>(&m_blobsAsync), nullptr);
|
uv_close(reinterpret_cast<uv_handle_t*>(&m_blobsAsync), nullptr);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
MutexLock lock(m_showWorkersLock);
|
||||||
uv_close(reinterpret_cast<uv_handle_t*>(&m_showWorkersAsync), nullptr);
|
uv_close(reinterpret_cast<uv_handle_t*>(&m_showWorkersAsync), nullptr);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
StratumServer::StratumClient::StratumClient()
|
StratumServer::StratumClient::StratumClient()
|
||||||
: Client(m_stratumReadBuf, sizeof(m_stratumReadBuf))
|
: Client(m_stratumReadBuf, sizeof(m_stratumReadBuf))
|
||||||
|
|
|
@ -127,6 +127,7 @@ private:
|
||||||
static void on_blobs_ready(uv_async_t* handle) { reinterpret_cast<StratumServer*>(handle->data)->on_blobs_ready(); }
|
static void on_blobs_ready(uv_async_t* handle) { reinterpret_cast<StratumServer*>(handle->data)->on_blobs_ready(); }
|
||||||
void on_blobs_ready();
|
void on_blobs_ready();
|
||||||
|
|
||||||
|
uv_mutex_t m_showWorkersLock;
|
||||||
uv_async_t m_showWorkersAsync;
|
uv_async_t m_showWorkersAsync;
|
||||||
|
|
||||||
static void on_show_workers(uv_async_t* handle) { reinterpret_cast<StratumServer*>(handle->data)->show_workers(); }
|
static void on_show_workers(uv_async_t* handle) { reinterpret_cast<StratumServer*>(handle->data)->show_workers(); }
|
||||||
|
|
Loading…
Reference in a new issue