Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/setup/backoffice/httpserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func Start(
backoffice.GetHealthCheckHandler(),
backoffice.CreateConsumer(cache, store),
backoffice.GetEvent(cache, store),
backoffice.GetEvents(cache, store),
backoffice.GetPathEventHandle(cache, store),
backoffice.GetEvents(cache, store),
backoffice.GetRegisterTaskConsumerArchived(cache, store),
backoffice.RemoveEvent(cache, store),
backoffice.GetInsightsHandle(insightsStore),
Expand Down
1 change: 1 addition & 0 deletions deployment/tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ CREATE TABLE IF NOT EXISTS events (
service_name VARCHAR(255) NOT NULL,
state VARCHAR(100) NOT NULL,
consumers JSONB NOT NULL,
opts JSONB NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
deleted_at TIMESTAMP NULL
Expand Down
13 changes: 7 additions & 6 deletions example/event_data.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
{
"name": "payment.charged",
"type": "external",
"option": {
"wq_type": "low_throughput",
"max_retries": 3,
"retention": "168h",
"unique_ttl": "60s",
"schedule_in": "100ms"
},
"consumers": [
{
"service_name": "external-service",
Expand All @@ -9,12 +16,6 @@
"path": "/payment/charged",
"headers": {
"Content-Type": "application/json"
},
"option": {
"wq_type": "low_throughput",
"max_retries": 3,
"retention": "168h",
"unique_ttl": "60s"
}
}
]
Expand Down
5 changes: 0 additions & 5 deletions example/publisher_data.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,5 @@
},
"metadata": {
"correlation_id": "5e4dd662-9eba-4321-9c97-0b4ee0942f8b"
},
"opts": {
"max_retries": 3,
"wq_type": "external",
"schedule_in": "100ms"
}
}
6 changes: 3 additions & 3 deletions internal/backoffice/path_event_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ func TestGetPathEventHandle(t *testing.T) {
ServiceName: "user-service",
State: "active",
Type: "external",
Option: domain.Opt{
MaxRetries: 3,
},
Consumers: []domain.Consumer{
{
ServiceName: "notification-service",
Host: "https://api.example.com",
Path: "/webhook",
Headers: map[string]string{"Content-Type": "application/json"},
Option: domain.Opt{
MaxRetries: 3,
},
},
},
},
Expand Down
2 changes: 2 additions & 0 deletions internal/backoffice/register_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
type EventDto struct {
Name string `json:"name"`
Type domain.Type `json:"type"`
Option domain.Opt `json:"option" bson:"option"`
Consumers []domain.Consumer `json:"consumers"`
}

Expand All @@ -27,6 +28,7 @@ func (e *EventDto) ToDomain() domain.Event {
Name: e.Name,
ServiceName: env.InternalServiceName,
Type: e.Type,
Option: e.Option,
Consumers: e.Consumers,
}
}
Expand Down
8 changes: 3 additions & 5 deletions internal/domain/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Event struct {
ServiceName string `json:"service_name" bson:"service_name"`
State string `json:"state" bson:"state"`
Type Type `json:"type" bson:"type"`
Option Opt `json:"option" bson:"option"`
Consumers []Consumer `json:"consumers" bson:"consumers"`
}

Expand All @@ -24,10 +25,8 @@ func (e *Event) Validate() error {
return fmt.Errorf("invalid event type: %s", e.Type)
}

for _, consumer := range e.Consumers {
if err := consumer.Option.Validate(); err != nil {
return fmt.Errorf("invalid consumer option: %w", err)
}
if err := e.Option.Validate(); err != nil {
return fmt.Errorf("invalid event option: %w", err)
}

return nil
Expand All @@ -38,7 +37,6 @@ type Consumer struct {
Host string `json:"host" bson:"host"`
Path string `json:"path" bson:"path"`
Headers map[string]string `json:"headers" bson:"headers"`
Option Opt `json:"option" bson:"option"`
}

type Opt struct {
Expand Down
5 changes: 5 additions & 0 deletions internal/interstore/model_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,22 @@ type ModelEvent struct {
ServiceName string
State string
Consumers []byte
Option []byte
}

func (m ModelEvent) ToDomain() domain.Event {
var consumers []domain.Consumer
json.Unmarshal(m.Consumers, &consumers)

var option domain.Opt
json.Unmarshal(m.Option, &option)

return domain.Event{
ID: m.ID,
Name: m.Name,
ServiceName: m.ServiceName,
State: m.State,
Consumers: consumers,
Option: option,
}
}
35 changes: 29 additions & 6 deletions internal/interstore/postgres_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (r *PostgresStore) GetAllEvents(ctx context.Context) ([]domain.Event, error
&event.ServiceName,
&event.State,
&event.Consumers,
&event.Option,
); err != nil {
return nil, fmt.Errorf("failed to scan row: %w", err)
}
Expand All @@ -77,17 +78,19 @@ func (r *PostgresStore) GetInternalEvent(ctx context.Context, eventName string)
l := ctxlogger.GetLogger(ctx)

query := `
SELECT id, name, service_name, consumers
SELECT id, name, service_name, consumers, opts
FROM events
WHERE name = $1 AND deleted_at IS NULL
`
var event domain.Event
var consumersJSON []byte
var optsJSON []byte
err := r.db.QueryRowContext(ctx, query, eventName).Scan(
&event.ID,
&event.Name,
&event.ServiceName,
&consumersJSON,
&optsJSON,
)

if errors.Is(err, sql.ErrNoRows) {
Expand All @@ -104,6 +107,10 @@ func (r *PostgresStore) GetInternalEvent(ctx context.Context, eventName string)
return domain.Event{}, fmt.Errorf("failed to unmarshal consumers: %w", err)
}

if err := json.Unmarshal(optsJSON, &event.Option); err != nil {
return domain.Event{}, fmt.Errorf("failed to unmarshal event option: %w", err)
}

return event, nil
}

Expand Down Expand Up @@ -140,6 +147,7 @@ func (r *PostgresStore) GetInternalEvents(ctx context.Context, filters domain.Fi
&event.ServiceName,
&event.State,
&event.Consumers,
&event.Option,
); err != nil {
return nil, fmt.Errorf("failed to scan row: %w", err)
}
Expand All @@ -161,13 +169,18 @@ func (r *PostgresStore) Save(ctx context.Context, event domain.Event) error {
return fmt.Errorf("failed to marshal consumers: %w", err)
}

optsJSON, err := json.Marshal(event.Option)
if err != nil {
return fmt.Errorf("failed to marshal event option: %w", err)
}

if event.State == "" {
event.State = "active"
}

query := `
INSERT INTO events (id, name, service_name, state, consumers, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)
INSERT INTO events (id, name, service_name, state, consumers, opts, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`

now := time.Now()
Expand All @@ -177,6 +190,7 @@ func (r *PostgresStore) Save(ctx context.Context, event domain.Event) error {
event.ServiceName,
event.State,
consumersJSON,
optsJSON,
now,
now,
)
Expand All @@ -194,7 +208,8 @@ const modelEventFields = `
name,
service_name,
state,
consumers
consumers,
opts
`

func (r *PostgresStore) GetEventByID(ctx context.Context, eventID uuid.UUID) (domain.Event, error) {
Expand All @@ -209,6 +224,7 @@ func (r *PostgresStore) GetEventByID(ctx context.Context, eventID uuid.UUID) (do
&event.ServiceName,
&event.State,
&event.Consumers,
&event.Option,
)

if errors.Is(err, sql.ErrNoRows) {
Expand Down Expand Up @@ -252,6 +268,7 @@ func (r *PostgresStore) GetAllSchedulers(ctx context.Context, state string) ([]d
&event.ServiceName,
&event.State,
&event.Consumers,
&event.Option,
); err != nil {
l.Error("Error on scan row", "error", err)
return nil, fmt.Errorf("failed to scan row: %w", err)
Expand Down Expand Up @@ -283,15 +300,21 @@ func (r *PostgresStore) UpdateEvent(ctx context.Context, event domain.Event) err
events SET name = $2,
service_name = $3,
state = $4,
consumers = $5
consumers = $5,
opts = $6
WHERE id = $1 AND deleted_at IS NULL;`

consumersJSON, err := json.Marshal(event.Consumers)
if err != nil {
return fmt.Errorf("failed to marshal consumers: %w", err)
}

if _, err := r.db.ExecContext(ctx, query, event.ID, event.Name, event.ServiceName, event.State, consumersJSON); err != nil {
optsJSON, err := json.Marshal(event.Option)
if err != nil {
return fmt.Errorf("failed to marshal event option: %w", err)
}

if _, err := r.db.ExecContext(ctx, query, event.ID, event.Name, event.ServiceName, event.State, consumersJSON, optsJSON); err != nil {
return fmt.Errorf("failed to update event: %w", err)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/wtrhandler/external_handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ func PublisherEvent(
eventType = domain.EventTypeInternal.String()
}

config := event.Option.ToAsynqOptions()
for _, consumer := range event.Consumers {
config := consumer.Option.ToAsynqOptions()

input := RequestPayload{
EventName: event.Name,
Expand Down
38 changes: 0 additions & 38 deletions internal/wtrhandler/external_handle_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/IsaacDSC/gqueue/internal/cfg"
"github.com/IsaacDSC/gqueue/internal/domain"
"github.com/IsaacDSC/gqueue/pkg/intertime"
"github.com/IsaacDSC/gqueue/pkg/pubadapter"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
Expand Down Expand Up @@ -54,10 +52,6 @@ func TestGetExternalHandle(t *testing.T) {
"Content-Type": "application/json",
},
},
Opts: domain.Opt{
MaxRetries: 3,
WqType: pubadapter.LowLatency,
},
},
setupStoreMock: func(s *MockStore) {
s.EXPECT().GetEvent(gomock.Any(), "user.created").Return(domain.Event{
Expand Down Expand Up @@ -114,13 +108,6 @@ func TestGetExternalHandle(t *testing.T) {
"X-Correlation": "corr_def456",
},
},
Opts: domain.Opt{
MaxRetries: 5,
Retention: intertime.Duration(24 * time.Hour),
UniqueTTL: intertime.Duration(1 * time.Hour),
ScheduleIn: intertime.Duration(30 * time.Second),
WqType: pubadapter.LowLatency,
},
},
setupStoreMock: func(s *MockStore) {
s.EXPECT().GetEvent(gomock.Any(), "order.completed").Return(domain.Event{
Expand Down Expand Up @@ -174,11 +161,6 @@ func TestGetExternalHandle(t *testing.T) {
"X-Service": "notification-worker",
},
},
Opts: domain.Opt{
MaxRetries: 2,
Deadline: func() *time.Time { t := time.Now().Add(5 * time.Minute); return &t }(),
WqType: pubadapter.LowLatency,
},
},
setupStoreMock: func(s *MockStore) {
s.EXPECT().GetEvent(gomock.Any(), "notification.send").Return(domain.Event{
Expand Down Expand Up @@ -227,10 +209,6 @@ func TestGetExternalHandle(t *testing.T) {
Version: "1.0",
Environment: "test",
},
Opts: domain.Opt{
MaxRetries: 3,
WqType: pubadapter.LowLatency,
},
},
setupStoreMock: func(s *MockStore) {
s.EXPECT().GetEvent(gomock.Any(), "payment.failed").Return(domain.Event{
Expand Down Expand Up @@ -273,9 +251,6 @@ func TestGetExternalHandle(t *testing.T) {
Version: "1.0",
Environment: "test",
},
Opts: domain.Opt{
WqType: pubadapter.LowLatency,
},
},
setupStoreMock: func(s *MockStore) {
s.EXPECT().GetEvent(gomock.Any(), "system.ping").Return(domain.Event{
Expand Down Expand Up @@ -319,9 +294,6 @@ func TestGetExternalHandle(t *testing.T) {
Version: "1.0",
Environment: "test",
},
Opts: domain.Opt{
WqType: pubadapter.LowLatency,
},
},
setupStoreMock: func(s *MockStore) {
s.EXPECT().GetEvent(gomock.Any(), "nonexistent.event").Return(domain.Event{}, domain.EventNotFound).Times(1)
Expand All @@ -344,9 +316,6 @@ func TestGetExternalHandle(t *testing.T) {
Version: "1.0",
Environment: "test",
},
Opts: domain.Opt{
WqType: pubadapter.LowLatency,
},
},
setupStoreMock: func(s *MockStore) {
s.EXPECT().GetEvent(gomock.Any(), "error.event").Return(domain.Event{}, errors.New("database connection error")).Times(1)
Expand All @@ -369,9 +338,6 @@ func TestGetExternalHandle(t *testing.T) {
Version: "1.0",
Environment: "test",
},
Opts: domain.Opt{
WqType: pubadapter.LowLatency,
},
},
setupStoreMock: func(s *MockStore) {
s.EXPECT().GetEvent(gomock.Any(), "no.consumers.event").Return(domain.Event{
Expand Down Expand Up @@ -401,10 +367,6 @@ func TestGetExternalHandle(t *testing.T) {
"Content-Type": "application/json",
},
},
Opts: domain.Opt{
MaxRetries: 3,
WqType: pubadapter.LowLatency,
},
},
setupStoreMock: func(s *MockStore) {
s.EXPECT().GetEvent(gomock.Any(), "multi.consumer.event").Return(domain.Event{
Expand Down
Loading