diff --git a/README.md b/README.md index 8cecccf..d2640f0 100644 --- a/README.md +++ b/README.md @@ -127,7 +127,7 @@ graph TB Consumers[Consumer Services] %% Flows - Services -->|Create Events
Register Triggers
Publish Events| API + Services -->|Create Events
Register Consumers
Publish Events| API API --> Database API --> Queue Queue --> Worker diff --git a/deployment/dashboard.sql b/deployment/dashboard.sql index 8920e6d..9c79123 100644 --- a/deployment/dashboard.sql +++ b/deployment/dashboard.sql @@ -1,10 +1,10 @@ -- CONSUMERS INSCRITOS NOS TOPICOS SELECT - e.unique_key as topic, + e.name as topic, CONCAT(elem->>'host', elem->>'path') AS consumer FROM events AS e -CROSS JOIN LATERAL jsonb_array_elements(e.triggers) AS elem +CROSS JOIN LATERAL jsonb_array_elements(e.consumers) AS elem WHERE e.deleted_at IS NULL - AND jsonb_typeof(e.triggers) = 'array' + AND jsonb_typeof(e.consumers) = 'array' AND elem ? 'host' ORDER BY topic desc; diff --git a/deployment/tables.sql b/deployment/tables.sql index 1b2dbf5..8c68122 100644 --- a/deployment/tables.sql +++ b/deployment/tables.sql @@ -1,11 +1,10 @@ -- Internal Events Table CREATE TABLE IF NOT EXISTS events ( id UUID PRIMARY KEY, - unique_key TEXT NOT NULL UNIQUE, - name VARCHAR(255) NOT NULL, + name VARCHAR(255) NOT NULL UNIQUE, service_name VARCHAR(255) NOT NULL, state VARCHAR(100) NOT NULL, - triggers JSONB NOT NULL, + consumers JSONB NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW(), updated_at TIMESTAMP NOT NULL DEFAULT NOW(), deleted_at TIMESTAMP NULL diff --git a/example/event_data.json b/example/event_data.json index 9258d7e..f6b6b43 100644 --- a/example/event_data.json +++ b/example/event_data.json @@ -1,7 +1,7 @@ { "name": "payment.charged", "type": "external", - "triggers": [ + "consumers": [ { "service_name": "external-service", "type": "persistent", diff --git a/example/path_event_data.json b/example/path_event_data.json index 1cfdcd7..81a61fd 100644 --- a/example/path_event_data.json +++ b/example/path_event_data.json @@ -1,9 +1,7 @@ { "name": "payment.processed", "service_name": "my-app", - "repo_url": "http://github.com/my-org/my-repo", - "team_owner": "my-team", - "triggers": [ + "consumers": [ { "service_name": "consumer-1", "type": "persistent", diff --git a/internal/asynqstore/cache_store.go b/internal/asynqstore/cache_store.go index e7697a7..684615f 100644 --- a/internal/asynqstore/cache_store.go +++ b/internal/asynqstore/cache_store.go @@ -13,7 +13,7 @@ import ( ) type Cacher interface { - FindAllTriggers(ctx context.Context) ([]domain.Event, error) + FindAllConsumers(ctx context.Context) ([]domain.Event, error) FindAllQueues(ctx context.Context) ([]asynqtask.Queue, error) FindArchivedTasks(ctx context.Context, queue string) ([]string, error) GetMsgArchivedTask(ctx context.Context, queue, task string) (asynqtask.RawMsg, error) @@ -34,7 +34,7 @@ var _ Cacher = (*Cache)(nil) const archivedKey = "gqueue:consumers:schedule:archived" -func (c Cache) FindAllTriggers(ctx context.Context) ([]domain.Event, error) { +func (c Cache) FindAllConsumers(ctx context.Context) ([]domain.Event, error) { cResult, err := c.cache.Get(ctx, archivedKey).Result() if errors.Is(err, redis.Nil) { return nil, asynqtask.ErrorNotFound diff --git a/internal/asynqtask/archived_task.go b/internal/asynqtask/archived_task.go index 4d91b99..e8964ea 100644 --- a/internal/asynqtask/archived_task.go +++ b/internal/asynqtask/archived_task.go @@ -11,7 +11,7 @@ import ( ) type CacheManager interface { - FindAllTriggers(ctx context.Context) ([]domain.Event, error) + FindAllConsumers(ctx context.Context) ([]domain.Event, error) FindAllQueues(ctx context.Context) ([]Queue, error) FindArchivedTasks(ctx context.Context, queue string) ([]string, error) GetMsgArchivedTask(ctx context.Context, queue, task string) (RawMsg, error) @@ -42,7 +42,7 @@ func (n TaskManager) NotifyListeners(ctx context.Context) error { l := ctxlogger.GetLogger(ctx) l.Debug("Starting notification process") - events, err := n.cm.FindAllTriggers(ctx) + events, err := n.cm.FindAllConsumers(ctx) if errors.Is(err, ErrorNotFound) { results, err := n.store.GetAllSchedulers(ctx, "archived") if errors.Is(domain.EventNotFound, err) { @@ -80,7 +80,7 @@ func (n TaskManager) NotifyListeners(ctx context.Context) error { QueueName: msg.Queue, Tasks: msg.Tasks, Data: msg.Msg["data"], - Schedulers: events.Triggers, + Schedulers: events.Consumers, }) } } diff --git a/internal/asynqtask/fetch_msg_data.go b/internal/asynqtask/fetch_msg_data.go index 97c7950..6c49b41 100644 --- a/internal/asynqtask/fetch_msg_data.go +++ b/internal/asynqtask/fetch_msg_data.go @@ -7,7 +7,7 @@ type FetchMsg struct { QueueName string Tasks []string Data any - Schedulers []domain.Trigger + Schedulers []domain.Consumer } type TaskArchivedData struct { diff --git a/internal/backoffice/get_event.go b/internal/backoffice/get_event.go index d5e588b..0c2f7e2 100644 --- a/internal/backoffice/get_event.go +++ b/internal/backoffice/get_event.go @@ -24,7 +24,7 @@ func GetEvent(cc cachemanager.Cache, repo Repository) httpadapter.HttpHandle { var event domain.Event if err := cc.Once(ctx, key, &event, defaultTTL, func(ctx context.Context) (any, error) { - return repo.GetInternalEvent(ctx, eventName, serviceName, "active") + return repo.GetInternalEvent(ctx, eventName) }); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/internal/backoffice/interfaces.go b/internal/backoffice/interfaces.go index e471b05..7861845 100644 --- a/internal/backoffice/interfaces.go +++ b/internal/backoffice/interfaces.go @@ -9,7 +9,7 @@ import ( type Repository interface { Save(ctx context.Context, event domain.Event) error - GetInternalEvent(ctx context.Context, eventName, serviceName string, state string) (domain.Event, error) + GetInternalEvent(ctx context.Context, eventName string) (domain.Event, error) GetInternalEvents(ctx context.Context, filters domain.FilterEvents) ([]domain.Event, error) DisabledEvent(ctx context.Context, eventID uuid.UUID) error UpdateEvent(ctx context.Context, event domain.Event) error diff --git a/internal/backoffice/path_event_handle_test.go b/internal/backoffice/path_event_handle_test.go index 3396f91..885ec90 100644 --- a/internal/backoffice/path_event_handle_test.go +++ b/internal/backoffice/path_event_handle_test.go @@ -38,7 +38,7 @@ func TestGetPathEventHandle(t *testing.T) { ServiceName: "user-service", State: "active", Type: "external", - Triggers: []domain.Trigger{ + Consumers: []domain.Consumer{ { ServiceName: "notification-service", Host: "https://api.example.com", diff --git a/internal/backoffice/register_consumer.go b/internal/backoffice/register_consumer.go index 74cef7e..c9ac782 100644 --- a/internal/backoffice/register_consumer.go +++ b/internal/backoffice/register_consumer.go @@ -14,9 +14,9 @@ import ( ) type EventDto struct { - Name string `json:"name"` - Type domain.Type `json:"type"` - Triggers []domain.Trigger `json:"triggers"` + Name string `json:"name"` + Type domain.Type `json:"type"` + Consumers []domain.Consumer `json:"consumers"` } func (e *EventDto) ToDomain() domain.Event { @@ -27,7 +27,7 @@ func (e *EventDto) ToDomain() domain.Event { Name: e.Name, ServiceName: env.InternalServiceName, Type: e.Type, - Triggers: e.Triggers, + Consumers: e.Consumers, } } diff --git a/internal/backoffice/repository_mock.go b/internal/backoffice/repository_mock.go index 7f39b3a..24cfacb 100644 --- a/internal/backoffice/repository_mock.go +++ b/internal/backoffice/repository_mock.go @@ -72,18 +72,18 @@ func (mr *MockRepositoryMockRecorder) GetEventByID(ctx, eventID any) *gomock.Cal } // GetInternalEvent mocks base method. -func (m *MockRepository) GetInternalEvent(ctx context.Context, eventName, serviceName, state string) (domain.Event, error) { +func (m *MockRepository) GetInternalEvent(ctx context.Context, eventName string) (domain.Event, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetInternalEvent", ctx, eventName, serviceName, state) + ret := m.ctrl.Call(m, "GetInternalEvent", ctx, eventName) ret0, _ := ret[0].(domain.Event) ret1, _ := ret[1].(error) return ret0, ret1 } // GetInternalEvent indicates an expected call of GetInternalEvent. -func (mr *MockRepositoryMockRecorder) GetInternalEvent(ctx, eventName, serviceName, state any) *gomock.Call { +func (mr *MockRepositoryMockRecorder) GetInternalEvent(ctx, eventName any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetInternalEvent", reflect.TypeOf((*MockRepository)(nil).GetInternalEvent), ctx, eventName, serviceName, state) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetInternalEvent", reflect.TypeOf((*MockRepository)(nil).GetInternalEvent), ctx, eventName) } // GetInternalEvents mocks base method. diff --git a/internal/domain/event.go b/internal/domain/event.go index 3bdae18..960855b 100644 --- a/internal/domain/event.go +++ b/internal/domain/event.go @@ -11,29 +11,29 @@ import ( ) type Event struct { - ID uuid.UUID `json:"id" bson:"id"` - Name string `json:"name" bson:"name"` - ServiceName string `json:"service_name" bson:"service_name"` - State string `json:"state" bson:"state"` - Type Type `json:"type" bson:"type"` - Triggers []Trigger `json:"triggers" bson:"triggers"` + ID uuid.UUID `json:"id" bson:"id"` + Name string `json:"name" bson:"name"` + ServiceName string `json:"service_name" bson:"service_name"` + State string `json:"state" bson:"state"` + Type Type `json:"type" bson:"type"` + Consumers []Consumer `json:"consumers" bson:"consumers"` } func (e *Event) Validate() error { - if e.Type != TriggerTypeInternal && e.Type != TriggerTypeExternal { + if e.Type != EventTypeInternal && e.Type != EventTypeExternal { return fmt.Errorf("invalid event type: %s", e.Type) } - for _, trigger := range e.Triggers { - if err := trigger.Option.Validate(); err != nil { - return fmt.Errorf("invalid trigger option: %w", err) + for _, consumer := range e.Consumers { + if err := consumer.Option.Validate(); err != nil { + return fmt.Errorf("invalid consumer option: %w", err) } } return nil } -type Trigger struct { +type Consumer struct { ServiceName string `json:"service_name" bson:"service_name"` Host string `json:"host" bson:"host"` Path string `json:"path" bson:"path"` @@ -60,8 +60,8 @@ func (vt Type) String() string { } const ( - TriggerTypeInternal Type = "internal" - TriggerTypeExternal Type = "external" + EventTypeInternal Type = "internal" + EventTypeExternal Type = "external" ) func (o Opt) Validate() error { diff --git a/internal/fetcher/notification.go b/internal/fetcher/notification.go index a397f7a..ae24839 100644 --- a/internal/fetcher/notification.go +++ b/internal/fetcher/notification.go @@ -17,8 +17,8 @@ func NewNotification() *Notification { return &Notification{} } -func (n Notification) NotifyTrigger(ctx context.Context, data map[string]any, headers map[string]string, trigger wtrhandler.Trigger) error { - url := trigger.GetUrl() +func (n Notification) Notify(ctx context.Context, data map[string]any, headers map[string]string, consumer wtrhandler.Consumer) error { + url := consumer.GetUrl() return fetch(ctx, url, data, headers) } @@ -50,7 +50,7 @@ func fetch(ctx context.Context, url string, data any, headers map[string]string) req.Header.Set(key, value) } - // #nosec G704 -- SSRF is intentional: this function sends webhooks to user-configured trigger endpoints + // #nosec G704 -- SSRF is intentional: this function sends webhooks to user-configured consumers endpoints resp, err := client.Do(req) if err != nil { return fmt.Errorf("post request: %w", err) diff --git a/internal/fetcher/notification_test.go b/internal/fetcher/notification_test.go index 213ed3e..fa1923a 100644 --- a/internal/fetcher/notification_test.go +++ b/internal/fetcher/notification_test.go @@ -28,7 +28,7 @@ func TestNotification_NotifyTrigger(t *testing.T) { name string data map[string]any headers map[string]string - trigger wtrhandler.Trigger + consumer wtrhandler.Consumer serverResponse serverResponse wantErr bool errContains string @@ -44,7 +44,7 @@ func TestNotification_NotifyTrigger(t *testing.T) { "Authorization": "Bearer token123", "X-Custom": "custom-value", }, - trigger: wtrhandler.Trigger{ + consumer: wtrhandler.Consumer{ ServiceName: "test-service", BaseUrl: "", Path: "/webhook", @@ -64,7 +64,7 @@ func TestNotification_NotifyTrigger(t *testing.T) { headers: map[string]string{ "Content-Type": "application/json", }, - trigger: wtrhandler.Trigger{ + consumer: wtrhandler.Consumer{ ServiceName: "user-service", BaseUrl: "", Path: "/users/webhook", @@ -81,7 +81,7 @@ func TestNotification_NotifyTrigger(t *testing.T) { "ping": "pong", }, headers: map[string]string{}, - trigger: wtrhandler.Trigger{ + consumer: wtrhandler.Consumer{ ServiceName: "ping-service", BaseUrl: "", Path: "/ping", @@ -98,7 +98,7 @@ func TestNotification_NotifyTrigger(t *testing.T) { "test": "boundary", }, headers: map[string]string{}, - trigger: wtrhandler.Trigger{ + consumer: wtrhandler.Consumer{ ServiceName: "boundary-service", BaseUrl: "", Path: "/boundary", @@ -115,7 +115,7 @@ func TestNotification_NotifyTrigger(t *testing.T) { "invalid": "data", }, headers: map[string]string{}, - trigger: wtrhandler.Trigger{ + consumer: wtrhandler.Consumer{ ServiceName: "validation-service", BaseUrl: "", Path: "/validate", @@ -133,7 +133,7 @@ func TestNotification_NotifyTrigger(t *testing.T) { "sensitive": "data", }, headers: map[string]string{}, - trigger: wtrhandler.Trigger{ + consumer: wtrhandler.Consumer{ ServiceName: "auth-service", BaseUrl: "", Path: "/secure", @@ -151,7 +151,7 @@ func TestNotification_NotifyTrigger(t *testing.T) { "event": "not-found", }, headers: map[string]string{}, - trigger: wtrhandler.Trigger{ + consumer: wtrhandler.Consumer{ ServiceName: "missing-service", BaseUrl: "", Path: "/missing", @@ -169,7 +169,7 @@ func TestNotification_NotifyTrigger(t *testing.T) { "event": "server-error", }, headers: map[string]string{}, - trigger: wtrhandler.Trigger{ + consumer: wtrhandler.Consumer{ ServiceName: "error-service", BaseUrl: "", Path: "/error", @@ -187,7 +187,7 @@ func TestNotification_NotifyTrigger(t *testing.T) { "redirect": "test", }, headers: map[string]string{}, - trigger: wtrhandler.Trigger{ + consumer: wtrhandler.Consumer{ ServiceName: "redirect-service", BaseUrl: "", Path: "/redirect", @@ -205,7 +205,7 @@ func TestNotification_NotifyTrigger(t *testing.T) { "complex": "url", }, headers: map[string]string{}, - trigger: wtrhandler.Trigger{ + consumer: wtrhandler.Consumer{ ServiceName: "complex-service", BaseUrl: "", Path: "/api/v1/webhooks", @@ -223,24 +223,24 @@ func TestNotification_NotifyTrigger(t *testing.T) { server := createTestServer(t, tt.serverResponse) defer server.Close() - tt.trigger.BaseUrl = server.URL + tt.consumer.BaseUrl = server.URL notification := NewNotification() ctx := context.Background() - err := notification.NotifyTrigger(ctx, tt.data, tt.headers, tt.trigger) + err := notification.Notify(ctx, tt.data, tt.headers, tt.consumer) if tt.wantErr { if err == nil { - t.Errorf("NotifyTrigger() expected error but got none") + t.Errorf("Notify() expected error but got none") return } if tt.errContains != "" && !containsString(err.Error(), tt.errContains) { - t.Errorf("NotifyTrigger() error = %v, want error containing %v", err, tt.errContains) + t.Errorf("Notify() error = %v, want error containing %v", err, tt.errContains) } } else { if err != nil { - t.Errorf("NotifyTrigger() unexpected error = %v", err) + t.Errorf("Notify() unexpected error = %v", err) } } }) @@ -255,20 +255,20 @@ func TestNotification_NotifyTrigger_InvalidData(t *testing.T) { "channel": make(chan int), } - trigger := wtrhandler.Trigger{ + trigger := wtrhandler.Consumer{ ServiceName: "test-service", BaseUrl: "http://example.com", Path: "/webhook", } - err := notification.NotifyTrigger(ctx, invalidData, nil, trigger) + err := notification.Notify(ctx, invalidData, nil, trigger) if err == nil { - t.Error("NotifyTrigger() expected error for invalid data but got none") + t.Error("Notify() expected error for invalid data but got none") return } if !containsString(err.Error(), "marshal data") { - t.Errorf("NotifyTrigger() error = %v, want error containing 'marshal data'", err) + t.Errorf("Notify() error = %v, want error containing 'marshal data'", err) } } diff --git a/internal/interstore/model_event.go b/internal/interstore/model_event.go index d743e47..634994e 100644 --- a/internal/interstore/model_event.go +++ b/internal/interstore/model_event.go @@ -12,18 +12,18 @@ type ModelEvent struct { Name string ServiceName string State string - Triggers []byte + Consumers []byte } func (m ModelEvent) ToDomain() domain.Event { - var triggers []domain.Trigger - json.Unmarshal(m.Triggers, &triggers) + var consumers []domain.Consumer + json.Unmarshal(m.Consumers, &consumers) return domain.Event{ ID: m.ID, Name: m.Name, ServiceName: m.ServiceName, State: m.State, - Triggers: triggers, + Consumers: consumers, } } diff --git a/internal/interstore/postgres_store.go b/internal/interstore/postgres_store.go index f3d9df5..924800a 100644 --- a/internal/interstore/postgres_store.go +++ b/internal/interstore/postgres_store.go @@ -59,7 +59,7 @@ func (r *PostgresStore) GetAllEvents(ctx context.Context) ([]domain.Event, error &event.Name, &event.ServiceName, &event.State, - &event.Triggers, + &event.Consumers, ); err != nil { return nil, fmt.Errorf("failed to scan row: %w", err) } @@ -73,36 +73,35 @@ func (r *PostgresStore) GetAllEvents(ctx context.Context) ([]domain.Event, error return events, nil } -func (r *PostgresStore) GetInternalEvent(ctx context.Context, eventName, serviceName string, state string) (domain.Event, error) { +func (r *PostgresStore) GetInternalEvent(ctx context.Context, eventName string) (domain.Event, error) { l := ctxlogger.GetLogger(ctx) - uniqueKey := r.getUniqueKey(eventName, serviceName, state) - query := ` - SELECT id, name, service_name, triggers + SELECT id, name, service_name, consumers FROM events - WHERE unique_key = $1 AND deleted_at IS NULL + WHERE name = $1 AND deleted_at IS NULL ` var event domain.Event - var triggersJSON []byte - err := r.db.QueryRowContext(ctx, query, uniqueKey).Scan( + var consumersJSON []byte + err := r.db.QueryRowContext(ctx, query, eventName).Scan( &event.ID, &event.Name, &event.ServiceName, - &triggersJSON, + &consumersJSON, ) + if errors.Is(err, sql.ErrNoRows) { + l.Warn("No documents found", "event_name", eventName) + return domain.Event{}, domain.EventNotFound + } + if err != nil { - if errors.Is(err, sql.ErrNoRows) { - l.Warn("No documents found", "unique key", uniqueKey) - return domain.Event{}, domain.EventNotFound - } - l.Error("Error on get internal event by unique key", "error", err) + l.Error("Error on get internal event by event_name", "error", err) return domain.Event{}, fmt.Errorf("failed to get internal event: %w", err) } - if err := json.Unmarshal(triggersJSON, &event.Triggers); err != nil { - return domain.Event{}, fmt.Errorf("failed to unmarshal triggers: %w", err) + if err := json.Unmarshal(consumersJSON, &event.Consumers); err != nil { + return domain.Event{}, fmt.Errorf("failed to unmarshal consumers: %w", err) } return event, nil @@ -140,7 +139,7 @@ func (r *PostgresStore) GetInternalEvents(ctx context.Context, filters domain.Fi &event.Name, &event.ServiceName, &event.State, - &event.Triggers, + &event.Consumers, ); err != nil { return nil, fmt.Errorf("failed to scan row: %w", err) } @@ -157,30 +156,27 @@ func (r *PostgresStore) GetInternalEvents(ctx context.Context, filters domain.Fi func (r *PostgresStore) Save(ctx context.Context, event domain.Event) error { l := ctxlogger.GetLogger(ctx) - triggersJSON, err := json.Marshal(event.Triggers) + consumersJSON, err := json.Marshal(event.Consumers) if err != nil { - return fmt.Errorf("failed to marshal triggers: %w", err) + return fmt.Errorf("failed to marshal consumers: %w", err) } if event.State == "" { event.State = "active" } - uniqueKey := r.getUniqueKey(event.Name, event.ServiceName, event.State) - query := ` - INSERT INTO events (id, unique_key, name, service_name, state, triggers, created_at, updated_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + INSERT INTO events (id, name, service_name, state, consumers, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7) ` now := time.Now() _, err = r.db.ExecContext(ctx, query, uuid.New(), - uniqueKey, event.Name, event.ServiceName, event.State, - triggersJSON, + consumersJSON, now, now, ) @@ -198,7 +194,7 @@ const modelEventFields = ` name, service_name, state, - triggers + consumers ` func (r *PostgresStore) GetEventByID(ctx context.Context, eventID uuid.UUID) (domain.Event, error) { @@ -212,7 +208,7 @@ func (r *PostgresStore) GetEventByID(ctx context.Context, eventID uuid.UUID) (do &event.Name, &event.ServiceName, &event.State, - &event.Triggers, + &event.Consumers, ) if errors.Is(err, sql.ErrNoRows) { @@ -255,7 +251,7 @@ func (r *PostgresStore) GetAllSchedulers(ctx context.Context, state string) ([]d &event.Name, &event.ServiceName, &event.State, - &event.Triggers, + &event.Consumers, ); err != nil { l.Error("Error on scan row", "error", err) return nil, fmt.Errorf("failed to scan row: %w", err) @@ -272,8 +268,8 @@ func (r *PostgresStore) GetAllSchedulers(ctx context.Context, state string) ([]d } func (r *PostgresStore) DisabledEvent(ctx context.Context, eventID uuid.UUID) error { - query := `UPDATE events SET state = 'disabled', unique_key = $2, deleted_at = NOW() WHERE id = $1 AND deleted_at IS NULL;` - _, err := r.db.Exec(query, eventID, fmt.Sprintf("disabled.%s", uuid.New().String())) + query := `UPDATE events SET state = 'disabled', deleted_at = NOW() WHERE id = $1 AND deleted_at IS NULL;` + _, err := r.db.Exec(query, eventID) if err != nil { return fmt.Errorf("failed to disable event: %w", err) } @@ -287,21 +283,17 @@ func (r *PostgresStore) UpdateEvent(ctx context.Context, event domain.Event) err events SET name = $2, service_name = $3, state = $4, - triggers = $5 + consumers = $5 WHERE id = $1 AND deleted_at IS NULL;` - triggersJSON, err := json.Marshal(event.Triggers) + consumersJSON, err := json.Marshal(event.Consumers) if err != nil { - return fmt.Errorf("failed to marshal triggers: %w", err) + return fmt.Errorf("failed to marshal consumers: %w", err) } - if _, err := r.db.ExecContext(ctx, query, event.ID, event.Name, event.ServiceName, event.State, triggersJSON); err != nil { + if _, err := r.db.ExecContext(ctx, query, event.ID, event.Name, event.ServiceName, event.State, consumersJSON); err != nil { return fmt.Errorf("failed to update event: %w", err) } return nil } - -func (r *PostgresStore) getUniqueKey(eventName, serviceName, state string) string { - return fmt.Sprintf("%s:%s:%s", eventName, serviceName, state) -} diff --git a/internal/interstore/store.go b/internal/interstore/store.go index 472510a..8e6392c 100644 --- a/internal/interstore/store.go +++ b/internal/interstore/store.go @@ -9,7 +9,7 @@ import ( type Repository interface { Save(ctx context.Context, event domain.Event) error - GetInternalEvent(ctx context.Context, eventName, serviceName string, state string) (domain.Event, error) + GetInternalEvent(ctx context.Context, eventName string) (domain.Event, error) GetInternalEvents(ctx context.Context, filters domain.FilterEvents) ([]domain.Event, error) DisabledEvent(ctx context.Context, eventID uuid.UUID) error UpdateEvent(ctx context.Context, event domain.Event) error diff --git a/internal/wtrhandler/deadletter_asynq_handle.go b/internal/wtrhandler/deadletter_asynq_handle.go index fa8c80f..d86448a 100644 --- a/internal/wtrhandler/deadletter_asynq_handle.go +++ b/internal/wtrhandler/deadletter_asynq_handle.go @@ -38,18 +38,18 @@ func NewDeadLatterQueue(store DeadLetterStore, fetcher Fetcher) asyncadapter.Han } for _, event := range events { - for _, trigger := range event.Triggers { - fetcher.NotifyTrigger(ctx, map[string]any{ + for _, consumer := range event.Consumers { + fetcher.Notify(ctx, map[string]any{ "event": event.Name, "id": p.ID, "data": p.Data, "metadata": p.Attributes, "event_at": p.PublishTime, - }, trigger.Headers, Trigger{ - ServiceName: trigger.ServiceName, - BaseUrl: trigger.Host, - Path: trigger.Path, - Headers: trigger.Headers, + }, consumer.Headers, Consumer{ + ServiceName: consumer.ServiceName, + BaseUrl: consumer.Host, + Path: consumer.Path, + Headers: consumer.Headers, }) } } diff --git a/internal/wtrhandler/deadletter_asynq_handle_test.go b/internal/wtrhandler/deadletter_asynq_handle_test.go index b36598d..78c040a 100644 --- a/internal/wtrhandler/deadletter_asynq_handle_test.go +++ b/internal/wtrhandler/deadletter_asynq_handle_test.go @@ -24,9 +24,9 @@ func init() { // notifyCall struct to track calls made to the mock fetcher type notifyCall struct { - data map[string]any - headers map[string]string - trigger Trigger + data map[string]any + headers map[string]string + consumer Consumer } // mockFetcherWithCalls wraps MockFetcher to track calls @@ -35,13 +35,13 @@ type mockFetcherWithCalls struct { notifyCalls []notifyCall } -func (m *mockFetcherWithCalls) NotifyTrigger(ctx context.Context, data map[string]any, headers map[string]string, trigger Trigger) error { +func (m *mockFetcherWithCalls) Notify(ctx context.Context, data map[string]any, headers map[string]string, consumer Consumer) error { m.notifyCalls = append(m.notifyCalls, notifyCall{ - data: data, - headers: headers, - trigger: trigger, + data: data, + headers: headers, + consumer: consumer, }) - return m.MockFetcher.NotifyTrigger(ctx, data, headers, trigger) + return m.MockFetcher.Notify(ctx, data, headers, consumer) } func TestNewDeadLatterQueue(t *testing.T) { @@ -98,7 +98,7 @@ func TestDeadLetterQueue_Handler_Success(t *testing.T) { Name: "user.created", ServiceName: "user-service", State: "archived", - Triggers: []domain.Trigger{ + Consumers: []domain.Consumer{ { ServiceName: "notification-service", Host: "http://localhost:8080", @@ -122,7 +122,7 @@ func TestDeadLetterQueue_Handler_Success(t *testing.T) { Name: "order.completed", ServiceName: "order-service", State: "archived", - Triggers: []domain.Trigger{ + Consumers: []domain.Consumer{ { ServiceName: "email-service", Host: "http://localhost:8082", @@ -146,9 +146,9 @@ func TestDeadLetterQueue_Handler_Success(t *testing.T) { notifyCalls: []notifyCall{}, } - // Expect 3 calls to NotifyTrigger (2 triggers from first event + 1 trigger from second event) + // Expect 3 calls to Notify (2 consumers from first event + 1 consumer from second event) mockFetcher.MockFetcher.EXPECT(). - NotifyTrigger(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Notify(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(nil). Times(3) @@ -163,11 +163,11 @@ func TestDeadLetterQueue_Handler_Success(t *testing.T) { require.NoError(t, err) - // Verify that NotifyTrigger was called for each trigger in each event - expectedCalls := 3 // 2 triggers from first event + 1 trigger from second event + // Verify that Notify was called for each consumer in each event + expectedCalls := 3 // 2 consumers from first event + 1 consumer from second event assert.Len(t, mockFetcher.notifyCalls, expectedCalls) - // Verify first call (first trigger of first event) + // Verify first call (first consumer of first event) firstCall := mockFetcher.notifyCalls[0] assert.Equal(t, "user.created", firstCall.data["event"]) assert.Equal(t, "test-message-id", firstCall.data["id"]) @@ -176,14 +176,14 @@ func TestDeadLetterQueue_Handler_Success(t *testing.T) { // Don't compare exact time due to marshaling precision loss assert.NotNil(t, firstCall.data["event_at"]) assert.Equal(t, map[string]string{"Content-Type": "application/json", "Authorization": "Bearer token"}, firstCall.headers) - assert.Equal(t, Trigger{ + assert.Equal(t, Consumer{ ServiceName: "notification-service", BaseUrl: "http://localhost:8080", Path: "/webhook/user-created", Headers: map[string]string{"Content-Type": "application/json", "Authorization": "Bearer token"}, - }, firstCall.trigger) + }, firstCall.consumer) - // Verify second call (second trigger of first event) + // Verify second call (second consumer of first event) secondCall := mockFetcher.notifyCalls[1] assert.Equal(t, "user.created", secondCall.data["event"]) assert.Equal(t, "test-message-id", secondCall.data["id"]) @@ -191,14 +191,14 @@ func TestDeadLetterQueue_Handler_Success(t *testing.T) { assert.Equal(t, map[string]string{"source": "api", "version": "1.0"}, secondCall.data["metadata"]) assert.NotNil(t, secondCall.data["event_at"]) assert.Equal(t, map[string]string{"X-API-Key": "analytics-key"}, secondCall.headers) - assert.Equal(t, Trigger{ + assert.Equal(t, Consumer{ ServiceName: "analytics-service", BaseUrl: "http://localhost:8081", Path: "/analytics/event", Headers: map[string]string{"X-API-Key": "analytics-key"}, - }, secondCall.trigger) + }, secondCall.consumer) - // Verify third call (first trigger of second event) + // Verify third call (first consumer of second event) thirdCall := mockFetcher.notifyCalls[2] assert.Equal(t, "order.completed", thirdCall.data["event"]) assert.Equal(t, "test-message-id", thirdCall.data["id"]) @@ -206,12 +206,12 @@ func TestDeadLetterQueue_Handler_Success(t *testing.T) { assert.Equal(t, map[string]string{"source": "api", "version": "1.0"}, thirdCall.data["metadata"]) assert.NotNil(t, thirdCall.data["event_at"]) assert.Equal(t, map[string]string{"Content-Type": "application/json"}, thirdCall.headers) - assert.Equal(t, Trigger{ + assert.Equal(t, Consumer{ ServiceName: "email-service", BaseUrl: "http://localhost:8082", Path: "/send-confirmation", Headers: map[string]string{"Content-Type": "application/json"}, - }, thirdCall.trigger) + }, thirdCall.consumer) } func TestDeadLetterQueue_Handler_EventNotFound(t *testing.T) { @@ -248,7 +248,7 @@ func TestDeadLetterQueue_Handler_EventNotFound(t *testing.T) { // Should return nil when EventNotFound require.NoError(t, err) - // Verify that NotifyTrigger was not called + // Verify that Notify was not called assert.Empty(t, mockFetcher.notifyCalls) } @@ -289,7 +289,7 @@ func TestDeadLetterQueue_Handler_StoreError(t *testing.T) { assert.Contains(t, err.Error(), "failed to get all schedulers:") assert.Contains(t, err.Error(), "database connection failed") - // Verify that NotifyTrigger was not called + // Verify that Notify was not called assert.Empty(t, mockFetcher.notifyCalls) } @@ -343,11 +343,11 @@ func TestDeadLetterQueue_Handler_EmptyEvents(t *testing.T) { require.NoError(t, err) - // Verify that NotifyTrigger was not called since there are no events + // Verify that Notify was not called since there are no events assert.Empty(t, mockFetcher.notifyCalls) } -func TestDeadLetterQueue_Handler_EventsWithNoTriggers(t *testing.T) { +func TestDeadLetterQueue_Handler_EventsWithNoConsumers(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -360,10 +360,10 @@ func TestDeadLetterQueue_Handler_EventsWithNoTriggers(t *testing.T) { mockEvents := []domain.Event{ { - Name: "event.without.triggers", + Name: "event.without.consumers", ServiceName: "test-service", State: "archived", - Triggers: []domain.Trigger{}, // No triggers + Consumers: []domain.Consumer{}, // No consumers }, } @@ -389,7 +389,7 @@ func TestDeadLetterQueue_Handler_EventsWithNoTriggers(t *testing.T) { require.NoError(t, err) - // Verify that NotifyTrigger was not called since there are no triggers + // Verify that Notify was not called since there are no consumers assert.Empty(t, mockFetcher.notifyCalls) } @@ -410,7 +410,7 @@ func TestDeadLetterQueue_Handler_FetcherError(t *testing.T) { Name: "test.event", ServiceName: "test-service", State: "archived", - Triggers: []domain.Trigger{ + Consumers: []domain.Consumer{ { ServiceName: "webhook-service", Host: "http://localhost:8080", @@ -434,7 +434,7 @@ func TestDeadLetterQueue_Handler_FetcherError(t *testing.T) { } mockFetcher.MockFetcher.EXPECT(). - NotifyTrigger(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Notify(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(fetcherError). Times(1) @@ -447,14 +447,14 @@ func TestDeadLetterQueue_Handler_FetcherError(t *testing.T) { asyncCtx := asyncadapter.NewAsyncCtx[pubsub.Message](context.Background(), messageBytes) err = handle.Handler(asyncCtx) - // The handler should continue even if NotifyTrigger fails (no error propagation in current implementation) + // The handler should continue even if Notify fails (no error propagation in current implementation) require.NoError(t, err) - // Verify that NotifyTrigger was called despite the error + // Verify that Notify was called despite the error assert.Len(t, mockFetcher.notifyCalls, 1) } -func TestDeadLetterQueue_Handler_MultipleEventsWithMixedTriggers(t *testing.T) { +func TestDeadLetterQueue_Handler_MultipleEventsWithMixedConsumers(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -471,7 +471,7 @@ func TestDeadLetterQueue_Handler_MultipleEventsWithMixedTriggers(t *testing.T) { Name: "order.created", ServiceName: "order-service", State: "archived", - Triggers: []domain.Trigger{ + Consumers: []domain.Consumer{ { ServiceName: "inventory-service", Host: "http://inventory.local", @@ -484,13 +484,13 @@ func TestDeadLetterQueue_Handler_MultipleEventsWithMixedTriggers(t *testing.T) { Name: "notification.send", ServiceName: "notification-service", State: "archived", - Triggers: []domain.Trigger{}, // Event with no triggers + Consumers: []domain.Consumer{}, // Event with no consumers }, { Name: "audit.log", ServiceName: "audit-service", State: "archived", - Triggers: []domain.Trigger{ + Consumers: []domain.Consumer{ { ServiceName: "logging-service", Host: "http://logs.local", @@ -518,9 +518,9 @@ func TestDeadLetterQueue_Handler_MultipleEventsWithMixedTriggers(t *testing.T) { notifyCalls: []notifyCall{}, } - // Expect 3 calls to NotifyTrigger: 1 from first event + 0 from second event + 2 from third event + // Expect 3 calls to Notify: 1 from first event + 0 from second event + 2 from third event mockFetcher.MockFetcher.EXPECT(). - NotifyTrigger(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Notify(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(nil). Times(3) @@ -546,7 +546,7 @@ func TestDeadLetterQueue_Handler_MultipleEventsWithMixedTriggers(t *testing.T) { assert.Contains(t, eventNames, "order.created") assert.Contains(t, eventNames, "audit.log") - assert.NotContains(t, eventNames, "notification.send") // This event has no triggers + assert.NotContains(t, eventNames, "notification.send") // This event has no consumers // Count occurrences of each event orderCount := 0 @@ -561,7 +561,7 @@ func TestDeadLetterQueue_Handler_MultipleEventsWithMixedTriggers(t *testing.T) { } assert.Equal(t, 1, orderCount, "order.created should appear once") - assert.Equal(t, 2, auditCount, "audit.log should appear twice (2 triggers)") + assert.Equal(t, 2, auditCount, "audit.log should appear twice (2 consumers)") } func TestDeadLetterQueue_Handler_VerifyState(t *testing.T) { diff --git a/internal/wtrhandler/external_handle_http.go b/internal/wtrhandler/external_handle_http.go index 6be1756..c243709 100644 --- a/internal/wtrhandler/external_handle_http.go +++ b/internal/wtrhandler/external_handle_http.go @@ -84,21 +84,21 @@ func PublisherEvent( eventType := event.Type.String() if eventType == "" { l.Warn("event type is empty, defaulting to internal", "event_name", event.Name) - eventType = domain.TriggerTypeInternal.String() + eventType = domain.EventTypeInternal.String() } - for _, trigger := range event.Triggers { - config := trigger.Option.ToAsynqOptions() + for _, consumer := range event.Consumers { + config := consumer.Option.ToAsynqOptions() input := RequestPayload{ EventName: event.Name, Data: payload.Data, Headers: payload.Metadata.Headers, - Trigger: Trigger{ - ServiceName: trigger.ServiceName, - BaseUrl: trigger.Host, - Path: trigger.Path, - Headers: trigger.Headers, + Consumer: Consumer{ + ServiceName: consumer.ServiceName, + BaseUrl: consumer.Host, + Path: consumer.Path, + Headers: consumer.Headers, }, } diff --git a/internal/wtrhandler/external_handle_http_test.go b/internal/wtrhandler/external_handle_http_test.go index 4b185e9..df69f9a 100644 --- a/internal/wtrhandler/external_handle_http_test.go +++ b/internal/wtrhandler/external_handle_http_test.go @@ -62,8 +62,8 @@ func TestGetExternalHandle(t *testing.T) { setupStoreMock: func(s *MockStore) { s.EXPECT().GetEvent(gomock.Any(), "user.created").Return(domain.Event{ Name: "user.created", - Type: domain.TriggerTypeInternal, - Triggers: []domain.Trigger{{ + Type: domain.EventTypeInternal, + Consumers: []domain.Consumer{{ ServiceName: "test-service", Host: "http://localhost:8080", Path: "/webhook", @@ -125,8 +125,8 @@ func TestGetExternalHandle(t *testing.T) { setupStoreMock: func(s *MockStore) { s.EXPECT().GetEvent(gomock.Any(), "order.completed").Return(domain.Event{ Name: "order.completed", - Type: domain.TriggerTypeInternal, - Triggers: []domain.Trigger{{ + Type: domain.EventTypeInternal, + Consumers: []domain.Consumer{{ ServiceName: "test-service", Host: "http://localhost:8080", Path: "/webhook", @@ -183,8 +183,8 @@ func TestGetExternalHandle(t *testing.T) { setupStoreMock: func(s *MockStore) { s.EXPECT().GetEvent(gomock.Any(), "notification.send").Return(domain.Event{ Name: "notification.send", - Type: domain.TriggerTypeInternal, - Triggers: []domain.Trigger{{ + Type: domain.EventTypeInternal, + Consumers: []domain.Consumer{{ ServiceName: "test-service", Host: "http://localhost:8080", Path: "/webhook", @@ -235,8 +235,8 @@ func TestGetExternalHandle(t *testing.T) { setupStoreMock: func(s *MockStore) { s.EXPECT().GetEvent(gomock.Any(), "payment.failed").Return(domain.Event{ Name: "payment.failed", - Type: domain.TriggerTypeInternal, - Triggers: []domain.Trigger{{ + Type: domain.EventTypeInternal, + Consumers: []domain.Consumer{{ ServiceName: "test-service", Host: "http://localhost:8080", Path: "/webhook", @@ -280,8 +280,8 @@ func TestGetExternalHandle(t *testing.T) { setupStoreMock: func(s *MockStore) { s.EXPECT().GetEvent(gomock.Any(), "system.ping").Return(domain.Event{ Name: "system.ping", - Type: domain.TriggerTypeInternal, - Triggers: []domain.Trigger{{ + Type: domain.EventTypeInternal, + Consumers: []domain.Consumer{{ ServiceName: "test-service", Host: "http://localhost:8080", Path: "/webhook", @@ -358,9 +358,9 @@ func TestGetExternalHandle(t *testing.T) { expectedError: true, }, { - name: "event_with_zero_triggers", + name: "event_with_zero_consumers", payload: InternalPayload{ - EventName: "no.triggers.event", + EventName: "no.consumers.event", Data: Data{ "key": "value", }, @@ -374,22 +374,22 @@ func TestGetExternalHandle(t *testing.T) { }, }, setupStoreMock: func(s *MockStore) { - s.EXPECT().GetEvent(gomock.Any(), "no.triggers.event").Return(domain.Event{ - Name: "no.triggers.event", - Type: domain.TriggerTypeInternal, - Triggers: []domain.Trigger{}, // Empty triggers + s.EXPECT().GetEvent(gomock.Any(), "no.consumers.event").Return(domain.Event{ + Name: "no.consumers.event", + Type: domain.EventTypeInternal, + Consumers: []domain.Consumer{}, // Empty consumers }, nil).Times(1) }, setupMock: func(m *pubadapter.MockGenericPublisher) { - // No publish should be called when there are no triggers + // No publish should be called when there are no consumers }, expectedStatus: http.StatusAccepted, expectedError: false, }, { - name: "event_with_multiple_triggers", + name: "event_with_multiple_consumers", payload: InternalPayload{ - EventName: "multi.trigger.event", + EventName: "multi.consumer.event", Data: Data{ "user_id": "456", }, @@ -407,10 +407,10 @@ func TestGetExternalHandle(t *testing.T) { }, }, setupStoreMock: func(s *MockStore) { - s.EXPECT().GetEvent(gomock.Any(), "multi.trigger.event").Return(domain.Event{ - Name: "multi.trigger.event", - Type: domain.TriggerTypeInternal, - Triggers: []domain.Trigger{ + s.EXPECT().GetEvent(gomock.Any(), "multi.consumer.event").Return(domain.Event{ + Name: "multi.consumer.event", + Type: domain.EventTypeInternal, + Consumers: []domain.Consumer{ { ServiceName: "service-one", Host: "http://localhost:8081", @@ -438,7 +438,7 @@ func TestGetExternalHandle(t *testing.T) { gomock.Any(), ). Return(nil). - Times(3) // Should be called once for each trigger + Times(3) // Should be called once for each consumer }, expectedStatus: http.StatusAccepted, expectedError: false, diff --git a/internal/wtrhandler/fetcher_mock.go b/internal/wtrhandler/fetcher_mock.go index c80f392..8d69992 100644 --- a/internal/wtrhandler/fetcher_mock.go +++ b/internal/wtrhandler/fetcher_mock.go @@ -41,18 +41,18 @@ func (m *MockFetcher) EXPECT() *MockFetcherMockRecorder { return m.recorder } -// NotifyTrigger mocks base method. -func (m *MockFetcher) NotifyTrigger(ctx context.Context, data map[string]any, headers map[string]string, trigger Trigger) error { +// Notify mocks base method. +func (m *MockFetcher) Notify(ctx context.Context, data map[string]any, headers map[string]string, consumer Consumer) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "NotifyTrigger", ctx, data, headers, trigger) + ret := m.ctrl.Call(m, "Notify", ctx, data, headers, consumer) ret0, _ := ret[0].(error) return ret0 } -// NotifyTrigger indicates an expected call of NotifyTrigger. -func (mr *MockFetcherMockRecorder) NotifyTrigger(ctx, data, headers, trigger any) *gomock.Call { +// Notify indicates an expected call of Notify. +func (mr *MockFetcherMockRecorder) Notify(ctx, data, headers, consumer any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyTrigger", reflect.TypeOf((*MockFetcher)(nil).NotifyTrigger), ctx, data, headers, trigger) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Notify", reflect.TypeOf((*MockFetcher)(nil).Notify), ctx, data, headers, consumer) } // MockConsumerInsights is a mock of ConsumerInsights interface. diff --git a/internal/wtrhandler/request_handle_asynq.go b/internal/wtrhandler/request_handle_asynq.go index f5ccfbc..377dd3f 100644 --- a/internal/wtrhandler/request_handle_asynq.go +++ b/internal/wtrhandler/request_handle_asynq.go @@ -12,7 +12,7 @@ import ( type RequestPayload struct { EventName string `json:"event_name"` - Trigger Trigger `json:"trigger"` + Consumer Consumer `json:"consumer"` Data map[string]any `json:"data"` Headers map[string]string `json:"headers,omitempty"` } @@ -30,7 +30,7 @@ func (p RequestPayload) mergeHeaders(headers map[string]string) map[string]strin } type Fetcher interface { - NotifyTrigger(ctx context.Context, data map[string]any, headers map[string]string, trigger Trigger) error + Notify(ctx context.Context, data map[string]any, headers map[string]string, consumer Consumer) error } type ConsumerInsights interface { @@ -44,7 +44,7 @@ func GetRequestHandle(fetch Fetcher, insights ConsumerInsights) asyncadapter.Han finished := time.Now() if err := insights.Consumed(ctx, domain.ConsumerMetric{ TopicName: payload.EventName, - ConsumerName: payload.Trigger.ServiceName, + ConsumerName: payload.Consumer.ServiceName, TimeStarted: started, TimeEnded: finished, TimeDurationMs: finished.Sub(started).Milliseconds(), @@ -66,10 +66,10 @@ func GetRequestHandle(fetch Fetcher, insights ConsumerInsights) asyncadapter.Han return fmt.Errorf("get payload: %w", err) } - headers := payload.mergeHeaders(payload.Trigger.Headers) - if err := fetch.NotifyTrigger(ctx, payload.Data, headers, payload.Trigger); err != nil { + headers := payload.mergeHeaders(payload.Consumer.Headers) + if err := fetch.Notify(ctx, payload.Data, headers, payload.Consumer); err != nil { insertInsights(ctx, payload, started, false) - return fmt.Errorf("fetch trigger: %w", err) + return fmt.Errorf("fetch consumer: %w", err) } insertInsights(ctx, payload, started, true) diff --git a/internal/wtrhandler/request_handle_asynq_test.go b/internal/wtrhandler/request_handle_asynq_test.go index 9607336..7812f47 100644 --- a/internal/wtrhandler/request_handle_asynq_test.go +++ b/internal/wtrhandler/request_handle_asynq_test.go @@ -27,12 +27,12 @@ func init() { // mockFetcher implements the Fetcher interface for testing type mockFetcher struct { - notifyTriggerFunc func(ctx context.Context, data map[string]any, headers map[string]string, trigger Trigger) error + notifyTriggerFunc func(ctx context.Context, data map[string]any, headers map[string]string, consumer Consumer) error } -func (m *mockFetcher) NotifyTrigger(ctx context.Context, data map[string]any, headers map[string]string, trigger Trigger) error { +func (m *mockFetcher) Notify(ctx context.Context, data map[string]any, headers map[string]string, consumer Consumer) error { if m.notifyTriggerFunc != nil { - return m.notifyTriggerFunc(ctx, data, headers, trigger) + return m.notifyTriggerFunc(ctx, data, headers, consumer) } return nil } @@ -167,7 +167,7 @@ func TestGetRequestHandle_Handler(t *testing.T) { name: "successful_request", payload: RequestPayload{ EventName: "user.created", - Trigger: Trigger{ + Consumer: Consumer{ ServiceName: "user-service", BaseUrl: testServer.URL, Path: "/webhook", @@ -184,7 +184,7 @@ func TestGetRequestHandle_Handler(t *testing.T) { }, }, mockFetcher: &mockFetcher{ - notifyTriggerFunc: func(ctx context.Context, data map[string]any, headers map[string]string, trigger Trigger) error { + notifyTriggerFunc: func(ctx context.Context, data map[string]any, headers map[string]string, consumer Consumer) error { return nil }, }, @@ -262,8 +262,8 @@ func TestRequestPayload_mergeHeaders_Integration(t *testing.T) { name string data map[string]any headers map[string]string - trigger Trigger - mockNotifyTriggerFn func(ctx context.Context, data map[string]any, headers map[string]string, trigger Trigger) error + consumer Consumer + mockNotifyTriggerFn func(ctx context.Context, data map[string]any, headers map[string]string, consumer Consumer) error setupMocks func(*MockConsumerInsights) expectedError bool expectedErrMsg string @@ -278,12 +278,12 @@ func TestRequestPayload_mergeHeaders_Integration(t *testing.T) { "Authorization": "Bearer token", "User-Agent": "webhook-client", }, - trigger: Trigger{ + consumer: Consumer{ ServiceName: "user-service", BaseUrl: "http://example.com", Path: "/webhook", }, - mockNotifyTriggerFn: func(ctx context.Context, data map[string]any, headers map[string]string, trigger Trigger) error { + mockNotifyTriggerFn: func(ctx context.Context, data map[string]any, headers map[string]string, consumer Consumer) error { // Verify the merged headers are passed correctly assert.Equal(t, "Bearer token", headers["Authorization"]) assert.Equal(t, "webhook-client", headers["User-Agent"]) @@ -308,12 +308,12 @@ func TestRequestPayload_mergeHeaders_Integration(t *testing.T) { "user_id": "123", }, headers: map[string]string{}, - trigger: Trigger{ + consumer: Consumer{ ServiceName: "user-service", BaseUrl: "http://example.com", Path: "/webhook", }, - mockNotifyTriggerFn: func(ctx context.Context, data map[string]any, headers map[string]string, trigger Trigger) error { + mockNotifyTriggerFn: func(ctx context.Context, data map[string]any, headers map[string]string, consumer Consumer) error { return assert.AnError }, setupMocks: func(mockInsights *MockConsumerInsights) { @@ -328,7 +328,7 @@ func TestRequestPayload_mergeHeaders_Integration(t *testing.T) { Times(1) }, expectedError: true, - expectedErrMsg: "fetch trigger:", + expectedErrMsg: "fetch consumer:", }, } @@ -350,7 +350,7 @@ func TestRequestPayload_mergeHeaders_Integration(t *testing.T) { payload := RequestPayload{ EventName: "user.created", - Trigger: tt.trigger, + Consumer: tt.consumer, Data: tt.data, Headers: tt.headers, } @@ -386,7 +386,7 @@ func TestMockFetcher_ErrorScenarios(t *testing.T) { { name: "network_error_simulation", mockError: assert.AnError, - expectedErrMsg: "fetch trigger:", + expectedErrMsg: "fetch consumer:", }, } @@ -407,7 +407,7 @@ func TestMockFetcher_ErrorScenarios(t *testing.T) { Times(1) mockFetch := &mockFetcher{ - notifyTriggerFunc: func(ctx context.Context, data map[string]any, headers map[string]string, trigger Trigger) error { + notifyTriggerFunc: func(ctx context.Context, data map[string]any, headers map[string]string, consumer Consumer) error { return tt.mockError }, } @@ -416,7 +416,7 @@ func TestMockFetcher_ErrorScenarios(t *testing.T) { payload := RequestPayload{ EventName: "user.created", - Trigger: Trigger{ + Consumer: Consumer{ ServiceName: "user-service", BaseUrl: "http://localhost:99999", // Unreachable port Path: "/webhook", @@ -458,7 +458,7 @@ func TestGetRequestHandle_HeaderMerging(t *testing.T) { Times(1) mockFetch := &mockFetcher{ - notifyTriggerFunc: func(ctx context.Context, data map[string]any, headers map[string]string, trigger Trigger) error { + notifyTriggerFunc: func(ctx context.Context, data map[string]any, headers map[string]string, consumer Consumer) error { receivedHeaders = headers return nil }, @@ -468,7 +468,7 @@ func TestGetRequestHandle_HeaderMerging(t *testing.T) { payload := RequestPayload{ EventName: "user.created", - Trigger: Trigger{ + Consumer: Consumer{ ServiceName: "user-service", BaseUrl: "http://example.com", Path: "/webhook", @@ -522,7 +522,7 @@ func TestGetRequestHandle_DataPassing(t *testing.T) { Times(1) mockFetch := &mockFetcher{ - notifyTriggerFunc: func(ctx context.Context, data map[string]any, headers map[string]string, trigger Trigger) error { + notifyTriggerFunc: func(ctx context.Context, data map[string]any, headers map[string]string, consumer Consumer) error { receivedData = data return nil }, @@ -540,7 +540,7 @@ func TestGetRequestHandle_DataPassing(t *testing.T) { payload := RequestPayload{ EventName: "user.created", - Trigger: Trigger{ + Consumer: Consumer{ ServiceName: "user-service", BaseUrl: "http://example.com", Path: "/webhook", diff --git a/internal/wtrhandler/trigger.go b/internal/wtrhandler/trigger.go index 829c016..1a9ba2e 100644 --- a/internal/wtrhandler/trigger.go +++ b/internal/wtrhandler/trigger.go @@ -5,14 +5,14 @@ import ( "strings" ) -type Trigger struct { +type Consumer struct { ServiceName string `json:"service_name"` BaseUrl string `json:"base_url"` Path string `json:"path"` Headers map[string]string `bson:"headers"` } -func (t *Trigger) GetUrl() string { +func (t *Consumer) GetUrl() string { baseURL := strings.TrimSuffix(t.BaseUrl, "/") path := strings.TrimPrefix(t.Path, "/") return fmt.Sprintf("%s/%s", baseURL, path)