mirror of
https://github.com/Rucknium/misc-research.git
synced 2024-11-16 15:58:03 +00:00
Initial mempool archive collection scripts
This commit is contained in:
parent
912f221f68
commit
38f0eb2d7f
6 changed files with 325 additions and 0 deletions
97
General-Mempool-Archive/collect-archive.R
Normal file
97
General-Mempool-Archive/collect-archive.R
Normal file
|
@ -0,0 +1,97 @@
|
||||||
|
|
||||||
|
script.args <- commandArgs(trailingOnly = TRUE)
|
||||||
|
|
||||||
|
stopifnot(length(script.args) == 2 && grepl("(bch)|(ltc)|(doge)", script.args[1], ignore.case = TRUE))
|
||||||
|
|
||||||
|
blockchain.name <- tolower(script.args[1])
|
||||||
|
|
||||||
|
blockchain.conf.file <- script.args[2]
|
||||||
|
|
||||||
|
blockchain.config <- rbch::conrpc(blockchain.conf.file)
|
||||||
|
rpcport <- readLines(blockchain.conf.file)
|
||||||
|
rpcport <- rpcport[grepl("rpcport", rpcport) ]
|
||||||
|
if (length(rpcport) > 0) {
|
||||||
|
blockchain.config@url <- paste0("http://127.0.0.1:", gsub("[^0-9]", "", rpcport))
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
tx.pool <- c()
|
||||||
|
|
||||||
|
# Check that node is responding
|
||||||
|
while(length(tx.pool) == 0) {
|
||||||
|
tx.pool <- rbch::getrawmempool(blockchain.config)@result
|
||||||
|
Sys.sleep(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
con <- DBI::dbConnect(RSQLite::SQLite(), paste0(blockchain.name, "-mempool-archive.db"))
|
||||||
|
DBI::dbExecute(con, "PRAGMA journal_mode=WAL;")
|
||||||
|
# export-csv.R can read while collect-archive.R writes
|
||||||
|
# https://stackoverflow.com/questions/15143871/simplest-way-to-retry-sqlite-query-if-db-is-locked
|
||||||
|
|
||||||
|
while (TRUE) {
|
||||||
|
|
||||||
|
compute.time <- system.time({
|
||||||
|
|
||||||
|
tx.pool <- rbch::getrawmempool(blockchain.config)@result
|
||||||
|
|
||||||
|
bestblockhash <- rbch::getbestblockhash(blockchain.config)@result
|
||||||
|
|
||||||
|
block.header <- rbch::getblockheader(blockchain.config, bestblockhash)@result
|
||||||
|
|
||||||
|
block_receive_time <- round(Sys.time())
|
||||||
|
# One second time resolution
|
||||||
|
|
||||||
|
if (length(tx.pool) > 0) {
|
||||||
|
|
||||||
|
txs <- vector(mode = "list", length = length(tx.pool))
|
||||||
|
|
||||||
|
if (blockchain.name != "ltc") {
|
||||||
|
for (i in seq_along(tx.pool)) {
|
||||||
|
txs[[i]] <- data.table::data.table(
|
||||||
|
id_hash = names(tx.pool)[i],
|
||||||
|
fee = tx.pool[[i]]$fee,
|
||||||
|
weight = tx.pool[[i]]$size,
|
||||||
|
receive_time = tx.pool[[i]]$time)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (i in seq_along(tx.pool)) {
|
||||||
|
txs[[i]] <- data.table::data.table(
|
||||||
|
id_hash = names(tx.pool)[i],
|
||||||
|
fee = tx.pool[[i]]$fee,
|
||||||
|
weight = tx.pool[[i]]$vsize,
|
||||||
|
receive_time = tx.pool[[i]]$time)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
txs <- data.table::rbindlist(txs)
|
||||||
|
|
||||||
|
tx.statement <- DBI::dbSendQuery(con,
|
||||||
|
"INSERT OR IGNORE INTO txs VALUES (:id_hash,:fee,:weight,:receive_time)")
|
||||||
|
# "IGNORE" prevents the same txs from being inserted more than once
|
||||||
|
DBI::dbBind(tx.statement, params = txs)
|
||||||
|
DBI::dbClearResult(tx.statement)
|
||||||
|
|
||||||
|
blocks <- data.table::data.table(
|
||||||
|
block_hash = block.header$hash,
|
||||||
|
prev_block_hash = block.header$previousblockhash,
|
||||||
|
block_height = block.header$height,
|
||||||
|
block_timestamp = block.header$time,
|
||||||
|
block_receive_time = as.character(as.numeric(block_receive_time))
|
||||||
|
)
|
||||||
|
|
||||||
|
block.statement <- DBI::dbSendQuery(con,
|
||||||
|
"INSERT OR IGNORE INTO blocks VALUES (:block_hash,:prev_block_hash,:block_height,:block_timestamp,:block_receive_time)")
|
||||||
|
# "IGNORE" prevents the same blocks from being inserted more than once
|
||||||
|
DBI::dbBind(block.statement, params = blocks)
|
||||||
|
DBI::dbClearResult(block.statement)
|
||||||
|
|
||||||
|
}
|
||||||
|
})
|
||||||
|
print(compute.time["elapsed"])
|
||||||
|
Sys.sleep(max(c(0, 1 - compute.time["elapsed"])))
|
||||||
|
# Should poll once per second unless data processing takes more than one second. In
|
||||||
|
# that case, polls as frequently as possible.
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
19
General-Mempool-Archive/export-csv.R
Normal file
19
General-Mempool-Archive/export-csv.R
Normal file
|
@ -0,0 +1,19 @@
|
||||||
|
|
||||||
|
script.args <- commandArgs(trailingOnly = TRUE)
|
||||||
|
stopifnot(length(script.args) == 1 && grepl("(bch)|(ltc)|(doge)", script.args, ignore.case = TRUE))
|
||||||
|
blockchain.name <- tolower(script.args)
|
||||||
|
|
||||||
|
con <- DBI::dbConnect(RSQLite::SQLite(), paste0(blockchain.name, "-mempool-archive.db"))
|
||||||
|
DBI::dbExecute(con, "PRAGMA journal_mode=WAL;")
|
||||||
|
# export-csv.R can read while collect-archive.R writes
|
||||||
|
# https://stackoverflow.com/questions/15143871/simplest-way-to-retry-sqlite-query-if-db-is-locked
|
||||||
|
|
||||||
|
file.time <- Sys.time()
|
||||||
|
|
||||||
|
txs <- DBI::dbGetQuery(con, "SELECT * FROM txs")
|
||||||
|
txs$receive_time_UTC <- as.POSIXct(as.integer(txs$receive_time), origin = "1970-01-01")
|
||||||
|
write.csv(txs, paste0(blockchain.name, "-mempool-archive-", gsub("( )|([:])", "-", file.time), ".csv"), row.names = FALSE)
|
||||||
|
|
||||||
|
blocks <- DBI::dbGetQuery(con, "SELECT * FROM blocks")
|
||||||
|
blocks$block_receive_time_UTC <- as.POSIXct(as.integer(blocks$block_receive_time), origin = "1970-01-01")
|
||||||
|
write.csv(blocks, paste0(blockchain.name, "-block-archive-", gsub("( )|([:])", "-", file.time), ".csv"), row.names = FALSE)
|
35
General-Mempool-Archive/initialize-archive.R
Normal file
35
General-Mempool-Archive/initialize-archive.R
Normal file
|
@ -0,0 +1,35 @@
|
||||||
|
|
||||||
|
if (!require(RSQLite)) { install.packages("RSQLite") }
|
||||||
|
if (!require(data.table)) { install.packages("data.table") }
|
||||||
|
if (!require(RJSONIO)) { install.packages("RJSONIO") }
|
||||||
|
if (!require(RCurl)) { install.packages("RCurl") }
|
||||||
|
if (!require(rbch)) { install.packages("rbch") }
|
||||||
|
|
||||||
|
script.args <- commandArgs(trailingOnly = TRUE)
|
||||||
|
|
||||||
|
stopifnot(length(script.args) == 1 && grepl("(bch)|(ltc)|(doge)", script.args, ignore.case = TRUE))
|
||||||
|
|
||||||
|
blockchain.name <- tolower(script.args)
|
||||||
|
|
||||||
|
con <- DBI::dbConnect(RSQLite::SQLite(), paste0(blockchain.name, "-mempool-archive.db"))
|
||||||
|
DBI::dbExecute(con, "PRAGMA journal_mode=WAL;")
|
||||||
|
# export-csv.R can read while collect-archive.R writes
|
||||||
|
# https://stackoverflow.com/questions/15143871/simplest-way-to-retry-sqlite-query-if-db-is-locked
|
||||||
|
|
||||||
|
DBI::dbExecute(con, "CREATE TABLE txs (
|
||||||
|
id_hash TEXT,
|
||||||
|
fee TEXT,
|
||||||
|
weight TEXT,
|
||||||
|
receive_time TEXT,
|
||||||
|
unique(id_hash)
|
||||||
|
)")
|
||||||
|
|
||||||
|
DBI::dbExecute(con, "CREATE TABLE blocks (
|
||||||
|
block_hash TEXT,
|
||||||
|
prev_block_hash TEXT,
|
||||||
|
block_height TEXT,
|
||||||
|
block_timestamp TEXT,
|
||||||
|
block_receive_time TEXT,
|
||||||
|
unique(block_hash)
|
||||||
|
)")
|
||||||
|
|
129
Monero-Mempool-Archive/collect-archive.R
Normal file
129
Monero-Mempool-Archive/collect-archive.R
Normal file
|
@ -0,0 +1,129 @@
|
||||||
|
|
||||||
|
# Modified from TownforgeR::tf_rpc_curl function
|
||||||
|
xmr.rpc <- function(
|
||||||
|
url.rpc = "http://127.0.0.1:18081/json_rpc",
|
||||||
|
method = "",
|
||||||
|
params = list(),
|
||||||
|
userpwd = "",
|
||||||
|
num.as.string = TRUE,
|
||||||
|
nonce.as.string = FALSE,
|
||||||
|
keep.trying.rpc = FALSE,
|
||||||
|
...
|
||||||
|
){
|
||||||
|
|
||||||
|
json.ret <- RJSONIO::toJSON(
|
||||||
|
list(
|
||||||
|
jsonrpc = "2.0",
|
||||||
|
id = "0",
|
||||||
|
method = method,
|
||||||
|
params = params
|
||||||
|
), digits = 50
|
||||||
|
)
|
||||||
|
|
||||||
|
rcp.ret <- tryCatch(RCurl::postForm(url.rpc,
|
||||||
|
.opts = list(
|
||||||
|
userpwd = userpwd,
|
||||||
|
postfields = json.ret,
|
||||||
|
httpheader = c('Content-Type' = 'application/json', Accept = 'application/json')
|
||||||
|
# https://stackoverflow.com/questions/19267261/timeout-while-reading-csv-file-from-url-in-r
|
||||||
|
)
|
||||||
|
), error = function(e) {NULL})
|
||||||
|
|
||||||
|
if (keep.trying.rpc && length(rcp.ret) == 0) {
|
||||||
|
while (length(rcp.ret) == 0) {
|
||||||
|
rcp.ret <- tryCatch(RCurl::postForm(url.rpc,
|
||||||
|
.opts = list(
|
||||||
|
userpwd = userpwd,
|
||||||
|
postfields = json.ret,
|
||||||
|
httpheader = c('Content-Type' = 'application/json', Accept = 'application/json')
|
||||||
|
# https://stackoverflow.com/questions/19267261/timeout-while-reading-csv-file-from-url-in-r
|
||||||
|
)
|
||||||
|
), error = function(e) {NULL})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (is.null(rcp.ret)) {
|
||||||
|
stop("Cannot connect to monerod. Is monerod running?")
|
||||||
|
}
|
||||||
|
|
||||||
|
if (num.as.string) {
|
||||||
|
rcp.ret <- gsub("(: )([-0123456789.]+)([,\n\r])", "\\1\"\\2\"\\3", rcp.ret )
|
||||||
|
}
|
||||||
|
|
||||||
|
if (nonce.as.string & ! num.as.string) {
|
||||||
|
rcp.ret <- gsub("(\"nonce\": )([-0123456789.]+)([,\n\r])", "\\1\"\\2\"\\3", rcp.ret )
|
||||||
|
}
|
||||||
|
|
||||||
|
RJSONIO::fromJSON(rcp.ret) # , simplify = FALSE
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.pool <- c()
|
||||||
|
|
||||||
|
# Check that node is responding
|
||||||
|
while(length(tx.pool) == 0) {
|
||||||
|
tx.pool <- xmr.rpc("http://127.0.0.1:18081/get_transaction_pool")$transactions
|
||||||
|
|
||||||
|
if (length(tx.pool) > 0 && tx.pool[[1]]$receive_time == 0) {
|
||||||
|
error("Transaction receive_time is missing. Possible solution: remove '--restricted-rpc' monerod flag.")
|
||||||
|
}
|
||||||
|
Sys.sleep(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
con <- DBI::dbConnect(RSQLite::SQLite(), "xmr-mempool-archive.db")
|
||||||
|
DBI::dbExecute(con, "PRAGMA journal_mode=WAL;")
|
||||||
|
# export-csv.R can read while collect-archive.R writes
|
||||||
|
# https://stackoverflow.com/questions/15143871/simplest-way-to-retry-sqlite-query-if-db-is-locked
|
||||||
|
|
||||||
|
while (TRUE) {
|
||||||
|
|
||||||
|
compute.time <- system.time({
|
||||||
|
|
||||||
|
tx.pool <- xmr.rpc("http://127.0.0.1:18081/get_transaction_pool", keep.trying.rpc = TRUE)$transactions
|
||||||
|
|
||||||
|
block.header <- xmr.rpc(url.rpc = "http://127.0.0.1:18081/json_rpc", method = "get_last_block_header")$result$block_header
|
||||||
|
|
||||||
|
block_receive_time <- round(Sys.time())
|
||||||
|
# One second time resolution
|
||||||
|
|
||||||
|
if (length(tx.pool) > 0) {
|
||||||
|
|
||||||
|
txs <- vector(mode = "list", length = length(tx.pool))
|
||||||
|
|
||||||
|
for (i in seq_along(tx.pool)) {
|
||||||
|
txs[[i]] <- data.table::data.table(
|
||||||
|
id_hash = tx.pool[[i]]$id_hash,
|
||||||
|
fee = tx.pool[[i]]$fee,
|
||||||
|
weight = tx.pool[[i]]$weight,
|
||||||
|
receive_time = tx.pool[[i]]$receive_time)
|
||||||
|
}
|
||||||
|
|
||||||
|
txs <- data.table::rbindlist(txs)
|
||||||
|
|
||||||
|
tx.statement <- DBI::dbSendQuery(con,
|
||||||
|
"INSERT OR IGNORE INTO txs VALUES (:id_hash,:fee,:weight,:receive_time)")
|
||||||
|
# "IGNORE" prevents the same txs from being inserted more than once
|
||||||
|
DBI::dbBind(tx.statement, params = txs)
|
||||||
|
DBI::dbClearResult(tx.statement)
|
||||||
|
|
||||||
|
blocks <- data.table::data.table(
|
||||||
|
block_hash = block.header$hash,
|
||||||
|
prev_block_hash = block.header$prev_hash,
|
||||||
|
block_height = block.header$height,
|
||||||
|
block_timestamp = block.header$timestamp,
|
||||||
|
block_receive_time = as.character(as.numeric(block_receive_time))
|
||||||
|
)
|
||||||
|
|
||||||
|
block.statement <- DBI::dbSendQuery(con,
|
||||||
|
"INSERT OR IGNORE INTO blocks VALUES (:block_hash,:prev_block_hash,:block_height,:block_timestamp,:block_receive_time)")
|
||||||
|
# "IGNORE" prevents the same blocks from being inserted more than once
|
||||||
|
DBI::dbBind(block.statement, params = blocks)
|
||||||
|
DBI::dbClearResult(block.statement)
|
||||||
|
|
||||||
|
}
|
||||||
|
})
|
||||||
|
print(compute.time["elapsed"])
|
||||||
|
Sys.sleep(max(c(0, 1 - compute.time["elapsed"])))
|
||||||
|
# Should poll once per second unless data processing takes more than one second. In
|
||||||
|
# that case, polls as frequently as possible.
|
||||||
|
}
|
||||||
|
|
15
Monero-Mempool-Archive/export-csv.R
Normal file
15
Monero-Mempool-Archive/export-csv.R
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
|
||||||
|
con <- DBI::dbConnect(RSQLite::SQLite(), "xmr-mempool-archive.db")
|
||||||
|
DBI::dbExecute(con, "PRAGMA journal_mode=WAL;")
|
||||||
|
# export-csv.R can read while collect-archive.R writes
|
||||||
|
# https://stackoverflow.com/questions/15143871/simplest-way-to-retry-sqlite-query-if-db-is-locked
|
||||||
|
|
||||||
|
file.time <- Sys.time()
|
||||||
|
|
||||||
|
txs <- DBI::dbGetQuery(con, "SELECT * FROM txs")
|
||||||
|
txs$receive_time_UTC <- as.POSIXct(as.integer(txs$receive_time), origin = "1970-01-01")
|
||||||
|
write.csv(txs, paste0("xmr-mempool-archive-", gsub("( )|([:])", "-", file.time), ".csv"), row.names = FALSE)
|
||||||
|
|
||||||
|
blocks <- DBI::dbGetQuery(con, "SELECT * FROM blocks")
|
||||||
|
blocks$block_receive_time_UTC <- as.POSIXct(as.integer(blocks$block_receive_time), origin = "1970-01-01")
|
||||||
|
write.csv(blocks, paste0("xmr-block-archive-", gsub("( )|([:])", "-", file.time), ".csv"), row.names = FALSE)
|
30
Monero-Mempool-Archive/initialize-archive.R
Normal file
30
Monero-Mempool-Archive/initialize-archive.R
Normal file
|
@ -0,0 +1,30 @@
|
||||||
|
|
||||||
|
if (!require(RSQLite)) { install.packages("RSQLite") }
|
||||||
|
if (!require(data.table)) { install.packages("data.table") }
|
||||||
|
if (!require(RJSONIO)) { install.packages("RJSONIO") }
|
||||||
|
if (!require(RCurl)) { install.packages("RCurl") }
|
||||||
|
|
||||||
|
con <- DBI::dbConnect(RSQLite::SQLite(), "xmr-mempool-archive.db")
|
||||||
|
DBI::dbExecute(con, "PRAGMA journal_mode=WAL;")
|
||||||
|
# export-csv.R can read while collect-archive.R writes
|
||||||
|
# https://stackoverflow.com/questions/15143871/simplest-way-to-retry-sqlite-query-if-db-is-locked
|
||||||
|
|
||||||
|
DBI::dbExecute(con, "CREATE TABLE txs (
|
||||||
|
id_hash TEXT,
|
||||||
|
fee TEXT,
|
||||||
|
weight TEXT,
|
||||||
|
receive_time TEXT,
|
||||||
|
unique(id_hash)
|
||||||
|
)")
|
||||||
|
# unique(id_hash) prevents the same txs being inserted more than once
|
||||||
|
|
||||||
|
DBI::dbExecute(con, "CREATE TABLE blocks (
|
||||||
|
block_hash TEXT,
|
||||||
|
prev_block_hash TEXT,
|
||||||
|
block_height TEXT,
|
||||||
|
block_timestamp TEXT,
|
||||||
|
block_receive_time TEXT,
|
||||||
|
unique(block_hash)
|
||||||
|
)")
|
||||||
|
# unique(block_hash) prevents the same blocks being inserted more than once
|
||||||
|
|
Loading…
Reference in a new issue