xmr-remote-nodes/internal/monero/report.go
Christian Ditaputratama 518d4b4335
feat: Added IPv6 nodes support (alpha) #84
This commit accept IPv6 nodes submission.

When user submit new public node, the server will check IP addresses
from given hostname. If host IP addresses doesn't have IPv4, it will
be recorded as "IPv6 only" node.

Probers that support IPv6 may add `IPV6_CAPABLE=true` to the `.env`
file.

Please note that this feature still experimental and may not being
merged to the main branch.
2024-09-06 00:08:59 +07:00

365 lines
7.8 KiB
Go

package monero
import (
"encoding/json"
"errors"
"fmt"
"log/slog"
"math"
"slices"
"strings"
"time"
"github.com/ditatompel/xmr-remote-nodes/internal/geo"
)
type QueryLogs struct {
NodeID int // 0 for all, >0 for specific node
Status int // -1 for all, 0 for failed, 1 for success
FailedReason string // empty for all, if not empty, will be used as search from failed_reaso
RowsPerPage int
Page int
SortBy string
SortDirection string
}
func (q QueryLogs) toSQL() (args []interface{}, where, sortBy, sortDirection string) {
wq := []string{}
if q.NodeID != 0 {
wq = append(wq, "node_id = ?")
args = append(args, q.NodeID)
}
if q.Status != -1 {
wq = append(wq, "is_available = ?")
args = append(args, q.Status)
}
if q.FailedReason != "" {
wq = append(wq, "failed_reason LIKE ?")
args = append(args, "%"+q.FailedReason+"%")
}
if len(wq) > 0 {
where = "WHERE " + strings.Join(wq, " AND ")
}
as := []string{"date_checked", "fetch_runtime"}
sortBy = "id"
if slices.Contains(as, q.SortBy) {
sortBy = q.SortBy
}
sortDirection = "DESC"
if q.SortDirection == "asc" {
sortDirection = "ASC"
}
return args, where, sortBy, sortDirection
}
type FetchLog struct {
ID int `db:"id" json:"id,omitempty"`
NodeID int `db:"node_id" json:"node_id"`
ProberID int `db:"prober_id" json:"prober_id"`
Status int `db:"is_available" json:"status"`
Height int `db:"height" json:"height"`
AdjustedTime int `db:"adjusted_time" json:"adjusted_time"`
DatabaseSize int `db:"database_size" json:"database_size"`
Difficulty int `db:"difficulty" json:"difficulty"`
EstimateFee int `db:"estimate_fee" json:"estimate_fee"`
DateChecked int `db:"date_checked" json:"date_checked"`
FailedReason string `db:"failed_reason" json:"failed_reason"`
FetchRuntime float64 `db:"fetch_runtime" json:"fetch_runtime"`
}
type FetchLogs struct {
TotalRows int `json:"total_rows"`
RowsPerPage int `json:"rows_per_page"`
Items []*FetchLog `json:"items"`
}
// Logs returns list of fetched log result for given query
func (r *moneroRepo) Logs(q QueryLogs) (FetchLogs, error) {
args, where, sortBy, sortDirection := q.toSQL()
var fetchLogs FetchLogs
fetchLogs.RowsPerPage = q.RowsPerPage
qTotal := fmt.Sprintf(`SELECT COUNT(id) FROM tbl_probe_log %s`, where)
err := r.db.QueryRow(qTotal, args...).Scan(&fetchLogs.TotalRows)
if err != nil {
return fetchLogs, err
}
args = append(args, q.RowsPerPage, (q.Page-1)*q.RowsPerPage)
query := fmt.Sprintf(`
SELECT
*
FROM
tbl_probe_log
%s -- where query
ORDER BY
%s
%s
LIMIT ?
OFFSET ?`, where, sortBy, sortDirection)
err = r.db.Select(&fetchLogs.Items, query, args...)
return fetchLogs, err
}
// GiveJob returns node that should be probed for the next time
func (r *moneroRepo) GiveJob(acceptTor, acceptIPv6 int) (Node, error) {
args := []interface{}{}
wq := []string{}
where := ""
if acceptTor != 1 {
wq = append(wq, "is_tor = ?")
args = append(args, 0)
}
if acceptIPv6 != 1 {
wq = append(wq, "ipv6_only = ?")
args = append(args, 0)
}
if len(wq) > 0 {
where = "WHERE " + strings.Join(wq, " AND ")
}
var node Node
query := fmt.Sprintf(`
SELECT
id,
hostname,
port,
protocol,
is_tor,
last_check_status
FROM
tbl_node
%s -- where query if any
ORDER BY
last_checked ASC
LIMIT 1`, where)
err := r.db.QueryRow(query, args...).Scan(
&node.ID,
&node.Hostname,
&node.Port,
&node.Protocol,
&node.IsTor,
&node.LastCheckStatus)
if err != nil {
return node, err
}
_, err = r.db.Exec(`
UPDATE tbl_node
SET last_checked = ?
WHERE id = ?`, time.Now().Unix(), node.ID)
if err != nil {
return node, err
}
return node, nil
}
type ProbeReport struct {
TookTime float64 `json:"took_time"`
Message string `json:"message"`
Node Node `json:"node"`
}
type nodeStats struct {
Online uint `db:"online"` // total count online
Offline uint `db:"offline"` // total count offline
TotalFetched uint `db:"total_fetched"`
}
// Create new status indicator based from LastCheckStatus and recent IsAvailable status
func (p *ProbeReport) parseStatuses() string {
var s [5]int
if err := p.Node.LastCheckStatus.Unmarshal(&s); err != nil {
slog.Warn(err.Error())
s = [5]int{2, 2, 2, 2, 2}
}
si := 0 // set default "status indicator" to offline
if p.Node.IsAvailable {
si = 1
}
ns := s[1:]
ns = append(ns, si)
j, err := json.Marshal(ns)
if err != nil {
slog.Warn(err.Error())
}
return string(j)
}
// Process report data from probers
func (r *moneroRepo) ProcessJob(report ProbeReport, proberId int64) error {
if report.Node.ID == 0 {
return errors.New("Invalid node")
}
now := time.Now()
qInsertLog := `
INSERT INTO tbl_probe_log (
node_id,
prober_id,
is_available,
height,
adjusted_time,
database_size,
difficulty,
estimate_fee,
date_checked,
failed_reason,
fetch_runtime
) VALUES (
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?
)`
_, err := r.db.Exec(qInsertLog,
report.Node.ID,
proberId,
report.Node.IsAvailable,
report.Node.Height,
report.Node.AdjustedTime,
report.Node.DatabaseSize,
report.Node.Difficulty,
report.Node.EstimateFee,
now.Unix(),
report.Message,
report.TookTime)
if err != nil {
return err
}
limitTs := now.AddDate(0, -1, 0).Unix()
var stats nodeStats
qstats := `
SELECT
SUM(if(is_available='1',1,0)) AS online,
SUM(if(is_available='0',1,0)) AS offline,
SUM(if(id='0',0,1)) AS total_fetched
FROM
tbl_probe_log
WHERE
node_id = ?
AND date_checked > ?`
if err := r.db.Get(&stats, qstats, report.Node.ID, limitTs); err != nil {
slog.Warn(err.Error())
}
avgUptime := (float64(stats.Online) / float64(stats.TotalFetched)) * 100
report.Node.Uptime = math.Ceil(avgUptime*100) / 100
statuses := report.parseStatuses()
// recheck IP
if report.Node.IP != "" {
if ipInfo, errGeoIp := geo.Info(report.Node.IP); errGeoIp != nil {
fmt.Println("WARN:", errGeoIp.Error())
} else {
report.Node.ASN = ipInfo.ASN
report.Node.ASNName = ipInfo.ASNOrg
report.Node.CountryCode = ipInfo.CountryCode
report.Node.CountryName = ipInfo.CountryName
report.Node.City = ipInfo.City
report.Node.Longitude = ipInfo.Longitude
report.Node.Latitude = ipInfo.Latitude
}
}
if report.Node.IsAvailable {
update := `
UPDATE tbl_node
SET
is_available = ?,
nettype = ?,
height = ?,
adjusted_time = ?,
database_size = ?,
difficulty = ?,
version = ?,
uptime = ?,
estimate_fee = ?,
ip_addr = ?,
asn = ?,
asn_name = ?,
country = ?,
country_name = ?,
city = ?,
last_checked = ?,
last_check_status = ?,
cors_capable = ?
WHERE
id = ?`
_, err := r.db.Exec(update,
1,
report.Node.Nettype,
report.Node.Height,
report.Node.AdjustedTime,
report.Node.DatabaseSize,
report.Node.Difficulty,
report.Node.Version,
report.Node.Uptime,
report.Node.EstimateFee,
report.Node.IP,
report.Node.ASN,
report.Node.ASNName,
report.Node.CountryCode,
report.Node.CountryName,
report.Node.City,
now.Unix(),
statuses,
report.Node.CORSCapable,
report.Node.ID)
if err != nil {
slog.Warn(err.Error())
}
} else {
u := `
UPDATE tbl_node
SET
is_available = ?,
uptime = ?,
last_checked = ?,
last_check_status = ?
WHERE
id = ?`
if _, err := r.db.Exec(u, 0, report.Node.Uptime, now.Unix(), statuses, report.Node.ID); err != nil {
slog.Warn(err.Error())
}
}
if avgUptime <= 0 && stats.TotalFetched > 300 {
fmt.Println("Deleting Monero node (0% uptime from > 300 records)")
if err := r.Delete(report.Node.ID); err != nil {
slog.Warn(err.Error())
}
}
_, err = r.db.Exec(`
UPDATE tbl_prober
SET last_submit_ts = ?
WHERE id = ?`, now.Unix(), proberId)
return err
}