diff --git a/internal/adapters/adapters.go b/internal/adapters/adapters.go index 169a4f7..e1d3dbc 100644 --- a/internal/adapters/adapters.go +++ b/internal/adapters/adapters.go @@ -11,7 +11,7 @@ type CachedWorkQueue interface { cacheLifetime time.Duration, cacheKey string, taskPayload []byte, - ) ([]byte, error) + ) (result []byte, err error) } type QueueConsumer interface { diff --git a/internal/adapters/natsadapter/natsadapter.go b/internal/adapters/natsadapter/natsadapter.go index 9064d3b..0e030ff 100644 --- a/internal/adapters/natsadapter/natsadapter.go +++ b/internal/adapters/natsadapter/natsadapter.go @@ -115,10 +115,18 @@ func (na *NatsAdapter) ConsumeQueue( log.Errorf("task seq=%d inProgress: %v", seq, err) } log.Infof("got task seq=%d payload=%s", seq, msg.Data()) + + defer func() { + if err := recover(); err != nil { + log.Errorf("recovered panic from consumer: %v", err) + } + }() cacheKey, resultPayload, taskErr := taskFunc(msg.Data()) + if err := msg.DoubleAck(ctx); err != nil { log.Errorf("double ack seq=%d: %v", seq, err) } + if taskErr != nil { log.Errorf("taskFunc seq=%d error, discarding task: %v", seq, taskErr) if err := msg.Nak(); err != nil { @@ -126,6 +134,7 @@ func (na *NatsAdapter) ConsumeQueue( } return } + log.Infof("task seq=%d cachekey=%s finished, payload=%.100s", seq, cacheKey, resultPayload) if _, err := na.kv.Put(ctx, cacheKey, resultPayload); err != nil { log.Errorf("put seq=%d to cache: %v", seq, err)