core: thread most of handle_incoming_tx

This commit is contained in:
moneromooo-monero 2017-07-10 15:38:56 +01:00
parent f57ee382b8
commit 158c3ecff3
No known key found for this signature in database
GPG key ID: 686F07454D6CEFC3
4 changed files with 114 additions and 31 deletions
src
tests/unit_tests

View file

@ -37,6 +37,7 @@ using namespace epee;
#include "common/util.h"
#include "common/updates.h"
#include "common/download.h"
#include "common/task_region.h"
#include "warnings.h"
#include "crypto/crypto.h"
#include "cryptonote_config.h"
@ -76,6 +77,7 @@ namespace cryptonote
m_checkpoints_path(""),
m_last_dns_checkpoints_update(0),
m_last_json_checkpoints_update(0),
m_threadpool(tools::thread_group::optimal()),
m_update_download(0)
{
m_checkpoints_updating.clear();
@ -483,11 +485,9 @@ namespace cryptonote
return false;
}
//-----------------------------------------------------------------------------------------------
bool core::handle_incoming_tx(const blobdata& tx_blob, tx_verification_context& tvc, bool keeped_by_block, bool relayed, bool do_not_relay)
bool core::handle_incoming_tx_pre(const blobdata& tx_blob, tx_verification_context& tvc, cryptonote::transaction &tx, crypto::hash &tx_hash, crypto::hash &tx_prefixt_hash, bool keeped_by_block, bool relayed, bool do_not_relay)
{
tvc = boost::value_initialized<tx_verification_context>();
//want to process all transactions sequentially
CRITICAL_REGION_LOCAL(m_incoming_tx_lock);
if(tx_blob.size() > get_max_tx_size())
{
@ -497,9 +497,8 @@ namespace cryptonote
return false;
}
crypto::hash tx_hash = null_hash;
crypto::hash tx_prefixt_hash = null_hash;
transaction tx;
tx_hash = null_hash;
tx_prefixt_hash = null_hash;
if(!parse_tx_from_blob(tx, tx_hash, tx_prefixt_hash, tx_blob))
{
@ -509,15 +508,18 @@ namespace cryptonote
}
//std::cout << "!"<< tx.vin.size() << std::endl;
bad_semantics_txes_lock.lock();
for (int idx = 0; idx < 2; ++idx)
{
if (bad_semantics_txes[idx].find(tx_hash) != bad_semantics_txes[idx].end())
{
bad_semantics_txes_lock.unlock();
LOG_PRINT_L1("Transaction already seen with bad semantics, rejected");
tvc.m_verifivation_failed = true;
return false;
}
}
bad_semantics_txes_lock.unlock();
uint8_t version = m_blockchain_storage.get_current_hard_fork_version();
const size_t max_tx_version = version == 1 ? 1 : 2;
@ -528,18 +530,11 @@ namespace cryptonote
return false;
}
if(m_mempool.have_tx(tx_hash))
{
LOG_PRINT_L2("tx " << tx_hash << "already have transaction in tx_pool");
return true;
}
if(m_blockchain_storage.have_tx(tx_hash))
{
LOG_PRINT_L2("tx " << tx_hash << " already have transaction in blockchain");
return true;
}
return true;
}
//-----------------------------------------------------------------------------------------------
bool core::handle_incoming_tx_post(const blobdata& tx_blob, tx_verification_context& tvc, cryptonote::transaction &tx, crypto::hash &tx_hash, crypto::hash &tx_prefixt_hash, bool keeped_by_block, bool relayed, bool do_not_relay)
{
if(!check_tx_syntax(tx))
{
LOG_PRINT_L1("WRONG TRANSACTION BLOB, Failed to check tx " << tx_hash << " syntax, rejected");
@ -568,23 +563,84 @@ namespace cryptonote
{
LOG_PRINT_L1("WRONG TRANSACTION BLOB, Failed to check tx " << tx_hash << " semantic, rejected");
tvc.m_verifivation_failed = true;
bad_semantics_txes_lock.lock();
bad_semantics_txes[0].insert(tx_hash);
if (bad_semantics_txes[0].size() >= BAD_SEMANTICS_TXES_MAX_SIZE)
{
std::swap(bad_semantics_txes[0], bad_semantics_txes[1]);
bad_semantics_txes[0].clear();
}
bad_semantics_txes_lock.unlock();
return false;
}
bool r = add_new_tx(tx, tx_hash, tx_prefixt_hash, tx_blob.size(), tvc, keeped_by_block, relayed, do_not_relay);
if(tvc.m_verifivation_failed)
{MERROR_VER("Transaction verification failed: " << tx_hash);}
else if(tvc.m_verifivation_impossible)
{MERROR_VER("Transaction verification impossible: " << tx_hash);}
return true;
}
//-----------------------------------------------------------------------------------------------
bool core::handle_incoming_txs(const std::list<blobdata>& tx_blobs, std::vector<tx_verification_context>& tvc, bool keeped_by_block, bool relayed, bool do_not_relay)
{
struct result { bool res; cryptonote::transaction tx; crypto::hash hash; crypto::hash prefix_hash; bool in_txpool; bool in_blockchain; };
std::vector<result> results(tx_blobs.size());
if(tvc.m_added_to_pool)
MDEBUG("tx added: " << tx_hash);
tvc.resize(tx_blobs.size());
tools::task_region(m_threadpool, [&] (tools::task_region_handle& region) {
std::list<blobdata>::const_iterator it = tx_blobs.begin();
for (size_t i = 0; i < tx_blobs.size(); i++, ++it) {
region.run([&, i, it] {
results[i].res = handle_incoming_tx_pre(*it, tvc[i], results[i].tx, results[i].hash, results[i].prefix_hash, keeped_by_block, relayed, do_not_relay);
});
}
});
tools::task_region(m_threadpool, [&] (tools::task_region_handle& region) {
std::list<blobdata>::const_iterator it = tx_blobs.begin();
for (size_t i = 0; i < tx_blobs.size(); i++, ++it) {
if (!results[i].res)
continue;
if(m_mempool.have_tx(results[i].hash))
{
LOG_PRINT_L2("tx " << results[i].hash << "already have transaction in tx_pool");
}
else if(m_blockchain_storage.have_tx(results[i].hash))
{
LOG_PRINT_L2("tx " << results[i].hash << " already have transaction in blockchain");
}
else
{
region.run([&, i, it] {
results[i].res = handle_incoming_tx_post(*it, tvc[i], results[i].tx, results[i].hash, results[i].prefix_hash, keeped_by_block, relayed, do_not_relay);
});
}
}
});
bool ok = true;
std::list<blobdata>::const_iterator it = tx_blobs.begin();
for (size_t i = 0; i < tx_blobs.size(); i++, ++it) {
if (!results[i].res)
{
ok = false;
continue;
}
ok &= add_new_tx(results[i].tx, results[i].hash, results[i].prefix_hash, it->size(), tvc[i], keeped_by_block, relayed, do_not_relay);
if(tvc[i].m_verifivation_failed)
{MERROR_VER("Transaction verification failed: " << results[i].hash);}
else if(tvc[i].m_verifivation_impossible)
{MERROR_VER("Transaction verification impossible: " << results[i].hash);}
if(tvc[i].m_added_to_pool)
MDEBUG("tx added: " << results[i].hash);
}
return ok;
}
//-----------------------------------------------------------------------------------------------
bool core::handle_incoming_tx(const blobdata& tx_blob, tx_verification_context& tvc, bool keeped_by_block, bool relayed, bool do_not_relay)
{
std::list<cryptonote::blobdata> tx_blobs;
tx_blobs.push_back(tx_blob);
std::vector<tx_verification_context> tvcv(1);
bool r = handle_incoming_txs(tx_blobs, tvcv, keeped_by_block, relayed, do_not_relay);
tvc = tvcv[0];
return r;
}
//-----------------------------------------------------------------------------------------------

View file

@ -40,6 +40,7 @@
#include "cryptonote_protocol/cryptonote_protocol_handler_common.h"
#include "storages/portable_storage_template_helper.h"
#include "common/download.h"
#include "common/thread_group.h"
#include "tx_pool.h"
#include "blockchain.h"
#include "cryptonote_basic/miner.h"
@ -114,6 +115,22 @@ namespace cryptonote
*/
bool handle_incoming_tx(const blobdata& tx_blob, tx_verification_context& tvc, bool keeped_by_block, bool relayed, bool do_not_relay);
/**
* @brief handles a list of incoming transactions
*
* Parses incoming transactions and, if nothing is obviously wrong,
* passes them along to the transaction pool
*
* @param tx_blobs the txs to handle
* @param tvc metadata about the transactions' validity
* @param keeped_by_block if the transactions have been in a block
* @param relayed whether or not the transactions were relayed to us
* @param do_not_relay whether to prevent the transactions from being relayed
*
* @return true if the transactions made it to the transaction pool, otherwise false
*/
bool handle_incoming_txs(const std::list<blobdata>& tx_blobs, std::vector<tx_verification_context>& tvc, bool keeped_by_block, bool relayed, bool do_not_relay);
/**
* @brief handles an incoming block
*
@ -753,6 +770,9 @@ namespace cryptonote
*/
bool check_tx_semantic(const transaction& tx, bool keeped_by_block) const;
bool handle_incoming_tx_pre(const blobdata& tx_blob, tx_verification_context& tvc, cryptonote::transaction &tx, crypto::hash &tx_hash, crypto::hash &tx_prefixt_hash, bool keeped_by_block, bool relayed, bool do_not_relay);
bool handle_incoming_tx_post(const blobdata& tx_blob, tx_verification_context& tvc, cryptonote::transaction &tx, crypto::hash &tx_hash, crypto::hash &tx_prefixt_hash, bool keeped_by_block, bool relayed, bool do_not_relay);
/**
* @copydoc miner::on_block_chain_update
*
@ -859,6 +879,9 @@ namespace cryptonote
time_t start_time;
std::unordered_set<crypto::hash> bad_semantics_txes[2];
boost::mutex bad_semantics_txes_lock;
tools::thread_group m_threadpool;
enum {
UPDATES_DISABLED,

View file

@ -986,6 +986,7 @@ namespace cryptonote
m_core.prepare_handle_incoming_blocks(blocks);
uint64_t block_process_time_full = 0, transactions_process_time_full = 0;
size_t num_txs = 0;
for(const block_complete_entry& block_entry: blocks)
{
if (m_stopping)
@ -996,15 +997,17 @@ namespace cryptonote
// process transactions
TIME_MEASURE_START(transactions_process_time);
for(auto& tx_blob: block_entry.txs)
num_txs += block_entry.txs.size();
std::vector<tx_verification_context> tvc;
m_core.handle_incoming_txs(block_entry.txs, tvc, true, true, false);
std::list<blobdata>::const_iterator it = block_entry.txs.begin();
for (size_t i = 0; i < tvc.size(); ++i, ++it)
{
tx_verification_context tvc = AUTO_VAL_INIT(tvc);
m_core.handle_incoming_tx(tx_blob, tvc, true, true, false);
if(tvc.m_verifivation_failed)
if(tvc[i].m_verifivation_failed)
{
if (!m_p2p->for_connection(span_connection_id, [&](cryptonote_connection_context& context, nodetool::peerid_type peer_id, uint32_t f)->bool{
LOG_ERROR_CCONTEXT("transaction verification failed on NOTIFY_RESPONSE_GET_OBJECTS, tx_id = "
<< epee::string_tools::pod_to_hex(get_blob_hash(tx_blob)) << ", dropping connection");
<< epee::string_tools::pod_to_hex(get_blob_hash(*it)) << ", dropping connection");
m_p2p->drop_connection(context);
m_block_queue.flush_spans(context.m_connection_id, true);
return true;
@ -1065,7 +1068,7 @@ namespace cryptonote
} // each download block
LOG_PRINT_CCONTEXT_L2("Block process time (" << blocks.size() << " blocks): " << block_process_time_full + transactions_process_time_full << " (" << transactions_process_time_full << "/" << block_process_time_full << ") ms");
MCINFO("sync-info", "Block process time (" << blocks.size() << " blocks, " << num_txs << " txs): " << block_process_time_full + transactions_process_time_full << " (" << transactions_process_time_full << "/" << block_process_time_full << ") ms");
m_core.cleanup_handle_incoming_blocks();

View file

@ -52,6 +52,7 @@ public:
bool have_block(const crypto::hash& id) const {return true;}
bool get_blockchain_top(uint64_t& height, crypto::hash& top_id)const{height=0;top_id=cryptonote::null_hash;return true;}
bool handle_incoming_tx(const cryptonote::blobdata& tx_blob, cryptonote::tx_verification_context& tvc, bool keeped_by_block, bool relayed, bool do_not_relay) { return true; }
bool handle_incoming_txs(const std::list<cryptonote::blobdata>& tx_blob, std::vector<cryptonote::tx_verification_context>& tvc, bool keeped_by_block, bool relayed, bool do_not_relay) { return true; }
bool handle_incoming_block(const cryptonote::blobdata& block_blob, cryptonote::block_verification_context& bvc, bool update_miner_blocktemplate = true) { return true; }
void pause_mine(){}
void resume_mine(){}