Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ graph TB
Consumers[Consumer Services]

%% Flows
Services -->|Create Events<br/>Register Triggers<br/>Publish Events| API
Services -->|Create Events<br/>Register Consumers<br/>Publish Events| API
API --> Database
API --> Queue
Queue --> Worker
Expand Down
6 changes: 3 additions & 3 deletions deployment/dashboard.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
-- CONSUMERS INSCRITOS NOS TOPICOS
SELECT
e.unique_key as topic,
e.name as topic,
CONCAT(elem->>'host', elem->>'path') AS consumer
FROM events AS e
CROSS JOIN LATERAL jsonb_array_elements(e.triggers) AS elem
CROSS JOIN LATERAL jsonb_array_elements(e.consumers) AS elem
WHERE e.deleted_at IS NULL
AND jsonb_typeof(e.triggers) = 'array'
AND jsonb_typeof(e.consumers) = 'array'
AND elem ? 'host'
ORDER BY topic desc;
5 changes: 2 additions & 3 deletions deployment/tables.sql
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
-- Internal Events Table
CREATE TABLE IF NOT EXISTS events (
id UUID PRIMARY KEY,
unique_key TEXT NOT NULL UNIQUE,
name VARCHAR(255) NOT NULL,
name VARCHAR(255) NOT NULL UNIQUE,
service_name VARCHAR(255) NOT NULL,
state VARCHAR(100) NOT NULL,
triggers JSONB NOT NULL,
consumers JSONB NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
deleted_at TIMESTAMP NULL
Expand Down
2 changes: 1 addition & 1 deletion example/event_data.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "payment.charged",
"type": "external",
"triggers": [
"consumers": [
{
"service_name": "external-service",
"type": "persistent",
Expand Down
4 changes: 1 addition & 3 deletions example/path_event_data.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
{
"name": "payment.processed",
"service_name": "my-app",
"repo_url": "http://github.com/my-org/my-repo",
"team_owner": "my-team",
"triggers": [
"consumers": [
{
"service_name": "consumer-1",
"type": "persistent",
Expand Down
4 changes: 2 additions & 2 deletions internal/asynqstore/cache_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type Cacher interface {
FindAllTriggers(ctx context.Context) ([]domain.Event, error)
FindAllConsumers(ctx context.Context) ([]domain.Event, error)
FindAllQueues(ctx context.Context) ([]asynqtask.Queue, error)
FindArchivedTasks(ctx context.Context, queue string) ([]string, error)
GetMsgArchivedTask(ctx context.Context, queue, task string) (asynqtask.RawMsg, error)
Expand All @@ -34,7 +34,7 @@ var _ Cacher = (*Cache)(nil)

const archivedKey = "gqueue:consumers:schedule:archived"

func (c Cache) FindAllTriggers(ctx context.Context) ([]domain.Event, error) {
func (c Cache) FindAllConsumers(ctx context.Context) ([]domain.Event, error) {
cResult, err := c.cache.Get(ctx, archivedKey).Result()
if errors.Is(err, redis.Nil) {
return nil, asynqtask.ErrorNotFound
Expand Down
6 changes: 3 additions & 3 deletions internal/asynqtask/archived_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

type CacheManager interface {
FindAllTriggers(ctx context.Context) ([]domain.Event, error)
FindAllConsumers(ctx context.Context) ([]domain.Event, error)
FindAllQueues(ctx context.Context) ([]Queue, error)
FindArchivedTasks(ctx context.Context, queue string) ([]string, error)
GetMsgArchivedTask(ctx context.Context, queue, task string) (RawMsg, error)
Expand Down Expand Up @@ -42,7 +42,7 @@ func (n TaskManager) NotifyListeners(ctx context.Context) error {
l := ctxlogger.GetLogger(ctx)
l.Debug("Starting notification process")

events, err := n.cm.FindAllTriggers(ctx)
events, err := n.cm.FindAllConsumers(ctx)
if errors.Is(err, ErrorNotFound) {
results, err := n.store.GetAllSchedulers(ctx, "archived")
if errors.Is(domain.EventNotFound, err) {
Expand Down Expand Up @@ -80,7 +80,7 @@ func (n TaskManager) NotifyListeners(ctx context.Context) error {
QueueName: msg.Queue,
Tasks: msg.Tasks,
Data: msg.Msg["data"],
Schedulers: events.Triggers,
Schedulers: events.Consumers,
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/asynqtask/fetch_msg_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ type FetchMsg struct {
QueueName string
Tasks []string
Data any
Schedulers []domain.Trigger
Schedulers []domain.Consumer
}

type TaskArchivedData struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/backoffice/get_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func GetEvent(cc cachemanager.Cache, repo Repository) httpadapter.HttpHandle {

var event domain.Event
if err := cc.Once(ctx, key, &event, defaultTTL, func(ctx context.Context) (any, error) {
return repo.GetInternalEvent(ctx, eventName, serviceName, "active")
return repo.GetInternalEvent(ctx, eventName)
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down
2 changes: 1 addition & 1 deletion internal/backoffice/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

type Repository interface {
Save(ctx context.Context, event domain.Event) error
GetInternalEvent(ctx context.Context, eventName, serviceName string, state string) (domain.Event, error)
GetInternalEvent(ctx context.Context, eventName string) (domain.Event, error)
GetInternalEvents(ctx context.Context, filters domain.FilterEvents) ([]domain.Event, error)
DisabledEvent(ctx context.Context, eventID uuid.UUID) error
UpdateEvent(ctx context.Context, event domain.Event) error
Expand Down
2 changes: 1 addition & 1 deletion internal/backoffice/path_event_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestGetPathEventHandle(t *testing.T) {
ServiceName: "user-service",
State: "active",
Type: "external",
Triggers: []domain.Trigger{
Consumers: []domain.Consumer{
{
ServiceName: "notification-service",
Host: "https://api.example.com",
Expand Down
8 changes: 4 additions & 4 deletions internal/backoffice/register_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
)

type EventDto struct {
Name string `json:"name"`
Type domain.Type `json:"type"`
Triggers []domain.Trigger `json:"triggers"`
Name string `json:"name"`
Type domain.Type `json:"type"`
Consumers []domain.Consumer `json:"consumers"`
}

func (e *EventDto) ToDomain() domain.Event {
Expand All @@ -27,7 +27,7 @@ func (e *EventDto) ToDomain() domain.Event {
Name: e.Name,
ServiceName: env.InternalServiceName,
Type: e.Type,
Triggers: e.Triggers,
Consumers: e.Consumers,
}
}

Expand Down
8 changes: 4 additions & 4 deletions internal/backoffice/repository_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 13 additions & 13 deletions internal/domain/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,29 @@ import (
)

type Event struct {
ID uuid.UUID `json:"id" bson:"id"`
Name string `json:"name" bson:"name"`
ServiceName string `json:"service_name" bson:"service_name"`
State string `json:"state" bson:"state"`
Type Type `json:"type" bson:"type"`
Triggers []Trigger `json:"triggers" bson:"triggers"`
ID uuid.UUID `json:"id" bson:"id"`
Name string `json:"name" bson:"name"`
ServiceName string `json:"service_name" bson:"service_name"`
State string `json:"state" bson:"state"`
Type Type `json:"type" bson:"type"`
Consumers []Consumer `json:"consumers" bson:"consumers"`
}

func (e *Event) Validate() error {
if e.Type != TriggerTypeInternal && e.Type != TriggerTypeExternal {
if e.Type != EventTypeInternal && e.Type != EventTypeExternal {
return fmt.Errorf("invalid event type: %s", e.Type)
}

for _, trigger := range e.Triggers {
if err := trigger.Option.Validate(); err != nil {
return fmt.Errorf("invalid trigger option: %w", err)
for _, consumer := range e.Consumers {
if err := consumer.Option.Validate(); err != nil {
return fmt.Errorf("invalid consumer option: %w", err)
}
}

return nil
}

type Trigger struct {
type Consumer struct {
ServiceName string `json:"service_name" bson:"service_name"`
Host string `json:"host" bson:"host"`
Path string `json:"path" bson:"path"`
Expand All @@ -60,8 +60,8 @@ func (vt Type) String() string {
}

const (
TriggerTypeInternal Type = "internal"
TriggerTypeExternal Type = "external"
EventTypeInternal Type = "internal"
EventTypeExternal Type = "external"
)

func (o Opt) Validate() error {
Expand Down
6 changes: 3 additions & 3 deletions internal/fetcher/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ func NewNotification() *Notification {
return &Notification{}
}

func (n Notification) NotifyTrigger(ctx context.Context, data map[string]any, headers map[string]string, trigger wtrhandler.Trigger) error {
url := trigger.GetUrl()
func (n Notification) Notify(ctx context.Context, data map[string]any, headers map[string]string, consumer wtrhandler.Consumer) error {
url := consumer.GetUrl()
return fetch(ctx, url, data, headers)
}

Expand Down Expand Up @@ -50,7 +50,7 @@ func fetch(ctx context.Context, url string, data any, headers map[string]string)
req.Header.Set(key, value)
}

// #nosec G704 -- SSRF is intentional: this function sends webhooks to user-configured trigger endpoints
// #nosec G704 -- SSRF is intentional: this function sends webhooks to user-configured consumers endpoints
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("post request: %w", err)
Expand Down
Loading