diff --git a/.todos.md b/.todos.md index bf2d360..947f0c9 100644 --- a/.todos.md +++ b/.todos.md @@ -22,3 +22,10 @@ Filtros de mensagens publicadas -> ✅ CI para validação minima do projeto -> ✅ Validar a aplicação para ver se todos lugares que salvam no redis estão com TTL >> OK -> ✅ Conseguir rodar asynq e google pubsub juntos para configurações específicas + + + + +>> Adicionar Cancel context by timeout + +>> Implementar ack | nack task \ No newline at end of file diff --git a/cmd/api/main.go b/cmd/api/main.go index 7abb3e5..cacd6e7 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -20,33 +20,39 @@ import ( "github.com/redis/go-redis/v9" ) -// waitForShutdown waits for SIGINT/SIGTERM and gracefully shuts down the provided servers. -func waitForShutdown(server *http.Server) { +// Centralized shutdown: creates a context that is cancelled on SIGINT/SIGTERM and waits for all servers to shutdown. +func waitForShutdown(ctx context.Context, servers []*http.Server) { quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) - <-quit - log.Println("Shutting down servers...") + select { + case <-quit: + log.Println("Shutting down servers...") + case <-ctx.Done(): + log.Println("Context cancelled, shutting down servers...") + } - shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() - if server != nil { - if err := server.Shutdown(shutdownCtx); err != nil { - log.Printf("Backoffice server shutdown error: %v", err) + for _, server := range servers { + if server != nil { + if err := server.Shutdown(shutdownCtx); err != nil { + log.Printf("Server shutdown error: %v", err) + } } } - log.Println("Server shutdown complete") + log.Println("All servers shutdown complete") } // go run . --scope=all // go run . --scope=backoffice // go run . --scope=pubsub -// go run . --scope=task func main() { conf := cfg.Get() - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() scope := flag.String("scope", "all", "service to run") flag.Parse() @@ -64,6 +70,8 @@ func main() { } var servers []*http.Server + var closers []func() + if scopeOrAll(*scope, "backoffice") { backofficeServer := backoffice.Start( redisClient, @@ -85,11 +93,8 @@ func main() { s := pubsub.New( store, memStore, fetch, storeInsights, ) - s.Start(ctx, conf) - - defer s.Close() - + closers = append(closers, s.Close) servers = append(servers, s.Server()) } @@ -97,18 +102,17 @@ func main() { s := task.New( store, memStore, fetch, storeInsights, ) - s.Start(ctx, conf) - - defer s.Close() - + closers = append(closers, s.Close) servers = append(servers, s.Server()) } - for _, server := range servers { - waitForShutdown(server) - } + waitForShutdown(ctx, servers) + // call all closers after shutdown + for _, closeFn := range closers { + closeFn() + } } func scopeOrAll(scope, expected string) bool { diff --git a/cmd/api/shutdown_test.go b/cmd/api/shutdown_test.go new file mode 100644 index 0000000..0347645 --- /dev/null +++ b/cmd/api/shutdown_test.go @@ -0,0 +1,216 @@ +package main + +import ( + "bufio" + "bytes" + "net" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "testing" + "time" +) + +// syncBuffer wraps a buffer so stdout and stderr can be written concurrently without dropping output. +type syncBuffer struct { + mu sync.Mutex + b bytes.Buffer +} + +func (s *syncBuffer) Write(p []byte) (n int, err error) { + s.mu.Lock() + defer s.mu.Unlock() + return s.b.Write(p) +} + +func (s *syncBuffer) String() string { + s.mu.Lock() + defer s.mu.Unlock() + return s.b.String() +} + +// buildApiBinary builds ./cmd/api and returns the path to the binary. +func buildApiBinary(t *testing.T, moduleRoot string) string { + t.Helper() + exe := filepath.Join(t.TempDir(), "gqueue-api") + if os.PathListSeparator == ';' { + exe += ".exe" + } + build := exec.Command("go", "build", "-o", exe, "./cmd/api") + build.Dir = moduleRoot + if out, err := build.CombinedOutput(); err != nil { + t.Fatalf("go build ./cmd/api: %v\n%s", err, out) + } + return exe +} + +// loadEnvFile reads path and sets env vars from KEY=VALUE lines (comments and empty lines ignored). +func loadEnvFile(path string) { + f, err := os.Open(path) + if err != nil { + return + } + defer f.Close() + s := bufio.NewScanner(f) + for s.Scan() { + line := strings.TrimSpace(s.Text()) + if line == "" || strings.HasPrefix(line, "#") { + continue + } + if strings.HasPrefix(line, "export ") { + line = strings.TrimSpace(line[7:]) + } + i := strings.Index(line, "=") + if i <= 0 { + continue + } + key := strings.TrimSpace(line[:i]) + value := strings.TrimSpace(line[i+1:]) + if len(value) >= 2 && (value[0] == '"' && value[len(value)-1] == '"' || value[0] == '\'' && value[len(value)-1] == '\'') { + value = value[1 : len(value)-1] + } + _ = os.Setenv(key, value) + } +} + +// holdBackofficeConnection opens a TCP connection to the backoffice server and sends an +// incomplete HTTP request so the server blocks reading until ReadTimeout. When we SIGINT, +// the server must wait for this connection to drain before exiting—proving shutdown waits. +func holdBackofficeConnection(t *testing.T) (closeConn func()) { + t.Helper() + port := os.Getenv("BACKOFFICE_API_PORT") + if port == "" { + port = "8081" + } + addr := net.JoinHostPort("localhost", port) + conn, err := net.DialTimeout("tcp", addr, 2*time.Second) + if err != nil { + t.Skipf("Cannot connect to backoffice at %s (server may not be up): %v", addr, err) + return func() {} + } + // Incomplete request: no \r\n\r\n so server keeps reading (until ReadTimeout). + _, _ = conn.Write([]byte("GET /health HTTP/1.1\r\nHost: localhost\r\n")) + return func() { _ = conn.Close() } +} + +// requiredShutdownLogs are the log messages that must appear (see main.go waitForShutdown). +// We require the first and last so we know shutdown started and completed; the duration lines +// in between may be interleaved with asynq logs and can be flaky to capture. +const minShutdownDuration = 2 * time.Second + +var requiredShutdownLogs = []string{ + "Shutting down servers...", + "All servers shutdown complete", +} + +func TestShutdownGraceful(t *testing.T) { + wd, err := os.Getwd() + if err != nil { + t.Fatalf("Failed to get working directory: %v", err) + } + + // Find module root (same dir as go.mod). + moduleRoot := wd + for { + goMod := filepath.Join(moduleRoot, "go.mod") + if _, err := os.Stat(goMod); err == nil { + break + } + parent := filepath.Dir(moduleRoot) + if parent == moduleRoot { + t.Fatalf("go.mod not found (searched from %s)", wd) + } + moduleRoot = parent + } + + // Load .env from module root so "go test" can use local env (only stdlib, no godotenv). + loadEnvFile(filepath.Join(moduleRoot, ".env")) + + // Skip if required env is missing (integration test needs Redis + DB). + if os.Getenv("DB_CONNECTION_STRING") == "" || os.Getenv("CACHE_ADDR") == "" { + t.Skip("Skipping shutdown integration test: DB_CONNECTION_STRING and CACHE_ADDR must be set (use .env or run scripts/run_shutdown_test.sh)") + } + + // Build and run the binary so SIGINT goes to the server process (go run would send it to the go process only). + exe := buildApiBinary(t, moduleRoot) + cmd := exec.Command(exe) + cmd.Dir = moduleRoot + output := &syncBuffer{} + cmd.Stdout = output + cmd.Stderr = output + + if err := cmd.Start(); err != nil { + t.Fatalf("Failed to start server: %v", err) + } + defer func() { + if cmd.Process != nil { + _ = cmd.Process.Kill() + } + }() + + // Wait for server to be up (main finishes init and enters waitForShutdown). + time.Sleep(3 * time.Second) + + if cmd.Process == nil { + t.Fatalf("cmd.Process is nil after Start") + } + + // Hold one connection with an incomplete HTTP request so the server must wait for it + // (or ReadTimeout) during shutdown. This proves shutdown is not killing immediately. + closeConn := holdBackofficeConnection(t) + defer closeConn() + + // Give the server time to see the connection (in-flight). + time.Sleep(500 * time.Millisecond) + + startShutdown := time.Now() + if err := cmd.Process.Signal(os.Interrupt); err != nil { + t.Fatalf("Failed to send SIGINT: %v", err) + } + + done := make(chan error, 1) + go func() { + done <- cmd.Wait() + }() + + select { + case waitErr := <-done: + shutdownDuration := time.Since(startShutdown) + outStr := output.String() + + // (1) Process must exit with code 0 (graceful exit). Non-zero = panic, crash, or unclean exit. + if waitErr != nil { + if exitErr, ok := waitErr.(*exec.ExitError); ok { + t.Errorf("Process did not exit gracefully: exit code %d. Output:\n%s", exitErr.ExitCode(), outStr) + } else { + t.Errorf("Process wait error: %v", waitErr) + } + } + + // (2) All shutdown log lines from waitForShutdown must appear. + for _, want := range requiredShutdownLogs { + if !strings.Contains(outStr, want) { + t.Errorf("Shutdown log missing: %q. Full output:\n%s", want, outStr) + } + } + + t.Logf("Shutdown completed in %v", shutdownDuration) + + // (3) Shutdown must take at least minShutdownDuration: we held an in-flight connection + // (incomplete HTTP request); the server must wait for it before exiting. + if shutdownDuration < minShutdownDuration { + t.Errorf("Shutdown did not wait for in-flight connection: took %v, expected at least %v", shutdownDuration, minShutdownDuration) + } + // (4) Shutdown should complete under the 15s select timeout (server uses 1m timeout internally). + if shutdownDuration > 14*time.Second { + t.Errorf("Shutdown took too long: %v", shutdownDuration) + } + case <-time.After(15 * time.Second): + if cmd.Process != nil { + _ = cmd.Process.Kill() + } + t.Fatalf("Shutdown did not complete within 15s. Output so far:\n%s", output.String()) + } +} diff --git a/cmd/setup/backoffice/httpserver.go b/cmd/setup/backoffice/httpserver.go index b24916a..f290ed0 100644 --- a/cmd/setup/backoffice/httpserver.go +++ b/cmd/setup/backoffice/httpserver.go @@ -4,6 +4,7 @@ import ( "context" "log" "net/http" + "time" "github.com/IsaacDSC/gqueue/cmd/setup/middleware" "github.com/IsaacDSC/gqueue/internal/app/backofficeapp" @@ -53,8 +54,11 @@ func Start( port := env.BackofficeApiPort server := &http.Server{ - Addr: port.String(), - Handler: handler, + Addr: port.String(), + Handler: handler, + ReadTimeout: 10 * time.Second, + WriteTimeout: 15 * time.Second, + IdleTimeout: 60 * time.Second, } log.Printf("[*] Starting Backoffice server on :%d", port) diff --git a/cmd/setup/pubsub/consumer.go b/cmd/setup/pubsub/consumer.go index d7ff955..16dd7e0 100644 --- a/cmd/setup/pubsub/consumer.go +++ b/cmd/setup/pubsub/consumer.go @@ -3,10 +3,7 @@ package pubsub import ( "context" "log" - "os" - "os/signal" "sync" - "syscall" "time" "cloud.google.com/go/pubsub" @@ -18,11 +15,7 @@ import ( ) func (s *Service) consumer(ctx context.Context, env cfg.Config) { - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, os.Interrupt) - - ctx, cancel := context.WithCancel(ctx) - defer cancel() + // Use only the context passed from main for shutdown control concurrency := env.AsynqConfig.Concurrency @@ -104,12 +97,10 @@ func (s *Service) consumer(ctx context.Context, env cfg.Config) { log.Println("[*] starting worker with configs") log.Println("[*] wq.concurrency", (len(handlers))*concurrency) - log.Println("[*] Worker started. Press Ctrl+C to gracefully shutdown...") - - <-sigChan - log.Println("[*] Received shutdown signal, initiating graceful shutdown...") + log.Println("[*] Worker started. Waiting for shutdown signal from context...") - cancel() + <-ctx.Done() + log.Println("[*] Context cancelled, initiating graceful shutdown...") done := make(chan struct{}) go func() { diff --git a/cmd/setup/pubsub/http_api.go b/cmd/setup/pubsub/http_api.go index 737d02b..6891979 100644 --- a/cmd/setup/pubsub/http_api.go +++ b/cmd/setup/pubsub/http_api.go @@ -4,6 +4,7 @@ import ( "context" "log" "net/http" + "time" "github.com/IsaacDSC/gqueue/cmd/setup/middleware" "github.com/IsaacDSC/gqueue/internal/app/health" @@ -37,8 +38,11 @@ func (s *Service) startHttpServer(ctx context.Context, env cfg.Config) *http.Ser port := env.PubsubApiPort server := &http.Server{ - Addr: port.String(), - Handler: handler, + Addr: port.String(), + Handler: handler, + ReadTimeout: 10 * time.Second, + WriteTimeout: 200 * time.Millisecond, + IdleTimeout: 60 * time.Second, } log.Printf("[*] Starting Pubsub API server on :%d", port) diff --git a/cmd/setup/task/consumer.go b/cmd/setup/task/consumer.go index 5c6afb9..7858058 100644 --- a/cmd/setup/task/consumer.go +++ b/cmd/setup/task/consumer.go @@ -3,9 +3,6 @@ package task import ( "context" "log" - "os" - "os/signal" - "syscall" "github.com/IsaacDSC/gqueue/cmd/setup/middleware" "github.com/IsaacDSC/gqueue/internal/app/taskapp" @@ -17,8 +14,6 @@ import ( ) func (s *Service) consumer(ctx context.Context, env cfg.Config, asynqCfg asynq.Config) { - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, os.Interrupt) mux := asynq.NewServeMux() mux.Use(middleware.AsynqLogger) @@ -42,9 +37,9 @@ func (s *Service) consumer(ctx context.Context, env cfg.Config, asynqCfg asynq.C } }() - <-sigChan - log.Println("[*] Received shutdown signal, initiating graceful shutdown...") - + // Wait for context cancellation + <-ctx.Done() + log.Println("[*] Context cancelled, initiating graceful shutdown...") s.asynqServer.Shutdown() log.Println("[*] Asynq server stopped gracefully") diff --git a/cmd/setup/task/http_api.go b/cmd/setup/task/http_api.go index b69ebfa..c2bcd53 100644 --- a/cmd/setup/task/http_api.go +++ b/cmd/setup/task/http_api.go @@ -4,6 +4,7 @@ import ( "context" "log" "net/http" + "time" "github.com/IsaacDSC/gqueue/cmd/setup/middleware" "github.com/IsaacDSC/gqueue/internal/app/health" @@ -37,8 +38,11 @@ func (s *Service) startHttpServer(ctx context.Context, env cfg.Config) *http.Ser port := env.TaskApiPort server := &http.Server{ - Addr: port.String(), - Handler: handler, + Addr: port.String(), + Handler: handler, + ReadTimeout: 10 * time.Second, + WriteTimeout: 200 * time.Millisecond, + IdleTimeout: 60 * time.Second, } log.Printf("[*] Starting Task API server on :%d", port) diff --git a/docs/limits.md b/docs/limits.md new file mode 100644 index 0000000..7e3b52b --- /dev/null +++ b/docs/limits.md @@ -0,0 +1,51 @@ +# limit configurations + +> Warning about limits timeouts and rate limits + + +### Producer limits + +https://github.com/IsaacDSC/gqueue/blob/03fc463d0d9f9a3ea3b37f0202708c3807e1503f/pkg/httpclient/client_http_with_logger.go#L93 + +```go +client, err := clienthttp.New("", + clienthttp.WithTimeout(30*time.Second), + clienthttp.WithMaxIdleConns(100), + clienthttp.WithMaxIdleConnsPerHost(2), + clienthttp.WithIdleConnTimeout(90*time.Second), + clienthttp.WithAuditor(auditor), +) +``` + +#### Pubsub +- 200ms timeout response producer message +- 2000 RPS + +#### Task +- 200ms timeout response producer message +- 15 RPS + + +### Consumer limits + +https://github.com/IsaacDSC/gqueue/blob/03fc463d0d9f9a3ea3b37f0202708c3807e1503f/cmd/setup/pubsub/http_api.go#L40 + +```go +server := &http.Server{ + Addr: port.String(), + Handler: handler, + ReadTimeout: 10 * time.Second, + WriteTimeout: 200 * time.Millisecond, + IdleTimeout: 60 * time.Second, +} + +``` + +#### Pubsub +- 200ms timeout response producer message +- 2000 RPS + +#### Task +- 200ms timeout response producer message +- ACK | NACK timeout 5m +- 15 RPS \ No newline at end of file diff --git a/docs/shutdown.md b/docs/shutdown.md new file mode 100644 index 0000000..b7d7d00 --- /dev/null +++ b/docs/shutdown.md @@ -0,0 +1,64 @@ +# Graceful Shutdown in gqueue + +## Overview +This document describes the centralized graceful shutdown in gqueue: how it is triggered, how HTTP servers and workers are stopped, and how to test it. + +## How Shutdown Works + +1. **Signal handling** + In `cmd/api/main.go`, `waitForShutdown` listens for `SIGINT` and `SIGTERM` (and for `ctx.Done()` if the context is cancelled). The main process does not exit until shutdown has finished. + +2. **HTTP servers** + When a signal is received, the code logs `"Shutting down servers..."` and then calls `server.Shutdown(shutdownCtx)` for each HTTP server (Backoffice, PubSub API, Task API). Shutdown uses a **15 second** timeout so in-flight requests can finish. Servers are shut down in sequence. + +3. **Closers** + After all servers have shut down, `main` runs the registered closers (e.g. `pubsub.Close`, `task.Close`). Those closers stop workers and release resources. Only then does the process exit. + +4. **Context** + The global `context.Context` created in `main` is passed into PubSub and Task when they start. Workers and long-running logic should respect this context so they can stop when the process is shutting down (typically as a result of closers running). + +## Configuration + +- **Shutdown timeout** + The timeout used in `waitForShutdown` for `server.Shutdown` is currently **15 seconds** (hardcoded in `cmd/api/main.go`). The config field `ShutdownTimeout` (`env:"SHUTDOWN_TIMEOUT"`, default `30s`) exists in `internal/cfg` but is not yet used there; it can be wired in later if desired. + +- **Concurrency** + Worker concurrency for PubSub and Task is controlled by `AsynqConfig.Concurrency` / `WQ_CONCURRENCY` (see `internal/cfg`). + +## Services and Workers + +- **Backoffice HTTP server** + Started via `backoffice.Start(...)`. Shut down by the central handler with the same 15s timeout. + +- **PubSub** + Started via `pubsub.New(...).Start(ctx, conf)`. The PubSub HTTP server is in the `servers` list and is shut down by `waitForShutdown`. Subscribers and other resources are stopped when the PubSub closer runs. + +- **Task (Asynq)** + Started via `task.New(...).Start(ctx, conf)`. The Task API server is shut down by `waitForShutdown`; the Asynq worker and related resources are stopped when the Task closer runs. + +## Testing + +An integration test checks that shutdown completes correctly when the process receives SIGINT: + +- **Test:** `cmd/api/shutdown_test.go` — `TestShutdownGraceful` +- **What it does:** Builds the API binary, starts it, holds an in-flight HTTP connection to the backoffice server, sends SIGINT, then checks that the process exits with code 0, that the logs show shutdown started and completed, and that shutdown took at least 2 seconds (so the server actually waited for the in-flight connection). +- **Env:** The test skips if `DB_CONNECTION_STRING` or `CACHE_ADDR` are not set. It loads `.env` from the module root (stdlib only, no extra deps) so you can run `go test` with env from a local `.env`. +- **Run:** + - From repo root with env already set (e.g. `.env` or direnv): + `go test -v -run TestShutdownGraceful ./cmd/api/` + - Or use the script that sources `.env-example` and `.env` then runs the test: + `./scripts/run_shutdown_test.sh` + +Requires Redis and Postgres (and other env) to be up so the binary can start. + +## Best Practices + +- Pass the context from `main` into any new goroutine or handler so they can stop when the process is shutting down. +- Rely on the central shutdown in `main`; avoid extra signal handling or ad-hoc shutdown logic in services. +- Prefer operations that can be interrupted via context (e.g. context-aware HTTP clients, `ctx.Done()` in loops). + +## References + +- `cmd/api/main.go` — `waitForShutdown`, `main` (servers, closers, and scope). +- `cmd/setup/pubsub` and `cmd/setup/task` — how services register servers and closers and use the context. +- `cmd/api/shutdown_test.go` — integration test for graceful shutdown. diff --git a/internal/cfg/env.go b/internal/cfg/env.go index 49bc7c3..1f4ad46 100644 --- a/internal/cfg/env.go +++ b/internal/cfg/env.go @@ -58,9 +58,10 @@ type Config struct { InternalBaseURL string `env:"INTERNAL_BASE_URL"` InternalServiceName string `env:"INTERNAL_SERVICE_NAME"` - PubsubApiPort ServerPort `env:"PUBSUB_API_PORT" env-default:"8082"` - TaskApiPort ServerPort `env:"TASK_API_PORT" env-default:"8082"` - BackofficeApiPort ServerPort `env:"BACKOFFICE_API_PORT" env-default:"8081"` + PubsubApiPort ServerPort `env:"PUBSUB_API_PORT" env-default:"8082"` + TaskApiPort ServerPort `env:"TASK_API_PORT" env-default:"8082"` + BackofficeApiPort ServerPort `env:"BACKOFFICE_API_PORT" env-default:"8081"` + ShutdownTimeout time.Duration `env:"SHUTDOWN_TIMEOUT" env-default:"30s"` } var cfg Config diff --git a/pkg/asyncadapter/gbubsub_adapter.go b/pkg/asyncadapter/gbubsub_adapter.go index 53f5767..d792337 100644 --- a/pkg/asyncadapter/gbubsub_adapter.go +++ b/pkg/asyncadapter/gbubsub_adapter.go @@ -36,12 +36,12 @@ func (h Handle[T]) ToGPubSubHandler(pub pubadapter.GenericPublisher) gpubsub.Han retryCount, err := strconv.Atoi(strRetryCount) if err != nil { - panic(err) //TODO: adicionar validação e tratamento de erro melhor + panic(err) //TODO: add better validation and error handling } maxRetryAttempts, err := strconv.Atoi(strMaxRetryCount) if err != nil { - panic(err) //TODO: adicionar validação e tratamento de erro melhor + panic(err) //TODO: add better validation and error handling } if retryCount >= maxRetryAttempts { @@ -52,14 +52,25 @@ func (h Handle[T]) ToGPubSubHandler(pub pubadapter.GenericPublisher) gpubsub.Han retryCount++ msg.Attributes["retry_count"] = strconv.Itoa(retryCount) topic := msg.Attributes["topic"] - time.Sleep(time.Second * 5) + + // Wait respecting the context + select { + case <-time.After(5 * time.Second): + // continue + case <-ctx.Done(): + return + } + + if ctx.Err() != nil { + return + } + if err := pub.Publish(ctx, topic, msg, pubadapter.Opts{ Attributes: msg.Attributes, }); err != nil { msg.Nack() return } - } return gpubsub.Handle{