Make sure cron goroutine is stopped

Adding struct chan to run cron Process to stop the goroutine.

Moving db migration inside `fiber.IsChild` block
to avoid multiple execution migration script when in prefork mode.

Give additional time for graceful shutdown.
This commit is contained in:
ditatompel 2024-05-23 02:58:58 +07:00
parent 8c1f6b0c43
commit 40b9a6e1d6
No known key found for this signature in database
GPG key ID: 31D3D06D77950979
2 changed files with 52 additions and 42 deletions

View file

@ -6,6 +6,7 @@ import (
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
"time"
"xmr-remote-nodes/frontend" "xmr-remote-nodes/frontend"
"xmr-remote-nodes/handler" "xmr-remote-nodes/handler"
"xmr-remote-nodes/internal/config" "xmr-remote-nodes/internal/config"
@ -22,7 +23,7 @@ import (
var serveCmd = &cobra.Command{ var serveCmd = &cobra.Command{
Use: "serve", Use: "serve",
Short: "Serve the WebUI", Short: "Serve the WebUI and APIs",
Long: `This command will run HTTP server for APIs and WebUI.`, Long: `This command will run HTTP server for APIs and WebUI.`,
Run: func(_ *cobra.Command, _ []string) { Run: func(_ *cobra.Command, _ []string) {
serve() serve()
@ -31,14 +32,26 @@ var serveCmd = &cobra.Command{
func serve() { func serve() {
appCfg := config.AppCfg() appCfg := config.AppCfg()
// connect to DB
if err := database.ConnectDB(); err != nil { if err := database.ConnectDB(); err != nil {
panic(err) slog.Error(fmt.Sprintf("[DB] %s", err.Error()))
os.Exit(1)
} }
// run db migrations // signal channel to capture system calls
if err := database.MigrateDb(database.GetDB()); err != nil { sigCh := make(chan os.Signal, 1)
panic(err) signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
stopCron := make(chan struct{})
if !fiber.IsChild() {
// run db migrations
if err := database.MigrateDb(database.GetDB()); err != nil {
slog.Error(fmt.Sprintf("[DB] %s", err.Error()))
os.Exit(1)
}
// run cron process
cronRepo := cron.New()
go cronRepo.RunCronProcess(stopCron)
} }
// Define Fiber config & app. // Define Fiber config & app.
@ -67,28 +80,21 @@ func serve() {
// NotFoundFile: "index.html", // NotFoundFile: "index.html",
})) }))
// signal channel to capture system calls // go routine to capture system calls
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
// start a cleanup cron-job
if !fiber.IsChild() {
cronRepo := cron.New()
go cronRepo.RunCronProcess()
}
// start shutdown goroutine
go func() { go func() {
// capture sigterm and other system call here
<-sigCh <-sigCh
close(stopCron) // stop cron goroutine
slog.Info("Shutting down HTTP server...") slog.Info("Shutting down HTTP server...")
_ = app.Shutdown() _ = app.Shutdown()
// give time for graceful shutdown
time.Sleep(1 * time.Second)
}() }()
// start http server // start http server
serverAddr := fmt.Sprintf("%s:%d", appCfg.Host, appCfg.Port) serverAddr := fmt.Sprintf("%s:%d", appCfg.Host, appCfg.Port)
if err := app.Listen(serverAddr); err != nil { if err := app.Listen(serverAddr); err != nil {
slog.Error(fmt.Sprintf("Server is not running! error: %v", err)) slog.Error(fmt.Sprintf("[HTTP] Server is not running! error: %v", err))
} }
} }

View file

@ -9,7 +9,7 @@ import (
) )
type CronRepository interface { type CronRepository interface {
RunCronProcess() RunCronProcess(chan struct{})
Crons() ([]Cron, error) Crons() ([]Cron, error)
} }
@ -36,35 +36,39 @@ func New() CronRepository {
return &CronRepo{db: database.GetDB()} return &CronRepo{db: database.GetDB()}
} }
func (r *CronRepo) RunCronProcess() { func (r *CronRepo) RunCronProcess(c chan struct{}) {
for { for {
time.Sleep(60 * time.Second) select {
slog.Info("[CRON] Running cron cycle...") case <-time.After(60 * time.Second):
list, err := r.queueList() slog.Info("[CRON] Running cron cycle...")
if err != nil { list, err := r.queueList()
slog.Warn(fmt.Sprintf("[CRON] Error parsing queue list to struct: %s", err)) if err != nil {
continue slog.Warn(fmt.Sprintf("[CRON] Error parsing queue list to struct: %s", err))
}
for _, task := range list {
startTime := time.Now()
currentTs := startTime.Unix()
delayedTask := currentTs - task.NextRun
if task.CronState == 1 && delayedTask <= int64(rerunTimeout) {
slog.Debug(fmt.Sprintf("[CRON] Skipping task %s because it is already running", task.Slug))
continue continue
} }
for _, task := range list {
startTime := time.Now()
currentTs := startTime.Unix()
delayedTask := currentTs - task.NextRun
if task.CronState == 1 && delayedTask <= int64(rerunTimeout) {
slog.Debug(fmt.Sprintf("[CRON] Skipping task %s because it is already running", task.Slug))
continue
}
r.preRunTask(task.ID, currentTs) r.preRunTask(task.ID, currentTs)
r.execCron(task.Slug)
r.execCron(task.Slug) runTime := math.Ceil(time.Since(startTime).Seconds()*1000) / 1000
slog.Info(fmt.Sprintf("[CRON] Task %s done in %f seconds", task.Slug, runTime))
nextRun := currentTs + int64(task.RunEvery)
runTime := math.Ceil(time.Since(startTime).Seconds()*1000) / 1000 r.postRunTask(task.ID, nextRun, runTime)
slog.Info(fmt.Sprintf("[CRON] Task %s done in %f seconds", task.Slug, runTime)) }
nextRun := currentTs + int64(task.RunEvery) slog.Info("[CRON] Cron cycle done!")
case <-c:
r.postRunTask(task.ID, nextRun, runTime) slog.Info("[CRON] Shutting down cron...")
return
} }
slog.Info("[CRON] Cron cycle done!")
} }
} }