Merge pull request #2229

be7810c qt: implement FutureScheduler, always await async code to complete (xiphon)
This commit is contained in:
luigi1111 2019-06-25 14:02:40 -05:00
commit 036b3b56c5
No known key found for this signature in database
GPG key ID: F4ACA0183641E010
9 changed files with 277 additions and 187 deletions

View file

@ -63,6 +63,7 @@ HEADERS += \
src/libwalletqt/UnsignedTransaction.h \
Logger.h \
MainApp.h \
src/qt/FutureScheduler.h \
src/qt/ipc.h \
src/qt/mime.h \
src/qt/KeysFiles.h \
@ -96,6 +97,7 @@ SOURCES += main.cpp \
src/libwalletqt/UnsignedTransaction.cpp \
Logger.cpp \
MainApp.cpp \
src/qt/FutureScheduler.cpp \
src/qt/ipc.cpp \
src/qt/mime.cpp \
src/qt/KeysFiles.cpp \

View file

@ -131,19 +131,12 @@ bool DaemonManager::start(const QString &flags, NetworkType::Type nettype, const
}
// Start start watcher
QFuture<bool> future = QtConcurrent::run(this, &DaemonManager::startWatcher, nettype);
QFutureWatcher<bool> * watcher = new QFutureWatcher<bool>();
connect(watcher, &QFutureWatcher<bool>::finished,
this, [this, watcher]() {
QFuture<bool> future = watcher->future();
watcher->deleteLater();
if(future.result())
m_scheduler.run([this, nettype] {
if (startWatcher(nettype))
emit daemonStarted();
else
emit daemonStartFailure();
});
watcher->setFuture(future);
return true;
}
@ -155,17 +148,12 @@ bool DaemonManager::stop(NetworkType::Type nettype)
qDebug() << message;
// Start stop watcher - Will kill if not shutting down
QFuture<bool> future = QtConcurrent::run(this, &DaemonManager::stopWatcher, nettype);
QFutureWatcher<bool> * watcher = new QFutureWatcher<bool>();
connect(watcher, &QFutureWatcher<bool>::finished,
this, [this, watcher]() {
QFuture<bool> future = watcher->future();
watcher->deleteLater();
if(future.result()) {
m_scheduler.run([this, nettype] {
if (stopWatcher(nettype))
{
emit daemonStopped();
}
});
watcher->setFuture(future);
return true;
}
@ -330,6 +318,7 @@ QVariantMap DaemonManager::validateDataDir(const QString &dataDir) const
DaemonManager::DaemonManager(QObject *parent)
: QObject(parent)
, m_scheduler(this)
{
// Platform depetent path to monerod
@ -344,3 +333,8 @@ DaemonManager::DaemonManager(QObject *parent)
m_has_daemon = false;
}
}
DaemonManager::~DaemonManager()
{
m_scheduler.shutdownWaitForFinished();
}

View file

@ -33,6 +33,7 @@
#include <QUrl>
#include <QProcess>
#include <QVariantMap>
#include "qt/FutureScheduler.h"
#include "NetworkType.h"
class DaemonManager : public QObject
@ -71,6 +72,8 @@ public slots:
private:
explicit DaemonManager(QObject *parent = 0);
~DaemonManager();
static DaemonManager * m_instance;
static QStringList m_clArgs;
QProcess *m_daemon;
@ -79,6 +82,7 @@ private:
bool m_has_daemon = true;
bool m_app_exit = false;
FutureScheduler m_scheduler;
};
#endif // DAEMONMANAGER_H

View file

@ -150,13 +150,8 @@ NetworkType::Type Wallet::nettype() const
void Wallet::updateConnectionStatusAsync()
{
QFuture<Monero::Wallet::ConnectionStatus> future = QtConcurrent::run(m_walletImpl, &Monero::Wallet::connected);
QFutureWatcher<Monero::Wallet::ConnectionStatus> *connectionWatcher = new QFutureWatcher<Monero::Wallet::ConnectionStatus>();
connect(connectionWatcher, &QFutureWatcher<Monero::Wallet::ConnectionStatus>::finished, [=]() {
QFuture<Monero::Wallet::ConnectionStatus> future = connectionWatcher->future();
connectionWatcher->deleteLater();
ConnectionStatus newStatus = static_cast<ConnectionStatus>(future.result());
m_scheduler.run([this] {
ConnectionStatus newStatus = static_cast<ConnectionStatus>(m_walletImpl->connected());
if (newStatus != m_connectionStatus || !m_initialized) {
m_initialized = true;
m_connectionStatus = newStatus;
@ -166,7 +161,6 @@ void Wallet::updateConnectionStatusAsync()
// Release lock
m_connectionStatusRunning = false;
});
connectionWatcher->setFuture(future);
}
Wallet::ConnectionStatus Wallet::connected(bool forceCheck)
@ -246,15 +240,10 @@ void Wallet::initAsync(const QString &daemonAddress, quint64 upperTransactionLim
emit connectionStatusChanged(m_connectionStatus);
}
QFuture<bool> future = QtConcurrent::run(this, &Wallet::init,
daemonAddress, upperTransactionLimit, isRecovering, isRecoveringFromDevice, restoreHeight);
QFutureWatcher<bool> * watcher = new QFutureWatcher<bool>();
connect(watcher, &QFutureWatcher<bool>::finished,
this, [this, watcher, daemonAddress, upperTransactionLimit, isRecovering, restoreHeight]() {
QFuture<bool> future = watcher->future();
watcher->deleteLater();
if(future.result()){
m_scheduler.run([this, daemonAddress, upperTransactionLimit, isRecovering, isRecoveringFromDevice, restoreHeight] {
bool success = init(daemonAddress, upperTransactionLimit, isRecovering, isRecoveringFromDevice, restoreHeight);
if (success)
{
emit walletCreationHeightChanged();
qDebug() << "init async finished - starting refresh";
connected(true);
@ -262,7 +251,6 @@ void Wallet::initAsync(const QString &daemonAddress, quint64 upperTransactionLim
}
});
watcher->setFuture(future);
}
//! create a view only wallet
@ -348,16 +336,32 @@ void Wallet::setSubaddressLabel(quint32 accountIndex, quint32 addressIndex, cons
m_walletImpl->setSubaddressLabel(accountIndex, addressIndex, label.toStdString());
}
void Wallet::refreshHeightAsync() const
void Wallet::refreshHeightAsync()
{
QtConcurrent::run([this] {
QFuture<quint64> daemonHeight = QtConcurrent::run([this] {
return daemonBlockChainHeight();
m_scheduler.run([this] {
quint64 daemonHeight;
QPair<bool, QFuture<void>> daemonHeightFuture = m_scheduler.run([this, &daemonHeight] {
daemonHeight = daemonBlockChainHeight();
});
QFuture<quint64> targetHeight = QtConcurrent::run([this] {
return daemonBlockChainTargetHeight();
if (!daemonHeightFuture.first)
{
return;
}
quint64 targetHeight;
QPair<bool, QFuture<void>> targetHeightFuture = m_scheduler.run([this, &targetHeight] {
targetHeight = daemonBlockChainTargetHeight();
});
emit heightRefreshed(blockChainHeight(), daemonHeight.result(), targetHeight.result());
if (!targetHeightFuture.first)
{
return;
}
quint64 walletHeight = blockChainHeight();
daemonHeightFuture.second.waitForFinished();
targetHeightFuture.second.waitForFinished();
emit heightRefreshed(walletHeight, daemonHeight, targetHeight);
});
}
@ -458,17 +462,10 @@ void Wallet::createTransactionAsync(const QString &dst_addr, const QString &paym
quint64 amount, quint32 mixin_count,
PendingTransaction::Priority priority)
{
QFuture<PendingTransaction*> future = QtConcurrent::run(this, &Wallet::createTransaction,
dst_addr, payment_id,amount, mixin_count, priority);
QFutureWatcher<PendingTransaction*> * watcher = new QFutureWatcher<PendingTransaction*>();
connect(watcher, &QFutureWatcher<PendingTransaction*>::finished,
this, [this, watcher,dst_addr,payment_id,mixin_count]() {
QFuture<PendingTransaction*> future = watcher->future();
watcher->deleteLater();
emit transactionCreated(future.result(),dst_addr,payment_id,mixin_count);
m_scheduler.run([this, dst_addr, payment_id, amount, mixin_count, priority] {
PendingTransaction *tx = createTransaction(dst_addr, payment_id, amount, mixin_count, priority);
emit transactionCreated(tx, dst_addr, payment_id, mixin_count);
});
watcher->setFuture(future);
}
PendingTransaction *Wallet::createTransactionAll(const QString &dst_addr, const QString &payment_id,
@ -486,17 +483,10 @@ void Wallet::createTransactionAllAsync(const QString &dst_addr, const QString &p
quint32 mixin_count,
PendingTransaction::Priority priority)
{
QFuture<PendingTransaction*> future = QtConcurrent::run(this, &Wallet::createTransactionAll,
dst_addr, payment_id, mixin_count, priority);
QFutureWatcher<PendingTransaction*> * watcher = new QFutureWatcher<PendingTransaction*>();
connect(watcher, &QFutureWatcher<PendingTransaction*>::finished,
this, [this, watcher,dst_addr,payment_id,mixin_count]() {
QFuture<PendingTransaction*> future = watcher->future();
watcher->deleteLater();
emit transactionCreated(future.result(),dst_addr,payment_id,mixin_count);
m_scheduler.run([this, dst_addr, payment_id, mixin_count, priority] {
PendingTransaction *tx = createTransactionAll(dst_addr, payment_id, mixin_count, priority);
emit transactionCreated(tx, dst_addr, payment_id, mixin_count);
});
watcher->setFuture(future);
}
PendingTransaction *Wallet::createSweepUnmixableTransaction()
@ -508,16 +498,10 @@ PendingTransaction *Wallet::createSweepUnmixableTransaction()
void Wallet::createSweepUnmixableTransactionAsync()
{
QFuture<PendingTransaction*> future = QtConcurrent::run(this, &Wallet::createSweepUnmixableTransaction);
QFutureWatcher<PendingTransaction*> * watcher = new QFutureWatcher<PendingTransaction*>();
connect(watcher, &QFutureWatcher<PendingTransaction*>::finished,
this, [this, watcher]() {
QFuture<PendingTransaction*> future = watcher->future();
watcher->deleteLater();
emit transactionCreated(future.result(),"","",0);
m_scheduler.run([this] {
PendingTransaction *tx = createSweepUnmixableTransaction();
emit transactionCreated(tx, "", "", 0);
});
watcher->setFuture(future);
}
UnsignedTransaction * Wallet::loadTxFile(const QString &fileName)
@ -539,18 +523,9 @@ bool Wallet::submitTxFile(const QString &fileName) const
void Wallet::commitTransactionAsync(PendingTransaction *t)
{
QStringList txid(t->txid());
QFuture<bool> future = QtConcurrent::run(t, &PendingTransaction::commit);
QFutureWatcher<bool> * watcher = new QFutureWatcher<bool>();
connect(watcher, &QFutureWatcher<bool>::finished,
this, [this, watcher, t, txid]() {
QFuture<bool> future = watcher->future();
watcher->deleteLater();
emit transactionCommitted(future.result(), t, txid);
});
watcher->setFuture(future);
m_scheduler.run([this, t] {
emit transactionCommitted(t->commit(), t, t->txid());
});
}
void Wallet::disposeTransaction(PendingTransaction *t)
@ -662,23 +637,11 @@ QString Wallet::getTxKey(const QString &txid) const
return QString::fromStdString(m_walletImpl->getTxKey(txid.toStdString()));
}
void Wallet::getTxKeyAsync(const QString &txid, const QJSValue &ref)
void Wallet::getTxKeyAsync(const QString &txid, const QJSValue &callback)
{
QFuture<QString> future = QtConcurrent::run(this, &Wallet::getTxKey, txid);
auto watcher = new QFutureWatcher<QString>(this);
connect(watcher, &QFutureWatcher<QString>::finished,
this, [watcher, txid, ref]() {
QFuture<QString> future = watcher->future();
watcher->deleteLater();
auto txKey = future.result();
if (ref.isCallable()){
QJSValue cb(ref);
cb.call(QJSValueList {txid, txKey});
}
});
watcher->setFuture(future);
m_scheduler.run([this, txid] {
return QJSValueList({txid, getTxKey(txid)});
}, callback);
}
QString Wallet::checkTxKey(const QString &txid, const QString &tx_key, const QString &address)
@ -699,23 +662,11 @@ QString Wallet::getTxProof(const QString &txid, const QString &address, const QS
return QString::fromStdString(result);
}
void Wallet::getTxProofAsync(const QString &txid, const QString &address, const QString &message, const QJSValue &ref)
void Wallet::getTxProofAsync(const QString &txid, const QString &address, const QString &message, const QJSValue &callback)
{
QFuture<QString> future = QtConcurrent::run(this, &Wallet::getTxProof, txid, address, message);
auto watcher = new QFutureWatcher<QString>(this);
connect(watcher, &QFutureWatcher<QString>::finished,
this, [watcher, txid, ref]() {
QFuture<QString> future = watcher->future();
watcher->deleteLater();
auto proof = future.result();
if (ref.isCallable()){
QJSValue cb(ref);
cb.call(QJSValueList {txid, proof});
}
});
watcher->setFuture(future);
m_scheduler.run([this, txid, address, message] {
return QJSValueList({txid, getTxProof(txid, address, message)});
}, callback);
}
QString Wallet::checkTxProof(const QString &txid, const QString &address, const QString &message, const QString &signature)
@ -737,23 +688,11 @@ Q_INVOKABLE QString Wallet::getSpendProof(const QString &txid, const QString &me
return QString::fromStdString(result);
}
void Wallet::getSpendProofAsync(const QString &txid, const QString &message, const QJSValue &ref)
void Wallet::getSpendProofAsync(const QString &txid, const QString &message, const QJSValue &callback)
{
QFuture<QString> future = QtConcurrent::run(this, &Wallet::getSpendProof, txid, message);
auto watcher = new QFutureWatcher<QString>(this);
connect(watcher, &QFutureWatcher<QString>::finished,
this, [watcher, txid, ref]() {
QFuture<QString> future = watcher->future();
watcher->deleteLater();
auto proof = future.result();
if (ref.isCallable()){
QJSValue cb(ref);
cb.call(QJSValueList {txid, proof});
}
});
watcher->setFuture(future);
m_scheduler.run([this, txid, message] {
return QJSValueList({txid, getSpendProof(txid, message)});
}, callback);
}
Q_INVOKABLE QString Wallet::checkSpendProof(const QString &txid, const QString &message, const QString &signature) const
@ -1007,6 +946,7 @@ Wallet::Wallet(Monero::Wallet *w, QObject *parent)
, m_daemonBlockChainTargetHeightTtl(DAEMON_BLOCKCHAIN_TARGET_HEIGHT_CACHE_TTL_SECONDS)
, m_connectionStatusTtl(WALLET_CONNECTION_STATUS_CACHE_TTL_SECONDS)
, m_currentSubaddressAccount(0)
, m_scheduler(this)
{
m_history = new TransactionHistory(m_walletImpl->history(), this);
m_addressBook = new AddressBook(m_walletImpl->addressBook(), this);
@ -1028,6 +968,9 @@ Wallet::Wallet(Monero::Wallet *w, QObject *parent)
Wallet::~Wallet()
{
qDebug("~Wallet: Closing wallet");
m_scheduler.shutdownWaitForFinished();
delete m_addressBook;
m_addressBook = NULL;

View file

@ -37,6 +37,7 @@
#include <QtConcurrent/QtConcurrent>
#include "wallet/api/wallet2_api.h" // we need to have an access to the Monero::Wallet::Status enum here;
#include "qt/FutureScheduler.h"
#include "PendingTransaction.h" // we need to have an access to the PendingTransaction::Priority enum here;
#include "UnsignedTransaction.h"
#include "NetworkType.h"
@ -182,7 +183,7 @@ public:
//! returns if view only wallet
Q_INVOKABLE bool viewOnly() const;
Q_INVOKABLE void refreshHeightAsync() const;
Q_INVOKABLE void refreshHeightAsync();
//! export/import key images
Q_INVOKABLE bool exportKeyImages(const QString& path);
@ -290,13 +291,13 @@ public:
Q_INVOKABLE bool setUserNote(const QString &txid, const QString &note);
Q_INVOKABLE QString getUserNote(const QString &txid) const;
Q_INVOKABLE QString getTxKey(const QString &txid) const;
Q_INVOKABLE void getTxKeyAsync(const QString &txid, const QJSValue &ref);
Q_INVOKABLE void getTxKeyAsync(const QString &txid, const QJSValue &callback);
Q_INVOKABLE QString checkTxKey(const QString &txid, const QString &tx_key, const QString &address);
Q_INVOKABLE QString getTxProof(const QString &txid, const QString &address, const QString &message) const;
Q_INVOKABLE void getTxProofAsync(const QString &txid, const QString &address, const QString &message, const QJSValue &ref);
Q_INVOKABLE void getTxProofAsync(const QString &txid, const QString &address, const QString &message, const QJSValue &callback);
Q_INVOKABLE QString checkTxProof(const QString &txid, const QString &address, const QString &message, const QString &signature);
Q_INVOKABLE QString getSpendProof(const QString &txid, const QString &message) const;
Q_INVOKABLE void getSpendProofAsync(const QString &txid, const QString &message, const QJSValue &ref);
Q_INVOKABLE void getSpendProofAsync(const QString &txid, const QString &message, const QJSValue &callback);
Q_INVOKABLE QString checkSpendProof(const QString &txid, const QString &message, const QString &signature) const;
// Rescan spent outputs
Q_INVOKABLE bool rescanSpent();
@ -356,7 +357,7 @@ signals:
// emitted when transaction is created async
void transactionCreated(PendingTransaction * transaction, QString address, QString paymentId, quint32 mixinCount);
void connectionStatusChanged(ConnectionStatus status) const;
void connectionStatusChanged(int status) const;
private:
Wallet(QObject * parent = nullptr);
@ -406,6 +407,7 @@ private:
QString m_daemonUsername;
QString m_daemonPassword;
Monero::WalletListener *m_walletListener;
FutureScheduler m_scheduler;
};

View file

@ -145,17 +145,9 @@ Wallet *WalletManager::openWallet(const QString &path, const QString &password,
void WalletManager::openWalletAsync(const QString &path, const QString &password, NetworkType::Type nettype, quint64 kdfRounds)
{
QFuture<Wallet*> future = QtConcurrent::run(this, &WalletManager::openWallet,
path, password, nettype, kdfRounds);
QFutureWatcher<Wallet*> * watcher = new QFutureWatcher<Wallet*>();
connect(watcher, &QFutureWatcher<Wallet*>::finished,
this, [this, watcher]() {
QFuture<Wallet*> future = watcher->future();
watcher->deleteLater();
emit walletOpened(future.result());
m_scheduler.run([this, path, password, nettype, kdfRounds] {
emit walletOpened(openWallet(path, password, nettype, kdfRounds));
});
watcher->setFuture(future);
}
@ -216,21 +208,10 @@ Wallet *WalletManager::createWalletFromDevice(const QString &path, const QString
void WalletManager::createWalletFromDeviceAsync(const QString &path, const QString &password, NetworkType::Type nettype,
const QString &deviceName, quint64 restoreHeight, const QString &subaddressLookahead)
{
auto lmbd = [=](){
return this->createWalletFromDevice(path, password, nettype, deviceName, restoreHeight, subaddressLookahead);
};
QFuture<Wallet *> future = QtConcurrent::run(lmbd);
QFutureWatcher<Wallet *> * watcher = new QFutureWatcher<Wallet *>();
connect(watcher, &QFutureWatcher<Wallet *>::finished,
this, [this, watcher]() {
QFuture<Wallet *> future = watcher->future();
watcher->deleteLater();
emit walletCreated(future.result());
});
watcher->setFuture(future);
m_scheduler.run([this, path, password, nettype, deviceName, restoreHeight, subaddressLookahead] {
Wallet *wallet = createWalletFromDevice(path, password, nettype, deviceName, restoreHeight, subaddressLookahead);
emit walletCreated(wallet);
});
}
QString WalletManager::closeWallet()
@ -249,16 +230,9 @@ QString WalletManager::closeWallet()
void WalletManager::closeWalletAsync()
{
QFuture<QString> future = QtConcurrent::run(this, &WalletManager::closeWallet);
QFutureWatcher<QString> * watcher = new QFutureWatcher<QString>();
connect(watcher, &QFutureWatcher<QString>::finished,
this, [this, watcher]() {
QFuture<QString> future = watcher->future();
watcher->deleteLater();
emit walletClosed(future.result());
m_scheduler.run([this] {
emit walletClosed(closeWallet());
});
watcher->setFuture(future);
}
bool WalletManager::walletExists(const QString &path) const
@ -333,7 +307,7 @@ QString WalletManager::paymentIdFromAddress(const QString &address, NetworkType:
void WalletManager::setDaemonAddressAsync(const QString &address)
{
QtConcurrent::run([this, address] {
m_scheduler.run([this, address] {
m_pimpl->setDaemonAddress(address.toStdString());
});
}
@ -376,9 +350,9 @@ bool WalletManager::isMining() const
return m_pimpl->isMining();
}
void WalletManager::miningStatusAsync() const
void WalletManager::miningStatusAsync()
{
QtConcurrent::run([this] {
m_scheduler.run([this] {
emit miningStatus(isMining());
});
}
@ -488,19 +462,11 @@ bool WalletManager::saveQrCode(const QString &code, const QString &path) const
return QRCodeImageProvider::genQrImage(code, &size).scaled(size.expandedTo(QSize(240, 240)), Qt::KeepAspectRatio).save(path, "PNG", 100);
}
void WalletManager::checkUpdatesAsync(const QString &software, const QString &subdir) const
void WalletManager::checkUpdatesAsync(const QString &software, const QString &subdir)
{
QFuture<QString> future = QtConcurrent::run(this, &WalletManager::checkUpdates,
software, subdir);
QFutureWatcher<QString> * watcher = new QFutureWatcher<QString>();
connect(watcher, &QFutureWatcher<Wallet*>::finished,
this, [this, watcher]() {
QFuture<QString> future = watcher->future();
watcher->deleteLater();
qDebug() << "Checking for updates - done";
emit checkUpdatesComplete(future.result());
m_scheduler.run([this, software, subdir] {
emit checkUpdatesComplete(checkUpdates(software, subdir));
});
watcher->setFuture(future);
}
@ -532,11 +498,18 @@ bool WalletManager::clearWalletCache(const QString &wallet_path) const
return walletCache.rename(newFileName);
}
WalletManager::WalletManager(QObject *parent) : QObject(parent)
WalletManager::WalletManager(QObject *parent)
: QObject(parent)
, m_scheduler(this)
{
m_pimpl = Monero::WalletManagerFactory::getWalletManager();
}
WalletManager::~WalletManager()
{
m_scheduler.shutdownWaitForFinished();
}
void WalletManager::onWalletPassphraseNeeded(Monero::Wallet *)
{
m_mutex_pass.lock();

View file

@ -37,6 +37,7 @@
#include <QPointer>
#include <QWaitCondition>
#include <QMutex>
#include "qt/FutureScheduler.h"
#include "NetworkType.h"
class Wallet;
@ -151,7 +152,7 @@ public:
Q_INVOKABLE bool localDaemonSynced() const;
Q_INVOKABLE bool isDaemonLocal(const QString &daemon_address) const;
Q_INVOKABLE void miningStatusAsync() const;
Q_INVOKABLE void miningStatusAsync();
Q_INVOKABLE bool startMining(const QString &address, quint32 threads, bool backgroundMining, bool ignoreBattery);
Q_INVOKABLE bool stopMining();
@ -175,7 +176,7 @@ public:
Q_INVOKABLE bool parse_uri(const QString &uri, QString &address, QString &payment_id, uint64_t &amount, QString &tx_description, QString &recipient_name, QVector<QString> &unknown_parameters, QString &error) const;
Q_INVOKABLE QVariantMap parse_uri_to_object(const QString &uri) const;
Q_INVOKABLE bool saveQrCode(const QString &, const QString &) const;
Q_INVOKABLE void checkUpdatesAsync(const QString &software, const QString &subdir) const;
Q_INVOKABLE void checkUpdatesAsync(const QString &software, const QString &subdir);
Q_INVOKABLE QString checkUpdates(const QString &software, const QString &subdir) const;
// clear/rename wallet cache
@ -200,6 +201,7 @@ private:
friend class WalletPassphraseListenerImpl;
explicit WalletManager(QObject *parent = 0);
~WalletManager();
bool isMining() const;
@ -212,6 +214,8 @@ private:
QMutex m_mutex_pass;
QString m_passphrase;
bool m_passphrase_abort;
FutureScheduler m_scheduler;
};
#endif // WALLETMANAGER_H

View file

@ -0,0 +1,89 @@
#include "FutureScheduler.h"
FutureScheduler::FutureScheduler(QObject *parent)
: QObject(parent), Alive(0), Stopping(false)
{
}
FutureScheduler::~FutureScheduler()
{
shutdownWaitForFinished();
}
void FutureScheduler::shutdownWaitForFinished() noexcept
{
QMutexLocker locker(&Mutex);
Stopping = true;
while (Alive > 0)
{
Condition.wait(&Mutex);
}
}
QPair<bool, QFuture<void>> FutureScheduler::run(std::function<void()> function) noexcept
{
return execute<void>([this, function](QFutureWatcher<void> *) {
return QtConcurrent::run([this, function] {
try
{
function();
}
catch (const std::exception &exception)
{
qWarning() << "Exception thrown from async function: " << exception.what();
}
done();
});
});
}
QPair<bool, QFuture<QJSValueList>> FutureScheduler::run(std::function<QJSValueList() noexcept> function, const QJSValue &callback) noexcept
{
if (!callback.isCallable())
{
throw std::runtime_error("js callback must be callable");
}
return execute<QJSValueList>([this, function, callback](QFutureWatcher<QJSValueList> *watcher) {
connect(watcher, &QFutureWatcher<QJSValueList>::finished, [watcher, callback] {
QJSValue(callback).call(watcher->future().result());
});
return QtConcurrent::run([this, function] {
QJSValueList result;
try
{
result = function();
}
catch (const std::exception &exception)
{
qWarning() << "Exception thrown from async function: " << exception.what();
}
done();
return result;
});
});
}
bool FutureScheduler::add() noexcept
{
QMutexLocker locker(&Mutex);
if (Stopping)
{
return false;
}
++Alive;
return true;
}
void FutureScheduler::done() noexcept
{
{
QMutexLocker locker(&Mutex);
--Alive;
}
Condition.wakeAll();
}

79
src/qt/FutureScheduler.h Normal file
View file

@ -0,0 +1,79 @@
#ifndef FUTURE_SCHEDULER_H
#define FUTURE_SCHEDULER_H
#include <functional>
#include <QtConcurrent/QtConcurrent>
#include <QFuture>
#include <QJSValue>
#include <QMutex>
#include <QMutexLocker>
#include <QPair>
#include <QWaitCondition>
class FutureScheduler : public QObject
{
Q_OBJECT
public:
FutureScheduler(QObject *parent);
~FutureScheduler();
void shutdownWaitForFinished() noexcept;
QPair<bool, QFuture<void>> run(std::function<void()> function) noexcept;
QPair<bool, QFuture<QJSValueList>> run(std::function<QJSValueList() noexcept> function, const QJSValue &callback) noexcept;
private:
bool add() noexcept;
void done() noexcept;
template<typename T>
QFutureWatcher<T> *newWatcher()
{
QFutureWatcher<T> *watcher = new QFutureWatcher<T>();
QThread *schedulerThread = this->thread();
if (watcher->thread() != schedulerThread)
{
watcher->moveToThread(schedulerThread);
}
watcher->setParent(this);
return watcher;
}
template<typename T>
QPair<bool, QFuture<T>> execute(std::function<QFuture<T>(QFutureWatcher<T> *)> makeFuture) noexcept
{
if (add())
{
try
{
auto *watcher = newWatcher<T>();
watcher->setFuture(makeFuture(watcher));
connect(watcher, &QFutureWatcher<T>::finished, [this, watcher] {
watcher->deleteLater();
});
return qMakePair(true, watcher->future());
}
catch (const std::exception &exception)
{
qCritical() << "Failed to schedule async function: " << exception.what();
done();
}
}
return qMakePair(false, QFuture<T>());
}
QFutureWatcher<void> schedule(std::function<void()> function);
QFutureWatcher<QJSValueList> schedule(std::function<QJSValueList() noexcept> function, const QJSValue &callback);
private:
size_t Alive;
QWaitCondition Condition;
QMutex Mutex;
bool Stopping;
};
#endif // FUTURE_SCHEDULER_H