From 03fc463d0d9f9a3ea3b37f0202708c3807e1503f Mon Sep 17 00:00:00 2001 From: IsaacDSC Date: Sat, 21 Feb 2026 12:27:53 -0300 Subject: [PATCH] feat: implement centralized shutdown handling and improve context usage for graceful server shutdown --- cmd/api/main.go | 48 ++++++++++++++++------------- cmd/setup/pubsub/consumer.go | 17 +++------- cmd/setup/task/consumer.go | 11 ++----- pkg/asyncadapter/gbubsub_adapter.go | 19 +++++++++--- 4 files changed, 48 insertions(+), 47 deletions(-) diff --git a/cmd/api/main.go b/cmd/api/main.go index 7abb3e5..cacd6e7 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -20,33 +20,39 @@ import ( "github.com/redis/go-redis/v9" ) -// waitForShutdown waits for SIGINT/SIGTERM and gracefully shuts down the provided servers. -func waitForShutdown(server *http.Server) { +// Centralized shutdown: creates a context that is cancelled on SIGINT/SIGTERM and waits for all servers to shutdown. +func waitForShutdown(ctx context.Context, servers []*http.Server) { quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) - <-quit - log.Println("Shutting down servers...") + select { + case <-quit: + log.Println("Shutting down servers...") + case <-ctx.Done(): + log.Println("Context cancelled, shutting down servers...") + } - shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() - if server != nil { - if err := server.Shutdown(shutdownCtx); err != nil { - log.Printf("Backoffice server shutdown error: %v", err) + for _, server := range servers { + if server != nil { + if err := server.Shutdown(shutdownCtx); err != nil { + log.Printf("Server shutdown error: %v", err) + } } } - log.Println("Server shutdown complete") + log.Println("All servers shutdown complete") } // go run . --scope=all // go run . --scope=backoffice // go run . --scope=pubsub -// go run . --scope=task func main() { conf := cfg.Get() - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() scope := flag.String("scope", "all", "service to run") flag.Parse() @@ -64,6 +70,8 @@ func main() { } var servers []*http.Server + var closers []func() + if scopeOrAll(*scope, "backoffice") { backofficeServer := backoffice.Start( redisClient, @@ -85,11 +93,8 @@ func main() { s := pubsub.New( store, memStore, fetch, storeInsights, ) - s.Start(ctx, conf) - - defer s.Close() - + closers = append(closers, s.Close) servers = append(servers, s.Server()) } @@ -97,18 +102,17 @@ func main() { s := task.New( store, memStore, fetch, storeInsights, ) - s.Start(ctx, conf) - - defer s.Close() - + closers = append(closers, s.Close) servers = append(servers, s.Server()) } - for _, server := range servers { - waitForShutdown(server) - } + waitForShutdown(ctx, servers) + // call all closers after shutdown + for _, closeFn := range closers { + closeFn() + } } func scopeOrAll(scope, expected string) bool { diff --git a/cmd/setup/pubsub/consumer.go b/cmd/setup/pubsub/consumer.go index d7ff955..16dd7e0 100644 --- a/cmd/setup/pubsub/consumer.go +++ b/cmd/setup/pubsub/consumer.go @@ -3,10 +3,7 @@ package pubsub import ( "context" "log" - "os" - "os/signal" "sync" - "syscall" "time" "cloud.google.com/go/pubsub" @@ -18,11 +15,7 @@ import ( ) func (s *Service) consumer(ctx context.Context, env cfg.Config) { - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, os.Interrupt) - - ctx, cancel := context.WithCancel(ctx) - defer cancel() + // Use only the context passed from main for shutdown control concurrency := env.AsynqConfig.Concurrency @@ -104,12 +97,10 @@ func (s *Service) consumer(ctx context.Context, env cfg.Config) { log.Println("[*] starting worker with configs") log.Println("[*] wq.concurrency", (len(handlers))*concurrency) - log.Println("[*] Worker started. Press Ctrl+C to gracefully shutdown...") - - <-sigChan - log.Println("[*] Received shutdown signal, initiating graceful shutdown...") + log.Println("[*] Worker started. Waiting for shutdown signal from context...") - cancel() + <-ctx.Done() + log.Println("[*] Context cancelled, initiating graceful shutdown...") done := make(chan struct{}) go func() { diff --git a/cmd/setup/task/consumer.go b/cmd/setup/task/consumer.go index 5c6afb9..7858058 100644 --- a/cmd/setup/task/consumer.go +++ b/cmd/setup/task/consumer.go @@ -3,9 +3,6 @@ package task import ( "context" "log" - "os" - "os/signal" - "syscall" "github.com/IsaacDSC/gqueue/cmd/setup/middleware" "github.com/IsaacDSC/gqueue/internal/app/taskapp" @@ -17,8 +14,6 @@ import ( ) func (s *Service) consumer(ctx context.Context, env cfg.Config, asynqCfg asynq.Config) { - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, os.Interrupt) mux := asynq.NewServeMux() mux.Use(middleware.AsynqLogger) @@ -42,9 +37,9 @@ func (s *Service) consumer(ctx context.Context, env cfg.Config, asynqCfg asynq.C } }() - <-sigChan - log.Println("[*] Received shutdown signal, initiating graceful shutdown...") - + // Wait for context cancellation + <-ctx.Done() + log.Println("[*] Context cancelled, initiating graceful shutdown...") s.asynqServer.Shutdown() log.Println("[*] Asynq server stopped gracefully") diff --git a/pkg/asyncadapter/gbubsub_adapter.go b/pkg/asyncadapter/gbubsub_adapter.go index 53f5767..d792337 100644 --- a/pkg/asyncadapter/gbubsub_adapter.go +++ b/pkg/asyncadapter/gbubsub_adapter.go @@ -36,12 +36,12 @@ func (h Handle[T]) ToGPubSubHandler(pub pubadapter.GenericPublisher) gpubsub.Han retryCount, err := strconv.Atoi(strRetryCount) if err != nil { - panic(err) //TODO: adicionar validação e tratamento de erro melhor + panic(err) //TODO: add better validation and error handling } maxRetryAttempts, err := strconv.Atoi(strMaxRetryCount) if err != nil { - panic(err) //TODO: adicionar validação e tratamento de erro melhor + panic(err) //TODO: add better validation and error handling } if retryCount >= maxRetryAttempts { @@ -52,14 +52,25 @@ func (h Handle[T]) ToGPubSubHandler(pub pubadapter.GenericPublisher) gpubsub.Han retryCount++ msg.Attributes["retry_count"] = strconv.Itoa(retryCount) topic := msg.Attributes["topic"] - time.Sleep(time.Second * 5) + + // Wait respecting the context + select { + case <-time.After(5 * time.Second): + // continue + case <-ctx.Done(): + return + } + + if ctx.Err() != nil { + return + } + if err := pub.Publish(ctx, topic, msg, pubadapter.Opts{ Attributes: msg.Attributes, }); err != nil { msg.Nack() return } - } return gpubsub.Handle{