From 40b9a6e1d6f23e1fe159fc8530ca8db8d9c3c980 Mon Sep 17 00:00:00 2001 From: ditatompel Date: Thu, 23 May 2024 02:58:58 +0700 Subject: [PATCH] Make sure cron goroutine is stopped Adding struct chan to run cron Process to stop the goroutine. Moving db migration inside `fiber.IsChild` block to avoid multiple execution migration script when in prefork mode. Give additional time for graceful shutdown. --- cmd/server/serve.go | 44 +++++++++++++++++++++---------------- internal/cron/cron.go | 50 +++++++++++++++++++++++-------------------- 2 files changed, 52 insertions(+), 42 deletions(-) diff --git a/cmd/server/serve.go b/cmd/server/serve.go index 32c5101..d7d929f 100644 --- a/cmd/server/serve.go +++ b/cmd/server/serve.go @@ -6,6 +6,7 @@ import ( "os" "os/signal" "syscall" + "time" "xmr-remote-nodes/frontend" "xmr-remote-nodes/handler" "xmr-remote-nodes/internal/config" @@ -22,7 +23,7 @@ import ( var serveCmd = &cobra.Command{ Use: "serve", - Short: "Serve the WebUI", + Short: "Serve the WebUI and APIs", Long: `This command will run HTTP server for APIs and WebUI.`, Run: func(_ *cobra.Command, _ []string) { serve() @@ -31,14 +32,26 @@ var serveCmd = &cobra.Command{ func serve() { appCfg := config.AppCfg() - // connect to DB if err := database.ConnectDB(); err != nil { - panic(err) + slog.Error(fmt.Sprintf("[DB] %s", err.Error())) + os.Exit(1) } - // run db migrations - if err := database.MigrateDb(database.GetDB()); err != nil { - panic(err) + // signal channel to capture system calls + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) + + stopCron := make(chan struct{}) + if !fiber.IsChild() { + // run db migrations + if err := database.MigrateDb(database.GetDB()); err != nil { + slog.Error(fmt.Sprintf("[DB] %s", err.Error())) + os.Exit(1) + } + + // run cron process + cronRepo := cron.New() + go cronRepo.RunCronProcess(stopCron) } // Define Fiber config & app. @@ -67,28 +80,21 @@ func serve() { // NotFoundFile: "index.html", })) - // signal channel to capture system calls - sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) - - // start a cleanup cron-job - if !fiber.IsChild() { - cronRepo := cron.New() - go cronRepo.RunCronProcess() - } - - // start shutdown goroutine + // go routine to capture system calls go func() { - // capture sigterm and other system call here <-sigCh + close(stopCron) // stop cron goroutine slog.Info("Shutting down HTTP server...") _ = app.Shutdown() + + // give time for graceful shutdown + time.Sleep(1 * time.Second) }() // start http server serverAddr := fmt.Sprintf("%s:%d", appCfg.Host, appCfg.Port) if err := app.Listen(serverAddr); err != nil { - slog.Error(fmt.Sprintf("Server is not running! error: %v", err)) + slog.Error(fmt.Sprintf("[HTTP] Server is not running! error: %v", err)) } } diff --git a/internal/cron/cron.go b/internal/cron/cron.go index 218251f..2618142 100644 --- a/internal/cron/cron.go +++ b/internal/cron/cron.go @@ -9,7 +9,7 @@ import ( ) type CronRepository interface { - RunCronProcess() + RunCronProcess(chan struct{}) Crons() ([]Cron, error) } @@ -36,35 +36,39 @@ func New() CronRepository { return &CronRepo{db: database.GetDB()} } -func (r *CronRepo) RunCronProcess() { +func (r *CronRepo) RunCronProcess(c chan struct{}) { for { - time.Sleep(60 * time.Second) - slog.Info("[CRON] Running cron cycle...") - list, err := r.queueList() - if err != nil { - slog.Warn(fmt.Sprintf("[CRON] Error parsing queue list to struct: %s", err)) - continue - } - for _, task := range list { - startTime := time.Now() - currentTs := startTime.Unix() - delayedTask := currentTs - task.NextRun - if task.CronState == 1 && delayedTask <= int64(rerunTimeout) { - slog.Debug(fmt.Sprintf("[CRON] Skipping task %s because it is already running", task.Slug)) + select { + case <-time.After(60 * time.Second): + slog.Info("[CRON] Running cron cycle...") + list, err := r.queueList() + if err != nil { + slog.Warn(fmt.Sprintf("[CRON] Error parsing queue list to struct: %s", err)) continue } + for _, task := range list { + startTime := time.Now() + currentTs := startTime.Unix() + delayedTask := currentTs - task.NextRun + if task.CronState == 1 && delayedTask <= int64(rerunTimeout) { + slog.Debug(fmt.Sprintf("[CRON] Skipping task %s because it is already running", task.Slug)) + continue + } - r.preRunTask(task.ID, currentTs) + r.preRunTask(task.ID, currentTs) + r.execCron(task.Slug) - r.execCron(task.Slug) + runTime := math.Ceil(time.Since(startTime).Seconds()*1000) / 1000 + slog.Info(fmt.Sprintf("[CRON] Task %s done in %f seconds", task.Slug, runTime)) + nextRun := currentTs + int64(task.RunEvery) - runTime := math.Ceil(time.Since(startTime).Seconds()*1000) / 1000 - slog.Info(fmt.Sprintf("[CRON] Task %s done in %f seconds", task.Slug, runTime)) - nextRun := currentTs + int64(task.RunEvery) - - r.postRunTask(task.ID, nextRun, runTime) + r.postRunTask(task.ID, nextRun, runTime) + } + slog.Info("[CRON] Cron cycle done!") + case <-c: + slog.Info("[CRON] Shutting down cron...") + return } - slog.Info("[CRON] Cron cycle done!") } }