Make cron as it's own package

This commit is contained in:
ditatompel 2024-05-23 02:24:06 +07:00
parent c6efceb0ac
commit 8c1f6b0c43
No known key found for this signature in database
GPG key ID: 31D3D06D77950979
3 changed files with 27 additions and 29 deletions

View file

@ -5,8 +5,8 @@ import (
"os" "os"
"text/tabwriter" "text/tabwriter"
"time" "time"
"xmr-remote-nodes/internal/cron"
"xmr-remote-nodes/internal/database" "xmr-remote-nodes/internal/database"
"xmr-remote-nodes/internal/repo"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -19,8 +19,7 @@ var cronCmd = &cobra.Command{
if err := database.ConnectDB(); err != nil { if err := database.ConnectDB(); err != nil {
panic(err) panic(err)
} }
cronRepo := repo.NewCron(database.GetDB()) crons, err := cron.New().Crons()
crons, err := cronRepo.Crons()
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
return return
@ -32,8 +31,8 @@ var cronCmd = &cobra.Command{
w := tabwriter.NewWriter(os.Stdout, 1, 1, 1, ' ', 0) w := tabwriter.NewWriter(os.Stdout, 1, 1, 1, ' ', 0)
fmt.Fprintf(w, "ID\t| Name\t| Run Every\t| Last Run\t| Took Time\n") fmt.Fprintf(w, "ID\t| Name\t| Run Every\t| Last Run\t| Took Time\n")
for _, cron := range crons { for _, cron := range crons {
fmt.Fprintf(w, "%d\t| %s\t| %d\t| %s\t| %f\n", fmt.Fprintf(w, "%d\t| %s\t| %ds\t| %s\t| %f\n",
cron.Id, cron.ID,
cron.Title, cron.Title,
cron.RunEvery, cron.RunEvery,
time.Unix(cron.LastRun, 0).Format(time.RFC3339), time.Unix(cron.LastRun, 0).Format(time.RFC3339),

View file

@ -9,8 +9,8 @@ import (
"xmr-remote-nodes/frontend" "xmr-remote-nodes/frontend"
"xmr-remote-nodes/handler" "xmr-remote-nodes/handler"
"xmr-remote-nodes/internal/config" "xmr-remote-nodes/internal/config"
"xmr-remote-nodes/internal/cron"
"xmr-remote-nodes/internal/database" "xmr-remote-nodes/internal/database"
"xmr-remote-nodes/internal/repo"
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/cors" "github.com/gofiber/fiber/v2/middleware/cors"
@ -73,7 +73,7 @@ func serve() {
// start a cleanup cron-job // start a cleanup cron-job
if !fiber.IsChild() { if !fiber.IsChild() {
cronRepo := repo.NewCron(database.GetDB()) cronRepo := cron.New()
go cronRepo.RunCronProcess() go cronRepo.RunCronProcess()
} }

View file

@ -1,4 +1,4 @@
package repo package cron
import ( import (
"fmt" "fmt"
@ -18,7 +18,7 @@ type CronRepo struct {
} }
type Cron struct { type Cron struct {
Id int `json:"id" db:"id"` ID int `json:"id" db:"id"`
Title string `json:"title" db:"title"` Title string `json:"title" db:"title"`
Slug string `json:"slug" db:"slug"` Slug string `json:"slug" db:"slug"`
Description string `json:"description" db:"description"` Description string `json:"description" db:"description"`
@ -32,15 +32,15 @@ type Cron struct {
var rerunTimeout = 300 var rerunTimeout = 300
func NewCron(db *database.DB) CronRepository { func New() CronRepository {
return &CronRepo{db} return &CronRepo{db: database.GetDB()}
} }
func (repo *CronRepo) RunCronProcess() { func (r *CronRepo) RunCronProcess() {
for { for {
time.Sleep(60 * time.Second) time.Sleep(60 * time.Second)
slog.Info("[CRON] Running cron cycle...") slog.Info("[CRON] Running cron cycle...")
list, err := repo.queueList() list, err := r.queueList()
if err != nil { if err != nil {
slog.Warn(fmt.Sprintf("[CRON] Error parsing queue list to struct: %s", err)) slog.Warn(fmt.Sprintf("[CRON] Error parsing queue list to struct: %s", err))
continue continue
@ -54,23 +54,23 @@ func (repo *CronRepo) RunCronProcess() {
continue continue
} }
repo.preRunTask(task.Id, currentTs) r.preRunTask(task.ID, currentTs)
repo.execCron(task.Slug) r.execCron(task.Slug)
runTime := math.Ceil(time.Since(startTime).Seconds()*1000) / 1000 runTime := math.Ceil(time.Since(startTime).Seconds()*1000) / 1000
slog.Info(fmt.Sprintf("[CRON] Task %s done in %f seconds", task.Slug, runTime)) slog.Info(fmt.Sprintf("[CRON] Task %s done in %f seconds", task.Slug, runTime))
nextRun := currentTs + int64(task.RunEvery) nextRun := currentTs + int64(task.RunEvery)
repo.postRunTask(task.Id, nextRun, runTime) r.postRunTask(task.ID, nextRun, runTime)
} }
slog.Info("[CRON] Cron cycle done!") slog.Info("[CRON] Cron cycle done!")
} }
} }
func (repo *CronRepo) Crons() ([]Cron, error) { func (r *CronRepo) Crons() ([]Cron, error) {
var tasks []Cron var tasks []Cron
err := repo.db.Select(&tasks, ` err := r.db.Select(&tasks, `
SELECT SELECT
id, id,
title, title,
@ -87,7 +87,7 @@ func (repo *CronRepo) Crons() ([]Cron, error) {
return tasks, err return tasks, err
} }
func (repo *CronRepo) queueList() ([]Cron, error) { func (r *CronRepo) queueList() ([]Cron, error) {
tasks := []Cron{} tasks := []Cron{}
query := ` query := `
SELECT SELECT
@ -102,12 +102,12 @@ func (repo *CronRepo) queueList() ([]Cron, error) {
WHERE WHERE
is_enabled = ? is_enabled = ?
AND next_run <= ?` AND next_run <= ?`
err := repo.db.Select(&tasks, query, 1, time.Now().Unix()) err := r.db.Select(&tasks, query, 1, time.Now().Unix())
return tasks, err return tasks, err
} }
func (repo *CronRepo) preRunTask(id int, lastRunTs int64) { func (r *CronRepo) preRunTask(id int, lastRunTs int64) {
query := ` query := `
UPDATE tbl_cron UPDATE tbl_cron
SET SET
@ -115,14 +115,14 @@ func (repo *CronRepo) preRunTask(id int, lastRunTs int64) {
last_run = ? last_run = ?
WHERE WHERE
id = ?` id = ?`
row, err := repo.db.Query(query, 1, lastRunTs, id) row, err := r.db.Query(query, 1, lastRunTs, id)
if err != nil { if err != nil {
slog.Error(fmt.Sprintf("[CRON] Failed to update pre cron state: %s", err)) slog.Error(fmt.Sprintf("[CRON] Failed to update pre cron state: %s", err))
} }
defer row.Close() defer row.Close()
} }
func (repo *CronRepo) postRunTask(id int, nextRun int64, runtime float64) { func (r *CronRepo) postRunTask(id int, nextRun int64, runtime float64) {
query := ` query := `
UPDATE tbl_cron UPDATE tbl_cron
SET SET
@ -131,27 +131,26 @@ func (repo *CronRepo) postRunTask(id int, nextRun int64, runtime float64) {
run_time = ? run_time = ?
WHERE WHERE
id = ?` id = ?`
row, err := repo.db.Query(query, 0, nextRun, runtime, id) row, err := r.db.Query(query, 0, nextRun, runtime, id)
if err != nil { if err != nil {
slog.Error(fmt.Sprintf("[CRON] Failed to update post cron state: %s", err)) slog.Error(fmt.Sprintf("[CRON] Failed to update post cron state: %s", err))
} }
defer row.Close() defer row.Close()
} }
func (repo *CronRepo) execCron(slug string) { func (r *CronRepo) execCron(slug string) {
switch slug { switch slug {
case "delete_old_probe_logs": case "delete_old_probe_logs":
slog.Info(fmt.Sprintf("[CRON] Start running task: %s", slug)) slog.Info(fmt.Sprintf("[CRON] Start running task: %s", slug))
repo.deleteOldProbeLogs() r.deleteOldProbeLogs()
break
} }
} }
func (repo *CronRepo) deleteOldProbeLogs() { func (r *CronRepo) deleteOldProbeLogs() {
// for now, we only delete stats older than 1 month +2 days // for now, we only delete stats older than 1 month +2 days
startTs := time.Now().AddDate(0, -1, -2).Unix() startTs := time.Now().AddDate(0, -1, -2).Unix()
query := `DELETE FROM tbl_probe_log WHERE date_checked < ?` query := `DELETE FROM tbl_probe_log WHERE date_checked < ?`
_, err := repo.db.Exec(query, startTs) _, err := r.db.Exec(query, startTs)
if err != nil { if err != nil {
slog.Error(fmt.Sprintf("[CRON] Failed to delete old probe logs: %s", err)) slog.Error(fmt.Sprintf("[CRON] Failed to delete old probe logs: %s", err))
} }