diff --git a/errors.go b/errors.go index dec7382..a211bfc 100644 --- a/errors.go +++ b/errors.go @@ -26,7 +26,7 @@ var ( cancelProto = status.FromContextError(context.Canceled).Proto() deadlineProto = status.FromContextError(context.DeadlineExceeded).Proto() - filterErr = errors.New(filterErrMsg) + filterErr = errors.New(filterErrMsg, j.C("ERR_d5e8f7a9b2c3d4e5")) ) // IsStoppedErr checks whether err is an ErrStopped @@ -60,5 +60,5 @@ func IsFilterErr(err error) bool { } func asFilterErr(err error) error { - return errors.Wrap(err, filterErrMsg) + return errors.Wrap(err, filterErrMsg, j.C("ERR_d5e8f7a9b2c3d4e5")) } diff --git a/filters/metadata.go b/filters/metadata.go index fe8ab72..0917a7c 100644 --- a/filters/metadata.go +++ b/filters/metadata.go @@ -18,8 +18,8 @@ const ( ) var ( - metadataEventFilterErr = errors.New(metadataEventFilterErrMsg) - deserializationErr = errors.New(deserializationErrMsg) + metadataEventFilterErr = errors.New(metadataEventFilterErrMsg, j.C("ERR_c1f2e3d4a5b6c7d8")) + deserializationErr = errors.New(deserializationErrMsg, j.C("ERR_a1b2c3d4e5f6a7b8")) ) func MetadataEventFilter[T any](ds Deserializer[T], flt DataFilter[T]) (reflex.EventFilter, error) { @@ -45,7 +45,7 @@ func IsDeserializationErr(err error) bool { } func asDeserializationErr(err error) error { - return errors.Wrap(err, deserializationErrMsg) + return errors.Wrap(err, deserializationErrMsg, j.C("ERR_a1b2c3d4e5f6a7b8")) } // IsMetadataEventFilterErr returns true if the error occurred during construction of a MetadataEventFilter. @@ -54,5 +54,6 @@ func IsMetadataEventFilterErr(err error) bool { } func makeMetadataEventFilterErr(ol ...errors.Option) error { - return errors.New(metadataEventFilterErrMsg, ol...) + opts := append([]errors.Option{j.C("ERR_c1f2e3d4a5b6c7d8")}, ol...) + return errors.New(metadataEventFilterErrMsg, opts...) } diff --git a/filters/metadata_test.go b/filters/metadata_test.go index 3b8cc9f..34fdee4 100644 --- a/filters/metadata_test.go +++ b/filters/metadata_test.go @@ -4,12 +4,18 @@ import ( "testing" "github.com/luno/jettison/errors" + "github.com/luno/jettison/j" "github.com/luno/jettison/jtest" "github.com/stretchr/testify/require" "github.com/luno/reflex" ) +var ( + testBadMetadataErr = errors.New("bad metadata", j.C("ERR_test_bad_metadata")) + testBadFilterErr = errors.New("bad filter", j.C("ERR_test_bad_filter")) +) + func TestMakeMetadataEventFilter(t *testing.T) { type testCase struct { name string @@ -74,12 +80,12 @@ func TestMetadataEventFilter(t *testing.T) { name: "Deserializer errors", e: &reflex.Event{MetaData: m}, ds: func(x *testing.T) Deserializer[string] { - return func(b []byte) (string, error) { require.Equal(x, m, b); return string(b), errors.New("bad metadata") } + return func(b []byte) (string, error) { require.Equal(x, m, b); return string(b), testBadMetadataErr } }, flt: func(x *testing.T) DataFilter[string] { return func(s string) (bool, error) { require.Fail(x, "should not be reached"); return true, nil } }, - err: []error{deserializationErr, errors.New("bad metadata")}, + err: []error{deserializationErr, testBadMetadataErr}, }, { name: "Data Filter errors", @@ -88,9 +94,9 @@ func TestMetadataEventFilter(t *testing.T) { return func(b []byte) (string, error) { require.Equal(x, m, b); return string(b), nil } }, flt: func(x *testing.T) DataFilter[string] { - return func(s string) (bool, error) { require.Equal(x, d, s); return true, errors.New("bad filter") } + return func(s string) (bool, error) { require.Equal(x, d, s); return true, testBadFilterErr } }, - err: []error{errors.New("bad filter")}, + err: []error{testBadFilterErr}, }, { name: "Exclude", @@ -159,7 +165,7 @@ func TestIsDeserializationErr(t *testing.T) { { name: "constructed deserialization error", err: errors.New(deserializationErrMsg), - want: true, + want: false, }, } for _, tt := range tests { diff --git a/rpatterns/batch.go b/rpatterns/batch.go index f507da2..415831c 100644 --- a/rpatterns/batch.go +++ b/rpatterns/batch.go @@ -22,7 +22,10 @@ type batchEvent struct { const minWait = time.Millisecond * 100 -var ErrBatchState = errors.New("batch error state", j.C("ERR_b3053f5f1a3ecd23")) +var ( + ErrBatchState = errors.New("batch error state", j.C("ERR_b3053f5f1a3ecd23")) + ErrInvalidBatchConfig = errors.New("batchPeriod or batchLen must be non-zero", j.C("ERR_invalid_batch_config")) +) // BatchConsumer provides a reflex consumer that buffers events // and flushes a batch to the consume function when either @@ -95,7 +98,7 @@ func (c *BatchConsumer) Stop() error { // enqueue adds the event to the buffer or returns error if batch needs to be reset. func (c *BatchConsumer) enqueue(ctx context.Context, e *AckEvent) error { if c.flushPeriod == 0 && c.flushLen == 0 { - return errors.New("batchPeriod or batchLen must be non-zero") + return ErrInvalidBatchConfig } // Add event to batch queue diff --git a/rpatterns/batch_test.go b/rpatterns/batch_test.go index 3853001..6971d2c 100644 --- a/rpatterns/batch_test.go +++ b/rpatterns/batch_test.go @@ -105,7 +105,6 @@ func TestRunBatchConsumer(t *testing.T) { } func TestReset(t *testing.T) { - recvEndErr := errors.New("recv error") tests := []struct { name string batchLen int @@ -146,13 +145,13 @@ func TestReset(t *testing.T) { spec := rpatterns.NewBatchSpec(b.Stream, consumer) ctx := context.Background() err := reflex.Run(ctx, spec) - jtest.Assert(t, recvEndErr, err) + jtest.Assert(t, errEvents, err) events = ItoEList(tt.passEvents...) b.events = events err = reflex.Run(ctx, spec) - jtest.Assert(t, recvEndErr, err) + jtest.Assert(t, errEvents, err) }) } } @@ -171,7 +170,7 @@ func TestInvalidConfig(t *testing.T) { spec := rpatterns.NewBatchSpec(b.Stream, consumer) ctx := context.Background() err := reflex.Run(ctx, spec) - jtest.Assert(t, errors.New("batchPeriod or batchLen must be non-zero"), err) + jtest.Assert(t, rpatterns.ErrInvalidBatchConfig, err) } type EventList struct {