diff --git a/cmd/setup/backoffice/httpserver.go b/cmd/setup/backoffice/httpserver.go index a1f13a9..2976c5c 100644 --- a/cmd/setup/backoffice/httpserver.go +++ b/cmd/setup/backoffice/httpserver.go @@ -31,8 +31,8 @@ func Start( backoffice.GetHealthCheckHandler(), backoffice.CreateConsumer(cache, store), backoffice.GetEvent(cache, store), - backoffice.GetEvents(cache, store), backoffice.GetPathEventHandle(cache, store), + backoffice.GetEvents(cache, store), backoffice.GetRegisterTaskConsumerArchived(cache, store), backoffice.RemoveEvent(cache, store), backoffice.GetInsightsHandle(insightsStore), diff --git a/deployment/tables.sql b/deployment/tables.sql index 8c68122..90c83de 100644 --- a/deployment/tables.sql +++ b/deployment/tables.sql @@ -5,6 +5,7 @@ CREATE TABLE IF NOT EXISTS events ( service_name VARCHAR(255) NOT NULL, state VARCHAR(100) NOT NULL, consumers JSONB NOT NULL, + opts JSONB NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW(), updated_at TIMESTAMP NOT NULL DEFAULT NOW(), deleted_at TIMESTAMP NULL diff --git a/example/event_data.json b/example/event_data.json index f6b6b43..b67a0d7 100644 --- a/example/event_data.json +++ b/example/event_data.json @@ -1,6 +1,13 @@ { "name": "payment.charged", "type": "external", + "option": { + "wq_type": "low_throughput", + "max_retries": 3, + "retention": "168h", + "unique_ttl": "60s", + "schedule_in": "100ms" + }, "consumers": [ { "service_name": "external-service", @@ -9,12 +16,6 @@ "path": "/payment/charged", "headers": { "Content-Type": "application/json" - }, - "option": { - "wq_type": "low_throughput", - "max_retries": 3, - "retention": "168h", - "unique_ttl": "60s" } } ] diff --git a/example/publisher_data.json b/example/publisher_data.json index dc07ac8..655bfe9 100644 --- a/example/publisher_data.json +++ b/example/publisher_data.json @@ -6,10 +6,5 @@ }, "metadata": { "correlation_id": "5e4dd662-9eba-4321-9c97-0b4ee0942f8b" - }, - "opts": { - "max_retries": 3, - "wq_type": "external", - "schedule_in": "100ms" } } diff --git a/internal/backoffice/path_event_handle_test.go b/internal/backoffice/path_event_handle_test.go index 885ec90..1e76b79 100644 --- a/internal/backoffice/path_event_handle_test.go +++ b/internal/backoffice/path_event_handle_test.go @@ -38,15 +38,15 @@ func TestGetPathEventHandle(t *testing.T) { ServiceName: "user-service", State: "active", Type: "external", + Option: domain.Opt{ + MaxRetries: 3, + }, Consumers: []domain.Consumer{ { ServiceName: "notification-service", Host: "https://api.example.com", Path: "/webhook", Headers: map[string]string{"Content-Type": "application/json"}, - Option: domain.Opt{ - MaxRetries: 3, - }, }, }, }, diff --git a/internal/backoffice/register_consumer.go b/internal/backoffice/register_consumer.go index c9ac782..c3283c8 100644 --- a/internal/backoffice/register_consumer.go +++ b/internal/backoffice/register_consumer.go @@ -16,6 +16,7 @@ import ( type EventDto struct { Name string `json:"name"` Type domain.Type `json:"type"` + Option domain.Opt `json:"option" bson:"option"` Consumers []domain.Consumer `json:"consumers"` } @@ -27,6 +28,7 @@ func (e *EventDto) ToDomain() domain.Event { Name: e.Name, ServiceName: env.InternalServiceName, Type: e.Type, + Option: e.Option, Consumers: e.Consumers, } } diff --git a/internal/domain/event.go b/internal/domain/event.go index 960855b..b8d2135 100644 --- a/internal/domain/event.go +++ b/internal/domain/event.go @@ -16,6 +16,7 @@ type Event struct { ServiceName string `json:"service_name" bson:"service_name"` State string `json:"state" bson:"state"` Type Type `json:"type" bson:"type"` + Option Opt `json:"option" bson:"option"` Consumers []Consumer `json:"consumers" bson:"consumers"` } @@ -24,10 +25,8 @@ func (e *Event) Validate() error { return fmt.Errorf("invalid event type: %s", e.Type) } - for _, consumer := range e.Consumers { - if err := consumer.Option.Validate(); err != nil { - return fmt.Errorf("invalid consumer option: %w", err) - } + if err := e.Option.Validate(); err != nil { + return fmt.Errorf("invalid event option: %w", err) } return nil @@ -38,7 +37,6 @@ type Consumer struct { Host string `json:"host" bson:"host"` Path string `json:"path" bson:"path"` Headers map[string]string `json:"headers" bson:"headers"` - Option Opt `json:"option" bson:"option"` } type Opt struct { diff --git a/internal/interstore/model_event.go b/internal/interstore/model_event.go index 634994e..f73c72d 100644 --- a/internal/interstore/model_event.go +++ b/internal/interstore/model_event.go @@ -13,17 +13,22 @@ type ModelEvent struct { ServiceName string State string Consumers []byte + Option []byte } func (m ModelEvent) ToDomain() domain.Event { var consumers []domain.Consumer json.Unmarshal(m.Consumers, &consumers) + var option domain.Opt + json.Unmarshal(m.Option, &option) + return domain.Event{ ID: m.ID, Name: m.Name, ServiceName: m.ServiceName, State: m.State, Consumers: consumers, + Option: option, } } diff --git a/internal/interstore/postgres_store.go b/internal/interstore/postgres_store.go index 924800a..2273b21 100644 --- a/internal/interstore/postgres_store.go +++ b/internal/interstore/postgres_store.go @@ -60,6 +60,7 @@ func (r *PostgresStore) GetAllEvents(ctx context.Context) ([]domain.Event, error &event.ServiceName, &event.State, &event.Consumers, + &event.Option, ); err != nil { return nil, fmt.Errorf("failed to scan row: %w", err) } @@ -77,17 +78,19 @@ func (r *PostgresStore) GetInternalEvent(ctx context.Context, eventName string) l := ctxlogger.GetLogger(ctx) query := ` - SELECT id, name, service_name, consumers + SELECT id, name, service_name, consumers, opts FROM events WHERE name = $1 AND deleted_at IS NULL ` var event domain.Event var consumersJSON []byte + var optsJSON []byte err := r.db.QueryRowContext(ctx, query, eventName).Scan( &event.ID, &event.Name, &event.ServiceName, &consumersJSON, + &optsJSON, ) if errors.Is(err, sql.ErrNoRows) { @@ -104,6 +107,10 @@ func (r *PostgresStore) GetInternalEvent(ctx context.Context, eventName string) return domain.Event{}, fmt.Errorf("failed to unmarshal consumers: %w", err) } + if err := json.Unmarshal(optsJSON, &event.Option); err != nil { + return domain.Event{}, fmt.Errorf("failed to unmarshal event option: %w", err) + } + return event, nil } @@ -140,6 +147,7 @@ func (r *PostgresStore) GetInternalEvents(ctx context.Context, filters domain.Fi &event.ServiceName, &event.State, &event.Consumers, + &event.Option, ); err != nil { return nil, fmt.Errorf("failed to scan row: %w", err) } @@ -161,13 +169,18 @@ func (r *PostgresStore) Save(ctx context.Context, event domain.Event) error { return fmt.Errorf("failed to marshal consumers: %w", err) } + optsJSON, err := json.Marshal(event.Option) + if err != nil { + return fmt.Errorf("failed to marshal event option: %w", err) + } + if event.State == "" { event.State = "active" } query := ` - INSERT INTO events (id, name, service_name, state, consumers, created_at, updated_at) - VALUES ($1, $2, $3, $4, $5, $6, $7) + INSERT INTO events (id, name, service_name, state, consumers, opts, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ` now := time.Now() @@ -177,6 +190,7 @@ func (r *PostgresStore) Save(ctx context.Context, event domain.Event) error { event.ServiceName, event.State, consumersJSON, + optsJSON, now, now, ) @@ -194,7 +208,8 @@ const modelEventFields = ` name, service_name, state, - consumers + consumers, + opts ` func (r *PostgresStore) GetEventByID(ctx context.Context, eventID uuid.UUID) (domain.Event, error) { @@ -209,6 +224,7 @@ func (r *PostgresStore) GetEventByID(ctx context.Context, eventID uuid.UUID) (do &event.ServiceName, &event.State, &event.Consumers, + &event.Option, ) if errors.Is(err, sql.ErrNoRows) { @@ -252,6 +268,7 @@ func (r *PostgresStore) GetAllSchedulers(ctx context.Context, state string) ([]d &event.ServiceName, &event.State, &event.Consumers, + &event.Option, ); err != nil { l.Error("Error on scan row", "error", err) return nil, fmt.Errorf("failed to scan row: %w", err) @@ -283,7 +300,8 @@ func (r *PostgresStore) UpdateEvent(ctx context.Context, event domain.Event) err events SET name = $2, service_name = $3, state = $4, - consumers = $5 + consumers = $5, + opts = $6 WHERE id = $1 AND deleted_at IS NULL;` consumersJSON, err := json.Marshal(event.Consumers) @@ -291,7 +309,12 @@ func (r *PostgresStore) UpdateEvent(ctx context.Context, event domain.Event) err return fmt.Errorf("failed to marshal consumers: %w", err) } - if _, err := r.db.ExecContext(ctx, query, event.ID, event.Name, event.ServiceName, event.State, consumersJSON); err != nil { + optsJSON, err := json.Marshal(event.Option) + if err != nil { + return fmt.Errorf("failed to marshal event option: %w", err) + } + + if _, err := r.db.ExecContext(ctx, query, event.ID, event.Name, event.ServiceName, event.State, consumersJSON, optsJSON); err != nil { return fmt.Errorf("failed to update event: %w", err) } diff --git a/internal/wtrhandler/external_handle_http.go b/internal/wtrhandler/external_handle_http.go index c243709..c810977 100644 --- a/internal/wtrhandler/external_handle_http.go +++ b/internal/wtrhandler/external_handle_http.go @@ -87,8 +87,8 @@ func PublisherEvent( eventType = domain.EventTypeInternal.String() } + config := event.Option.ToAsynqOptions() for _, consumer := range event.Consumers { - config := consumer.Option.ToAsynqOptions() input := RequestPayload{ EventName: event.Name, diff --git a/internal/wtrhandler/external_handle_http_test.go b/internal/wtrhandler/external_handle_http_test.go index df69f9a..0560330 100644 --- a/internal/wtrhandler/external_handle_http_test.go +++ b/internal/wtrhandler/external_handle_http_test.go @@ -8,11 +8,9 @@ import ( "net/http" "net/http/httptest" "testing" - "time" "github.com/IsaacDSC/gqueue/internal/cfg" "github.com/IsaacDSC/gqueue/internal/domain" - "github.com/IsaacDSC/gqueue/pkg/intertime" "github.com/IsaacDSC/gqueue/pkg/pubadapter" "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" @@ -54,10 +52,6 @@ func TestGetExternalHandle(t *testing.T) { "Content-Type": "application/json", }, }, - Opts: domain.Opt{ - MaxRetries: 3, - WqType: pubadapter.LowLatency, - }, }, setupStoreMock: func(s *MockStore) { s.EXPECT().GetEvent(gomock.Any(), "user.created").Return(domain.Event{ @@ -114,13 +108,6 @@ func TestGetExternalHandle(t *testing.T) { "X-Correlation": "corr_def456", }, }, - Opts: domain.Opt{ - MaxRetries: 5, - Retention: intertime.Duration(24 * time.Hour), - UniqueTTL: intertime.Duration(1 * time.Hour), - ScheduleIn: intertime.Duration(30 * time.Second), - WqType: pubadapter.LowLatency, - }, }, setupStoreMock: func(s *MockStore) { s.EXPECT().GetEvent(gomock.Any(), "order.completed").Return(domain.Event{ @@ -174,11 +161,6 @@ func TestGetExternalHandle(t *testing.T) { "X-Service": "notification-worker", }, }, - Opts: domain.Opt{ - MaxRetries: 2, - Deadline: func() *time.Time { t := time.Now().Add(5 * time.Minute); return &t }(), - WqType: pubadapter.LowLatency, - }, }, setupStoreMock: func(s *MockStore) { s.EXPECT().GetEvent(gomock.Any(), "notification.send").Return(domain.Event{ @@ -227,10 +209,6 @@ func TestGetExternalHandle(t *testing.T) { Version: "1.0", Environment: "test", }, - Opts: domain.Opt{ - MaxRetries: 3, - WqType: pubadapter.LowLatency, - }, }, setupStoreMock: func(s *MockStore) { s.EXPECT().GetEvent(gomock.Any(), "payment.failed").Return(domain.Event{ @@ -273,9 +251,6 @@ func TestGetExternalHandle(t *testing.T) { Version: "1.0", Environment: "test", }, - Opts: domain.Opt{ - WqType: pubadapter.LowLatency, - }, }, setupStoreMock: func(s *MockStore) { s.EXPECT().GetEvent(gomock.Any(), "system.ping").Return(domain.Event{ @@ -319,9 +294,6 @@ func TestGetExternalHandle(t *testing.T) { Version: "1.0", Environment: "test", }, - Opts: domain.Opt{ - WqType: pubadapter.LowLatency, - }, }, setupStoreMock: func(s *MockStore) { s.EXPECT().GetEvent(gomock.Any(), "nonexistent.event").Return(domain.Event{}, domain.EventNotFound).Times(1) @@ -344,9 +316,6 @@ func TestGetExternalHandle(t *testing.T) { Version: "1.0", Environment: "test", }, - Opts: domain.Opt{ - WqType: pubadapter.LowLatency, - }, }, setupStoreMock: func(s *MockStore) { s.EXPECT().GetEvent(gomock.Any(), "error.event").Return(domain.Event{}, errors.New("database connection error")).Times(1) @@ -369,9 +338,6 @@ func TestGetExternalHandle(t *testing.T) { Version: "1.0", Environment: "test", }, - Opts: domain.Opt{ - WqType: pubadapter.LowLatency, - }, }, setupStoreMock: func(s *MockStore) { s.EXPECT().GetEvent(gomock.Any(), "no.consumers.event").Return(domain.Event{ @@ -401,10 +367,6 @@ func TestGetExternalHandle(t *testing.T) { "Content-Type": "application/json", }, }, - Opts: domain.Opt{ - MaxRetries: 3, - WqType: pubadapter.LowLatency, - }, }, setupStoreMock: func(s *MockStore) { s.EXPECT().GetEvent(gomock.Any(), "multi.consumer.event").Return(domain.Event{ diff --git a/internal/wtrhandler/internal_payload.go b/internal/wtrhandler/internal_payload.go index a19cb69..b49fef9 100644 --- a/internal/wtrhandler/internal_payload.go +++ b/internal/wtrhandler/internal_payload.go @@ -3,16 +3,12 @@ package wtrhandler import ( "encoding/json" "fmt" - - "github.com/IsaacDSC/gqueue/internal/domain" - "github.com/hibiken/asynq" ) type InternalPayload struct { - EventName string `json:"event_name"` - Data Data `json:"data"` - Metadata Metadata `json:"metadata"` - Opts domain.Opt `json:"opts"` + EventName string `json:"event_name"` + Data Data `json:"data"` + Metadata Metadata `json:"metadata"` } func (p InternalPayload) Validate() error { @@ -27,16 +23,6 @@ func (p InternalPayload) Validate() error { return nil } -func (p InternalPayload) AsynqOpts() []asynq.Option { - return p.Opts.ToAsynqOptions() -} - -func (p InternalPayload) Attributes(topicName string) map[string]string { - opts := p.Opts.Attributes() - opts["topic"] = topicName - return opts -} - type Metadata struct { Source string `json:"source"` Version string `json:"version"`