Process submitted job from prober

This commit is contained in:
ditatompel 2024-05-05 01:42:47 +07:00
parent 9b8182082a
commit 1baddfd2d1
No known key found for this signature in database
GPG key ID: 31D3D06D77950979
6 changed files with 188 additions and 11 deletions

View file

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"net" "net"
@ -21,7 +22,8 @@ import (
const RPCUserAgent = "ditatombot/0.0.1 (Monero RPC Monitoring; Contact: ditatombot@ditatompel.com)" const RPCUserAgent = "ditatombot/0.0.1 (Monero RPC Monitoring; Contact: ditatombot@ditatompel.com)"
type proberClient struct { type proberClient struct {
config *config.App config *config.App
message string
} }
func newProber(cfg *config.App) *proberClient { func newProber(cfg *config.App) *proberClient {
@ -146,19 +148,22 @@ func (p *proberClient) fetchNode(node repo.MoneroNode) (repo.MoneroNode, error)
resp, err := client.Do(req) resp, err := client.Do(req)
if err != nil { if err != nil {
// TODO: Post report to server p.message = err.Error()
p.reportResult(node, time.Since(startTime).Seconds())
return node, err return node, err
} }
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != 200 { if resp.StatusCode != 200 {
// TODO: Post report to server p.message = fmt.Sprintf("status code: %d", resp.StatusCode)
return node, fmt.Errorf("status code: %d", resp.StatusCode) p.reportResult(node, time.Since(startTime).Seconds())
return node, errors.New(p.message)
} }
body, err := io.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
// TODO: Post report to server p.message = err.Error()
p.reportResult(node, time.Since(startTime).Seconds())
return node, err return node, err
} }
@ -167,7 +172,8 @@ func (p *proberClient) fetchNode(node repo.MoneroNode) (repo.MoneroNode, error)
}{} }{}
if err := json.Unmarshal(body, &reportNode); err != nil { if err := json.Unmarshal(body, &reportNode); err != nil {
// TODO: Post report to server p.message = err.Error()
p.reportResult(node, time.Since(startTime).Seconds())
return node, err return node, err
} }
if reportNode.Status == "OK" { if reportNode.Status == "OK" {
@ -177,6 +183,7 @@ func (p *proberClient) fetchNode(node repo.MoneroNode) (repo.MoneroNode, error)
node.AdjustedTime = reportNode.AdjustedTime node.AdjustedTime = reportNode.AdjustedTime
node.DatabaseSize = reportNode.DatabaseSize node.DatabaseSize = reportNode.DatabaseSize
node.Difficulty = reportNode.Difficulty node.Difficulty = reportNode.Difficulty
node.Height = reportNode.Height
node.Version = reportNode.Version node.Version = reportNode.Version
if resp.Header.Get("Access-Control-Allow-Origin") == "*" || resp.Header.Get("Access-Control-Allow-Origin") == "https://xmr.ditatompel.com" { if resp.Header.Get("Access-Control-Allow-Origin") == "*" || resp.Header.Get("Access-Control-Allow-Origin") == "https://xmr.ditatompel.com" {
@ -233,9 +240,45 @@ func (p *proberClient) fetchNode(node repo.MoneroNode) (repo.MoneroNode, error)
node.EstimateFee = feeEstimate.Result.Fee node.EstimateFee = feeEstimate.Result.Fee
fmt.Printf("Took %f seconds\n", tookTime) fmt.Printf("Took %f seconds\n", tookTime)
if err := p.reportResult(node, tookTime); err != nil {
return node, err
}
return node, nil return node, nil
} }
func (p *proberClient) reportResult(node repo.MoneroNode, tookTime float64) error {
jsonData, err := json.Marshal(repo.ProbeReport{
TookTime: tookTime,
Message: p.message,
NodeInfo: node,
})
if err != nil {
return err
}
endpoint := fmt.Sprintf("%s/api/v1/job", p.config.ServerEndpoint)
req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(jsonData))
if err != nil {
return err
}
req.Header.Add("X-Prober-Api-Key", p.config.ApiKey)
req.Header.Set("Content-Type", "application/json; charset=UTF-8")
req.Header.Set("User-Agent", RPCUserAgent)
client := &http.Client{Timeout: 60 * time.Second}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("status code: %d", resp.StatusCode)
}
return nil
}
// for debug purposes // for debug purposes
func prettyPrint(i interface{}) string { func prettyPrint(i interface{}) string {
s, _ := json.MarshalIndent(i, "", "\t") s, _ := json.MarshalIndent(i, "", "\t")

View file

@ -41,6 +41,6 @@ func CheckProber(c *fiber.Ctx) error {
}) })
} }
c.Locals("prober", prober) c.Locals("prober_id", prober.Id)
return c.Next() return c.Next()
} }

View file

@ -191,6 +191,33 @@ func GiveJob(c *fiber.Ctx) error {
}) })
} }
func ProcessJob(c *fiber.Ctx) error {
moneroRepo := repo.NewMoneroRepo(database.GetDB())
report := repo.ProbeReport{}
if err := c.BodyParser(&report); err != nil {
return c.Status(fiber.StatusUnprocessableEntity).JSON(fiber.Map{
"status": "error",
"message": err.Error(),
"data": nil,
})
}
if err := moneroRepo.ProcessJob(report, c.Locals("prober_id").(int64)); err != nil {
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"status": "error",
"message": err.Error(),
"data": nil,
})
}
return c.JSON(fiber.Map{
"status": "ok",
"message": "Success",
"data": nil,
})
}
func Crons(c *fiber.Ctx) error { func Crons(c *fiber.Ctx) error {
cronRepo := repo.NewCron(database.GetDB()) cronRepo := repo.NewCron(database.GetDB())

View file

@ -17,5 +17,6 @@ func V1Api(app *fiber.App) {
v1.Get("/nodes", MoneroNodes) v1.Get("/nodes", MoneroNodes)
v1.Post("/nodes", AddNode) v1.Post("/nodes", AddNode)
v1.Get("/job", CheckProber, GiveJob) v1.Get("/job", CheckProber, GiveJob)
v1.Post("/job", CheckProber, ProcessJob)
v1.Get("/crons", Crons) v1.Get("/crons", Crons)
} }

View file

@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"math"
"net" "net"
"slices" "slices"
"strings" "strings"
@ -18,6 +19,7 @@ type MoneroRepository interface {
Add(protocol string, host string, port uint) error Add(protocol string, host string, port uint) error
Nodes(q MoneroQueryParams) (MoneroNodes, error) Nodes(q MoneroQueryParams) (MoneroNodes, error)
GiveJob(acceptTor int) (MoneroNode, error) GiveJob(acceptTor int) (MoneroNode, error)
ProcessJob(report ProbeReport, proberId int64) error
} }
type MoneroRepo struct { type MoneroRepo struct {
@ -43,7 +45,7 @@ type MoneroNode struct {
Difficulty uint `json:"difficulty" db:"difficulty"` Difficulty uint `json:"difficulty" db:"difficulty"`
Version string `json:"version" db:"version"` Version string `json:"version" db:"version"`
Status string `json:"status,omitempty"` Status string `json:"status,omitempty"`
Uptime float32 `json:"uptime" db:"uptime"` Uptime float64 `json:"uptime" db:"uptime"`
EstimateFee uint `json:"estimate_fee" db:"estimate_fee"` EstimateFee uint `json:"estimate_fee" db:"estimate_fee"`
Asn uint `json:"asn" db:"asn"` Asn uint `json:"asn" db:"asn"`
AsnName string `json:"asn_name" db:"asn_name"` AsnName string `json:"asn_name" db:"asn_name"`
@ -206,8 +208,8 @@ func (repo *MoneroRepo) GiveJob(acceptTor int) (MoneroNode, error) {
node := MoneroNode{} node := MoneroNode{}
query := fmt.Sprintf(`SELECT id, hostname, port, protocol, is_tor FROM tbl_node %s ORDER BY last_checked ASC LIMIT 1`, where) query := fmt.Sprintf(`SELECT id, hostname, port, protocol, is_tor, last_check_status FROM tbl_node %s ORDER BY last_checked ASC LIMIT 1`, where)
err := repo.db.QueryRow(query, queryParams...).Scan(&node.Id, &node.Hostname, &node.Port, &node.Protocol, &node.IsTor) err := repo.db.QueryRow(query, queryParams...).Scan(&node.Id, &node.Hostname, &node.Port, &node.Protocol, &node.IsTor, &node.LastCheckStatus)
if err != nil { if err != nil {
return node, err return node, err
} }
@ -220,3 +222,87 @@ func (repo *MoneroRepo) GiveJob(acceptTor int) (MoneroNode, error) {
return node, nil return node, nil
} }
type ProbeReport struct {
TookTime float64 `json:"took_time"`
Message string `json:"message"`
NodeInfo MoneroNode `json:"node_info"`
}
func (repo *MoneroRepo) ProcessJob(report ProbeReport, proberId int64) error {
if report.NodeInfo.Id == 0 {
return errors.New("Invalid node")
}
qInsertLog := `INSERT INTO tbl_probe_log (node_id, prober_id, is_available, height, adjusted_time, database_size, difficulty, estimate_fee, date_checked, failed_reason, fetch_runtime) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
_, err := repo.db.Exec(qInsertLog, report.NodeInfo.Id, proberId, report.NodeInfo.IsAvailable, report.NodeInfo.Height, report.NodeInfo.AdjustedTime, report.NodeInfo.DatabaseSize, report.NodeInfo.Difficulty, report.NodeInfo.EstimateFee, time.Now().Unix(), report.Message, report.TookTime)
if err != nil {
return err
}
now := time.Now()
limitTs := now.AddDate(0, -1, 0).Unix()
nodeStats := struct {
OnlineCount uint `db:"online"`
OfflineCount uint `db:"offline"`
TotalFetched uint `db:"total_fetched"`
}{}
qstats := `SELECT
SUM(if(is_available='1',1,0)) AS online,
SUM(if(is_available='0',1,0)) AS offline,
SUM(if(id='0',0,1)) AS total_fetched FROM
tbl_probe_log WHERE node_id = ? AND date_checked > ?`
repo.db.Get(&nodeStats, qstats, report.NodeInfo.Id, limitTs)
avgUptime := (float64(nodeStats.OnlineCount) / float64(nodeStats.TotalFetched)) * 100
report.NodeInfo.Uptime = math.Ceil(avgUptime*100) / 100
var statuses [5]int
errUnmarshal := report.NodeInfo.LastCheckStatus.Unmarshal(&statuses)
if errUnmarshal != nil {
fmt.Println("Warning", errUnmarshal.Error())
statuses = [5]int{2, 2, 2, 2, 2}
}
nodeAvailable := 0
if report.NodeInfo.IsAvailable {
nodeAvailable = 1
}
newStatuses := statuses[1:]
newStatuses = append(newStatuses, nodeAvailable)
statuesValueToDb, errMarshalStatus := json.Marshal(newStatuses)
if errMarshalStatus != nil {
fmt.Println("WARN", errMarshalStatus.Error())
}
// recheck IP
// TODO: Fill the data using GeoIP
// if report.NodeInfo.Ip != "" {
// ipInfo, errGeoIp := GetGeoIpInfo(report.NodeInfo.Ip)
// if errGeoIp == nil {
// report.NodeInfo.Asn = ipInfo.Asn
// report.NodeInfo.AsnName = ipInfo.AsnOrg
// report.NodeInfo.CountryCode = ipInfo.CountryCode
// report.NodeInfo.CountryName = ipInfo.CountryName
// report.NodeInfo.City = ipInfo.City
// }
// }
update := `UPDATE tbl_node SET
is_available = ?, nettype = ?, height = ?, adjusted_time = ?,
database_size = ?, difficulty = ?, version = ?, uptime = ?,
estimate_fee = ?, ip_addr = ?, asn = ?, asn_name = ?, country = ?,
country_name = ?, city = ?, last_checked = ?, last_check_status = ?,
cors_capable = ?
WHERE id = ?`
_, err = repo.db.Exec(update,
nodeAvailable, report.NodeInfo.NetType, report.NodeInfo.Height, report.NodeInfo.AdjustedTime, report.NodeInfo.DatabaseSize, report.NodeInfo.Difficulty, report.NodeInfo.Version, report.NodeInfo.Uptime, report.NodeInfo.EstimateFee, report.NodeInfo.Ip, report.NodeInfo.Asn, report.NodeInfo.AsnName, report.NodeInfo.CountryCode, report.NodeInfo.CountryName, report.NodeInfo.City, now.Unix(), string(statuesValueToDb), report.NodeInfo.CorsCapable, report.NodeInfo.Id)
return err
}

View file

@ -54,7 +54,7 @@ CREATE TABLE `tbl_node` (
`adjusted_time` bigint(20) unsigned NOT NULL DEFAULT 0, `adjusted_time` bigint(20) unsigned NOT NULL DEFAULT 0,
`database_size` bigint(20) unsigned NOT NULL DEFAULT 0, `database_size` bigint(20) unsigned NOT NULL DEFAULT 0,
`difficulty` bigint(20) unsigned NOT NULL DEFAULT 0, `difficulty` bigint(20) unsigned NOT NULL DEFAULT 0,
`version` varchar(200) NOT NULL, `version` varchar(200) NOT NULL DEFAULT '',
`uptime` float(5,2) unsigned NOT NULL DEFAULT 0.00, `uptime` float(5,2) unsigned NOT NULL DEFAULT 0.00,
`estimate_fee` int(9) unsigned NOT NULL DEFAULT 0, `estimate_fee` int(9) unsigned NOT NULL DEFAULT 0,
`ip_addr` varchar(200) NOT NULL, `ip_addr` varchar(200) NOT NULL,
@ -72,6 +72,26 @@ CREATE TABLE `tbl_node` (
PRIMARY KEY (`id`) PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
/*!40101 SET character_set_client = @saved_cs_client */; /*!40101 SET character_set_client = @saved_cs_client */;
DROP TABLE IF EXISTS `tbl_probe_log`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `tbl_probe_log` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`node_id` bigint(20) unsigned NOT NULL DEFAULT 0,
`prober_id` int(9) unsigned NOT NULL DEFAULT 0,
`is_available` tinyint(1) unsigned NOT NULL DEFAULT 0,
`height` bigint(20) unsigned NOT NULL DEFAULT 0,
`adjusted_time` bigint(20) unsigned NOT NULL DEFAULT 0,
`database_size` bigint(20) unsigned NOT NULL DEFAULT 0,
`difficulty` bigint(20) unsigned NOT NULL DEFAULT 0,
`estimate_fee` int(9) unsigned NOT NULL DEFAULT 0,
`date_checked` bigint(20) unsigned NOT NULL DEFAULT 0,
`failed_reason` text NOT NULL DEFAULT '',
`fetch_runtime` float(5,2) unsigned DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `node_id` (`node_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
/*!40101 SET character_set_client = @saved_cs_client */;
DROP TABLE IF EXISTS `tbl_prober`; DROP TABLE IF EXISTS `tbl_prober`;
/*!40101 SET @saved_cs_client = @@character_set_client */; /*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */; /*!40101 SET character_set_client = utf8 */;