recover panics in consumer

This commit is contained in:
Egor Aristov 2025-01-16 12:44:01 +03:00
parent 5416f8bc0a
commit 8450957654
2 changed files with 10 additions and 1 deletions

View File

@ -11,7 +11,7 @@ type CachedWorkQueue interface {
cacheLifetime time.Duration, cacheLifetime time.Duration,
cacheKey string, cacheKey string,
taskPayload []byte, taskPayload []byte,
) ([]byte, error) ) (result []byte, err error)
} }
type QueueConsumer interface { type QueueConsumer interface {

View File

@ -115,10 +115,18 @@ func (na *NatsAdapter) ConsumeQueue(
log.Errorf("task seq=%d inProgress: %v", seq, err) log.Errorf("task seq=%d inProgress: %v", seq, err)
} }
log.Infof("got task seq=%d payload=%s", seq, msg.Data()) log.Infof("got task seq=%d payload=%s", seq, msg.Data())
defer func() {
if err := recover(); err != nil {
log.Errorf("recovered panic from consumer: %v", err)
}
}()
cacheKey, resultPayload, taskErr := taskFunc(msg.Data()) cacheKey, resultPayload, taskErr := taskFunc(msg.Data())
if err := msg.DoubleAck(ctx); err != nil { if err := msg.DoubleAck(ctx); err != nil {
log.Errorf("double ack seq=%d: %v", seq, err) log.Errorf("double ack seq=%d: %v", seq, err)
} }
if taskErr != nil { if taskErr != nil {
log.Errorf("taskFunc seq=%d error, discarding task: %v", seq, taskErr) log.Errorf("taskFunc seq=%d error, discarding task: %v", seq, taskErr)
if err := msg.Nak(); err != nil { if err := msg.Nak(); err != nil {
@ -126,6 +134,7 @@ func (na *NatsAdapter) ConsumeQueue(
} }
return return
} }
log.Infof("task seq=%d cachekey=%s finished, payload=%.100s", seq, cacheKey, resultPayload) log.Infof("task seq=%d cachekey=%s finished, payload=%.100s", seq, cacheKey, resultPayload)
if _, err := na.kv.Put(ctx, cacheKey, resultPayload); err != nil { if _, err := na.kv.Put(ctx, cacheKey, resultPayload); err != nil {
log.Errorf("put seq=%d to cache: %v", seq, err) log.Errorf("put seq=%d to cache: %v", seq, err)