From 6913733ad58b682f07b5a0675a9e2845af630072 Mon Sep 17 00:00:00 2001 From: Egor Aristov Date: Tue, 21 Jan 2025 19:44:33 +0300 Subject: [PATCH] change consumer to durable --- internal/adapters/natsadapter/natsadapter.go | 6 ++++-- internal/extractors/pwextractor/pwextractor.go | 1 - 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/adapters/natsadapter/natsadapter.go b/internal/adapters/natsadapter/natsadapter.go index c0be658..d53e811 100644 --- a/internal/adapters/natsadapter/natsadapter.go +++ b/internal/adapters/natsadapter/natsadapter.go @@ -118,7 +118,9 @@ func (na *NatsAdapter) ConsumeQueue( ctx context.Context, taskFunc func(taskPayload []byte) (cacheKey string, result []byte, err error), ) error { - cons, err := na.jstream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{}) + cons, err := na.jstream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ + Durable: "worker", + }) if err != nil { return fmt.Errorf("create js consumer: %w", err) } @@ -147,7 +149,7 @@ func (na *NatsAdapter) ConsumeQueue( if taskErr != nil { log.Errorf("taskFunc seq=%d error, discarding task: %v", seq, taskErr) - if err := msg.Nak(); err != nil { + if err := msg.Term(); err != nil { log.Errorf("nak %d: %v", seq, err) } return diff --git a/internal/extractors/pwextractor/pwextractor.go b/internal/extractors/pwextractor/pwextractor.go index d6ecf27..a6c5ef9 100644 --- a/internal/extractors/pwextractor/pwextractor.go +++ b/internal/extractors/pwextractor/pwextractor.go @@ -161,7 +161,6 @@ func (p *pageParser) parse() (*models.TaskResult, error) { func (p *pageParser) waitFullLoad() { timeout := pwDuration("5s") ctx, cancel := context.WithCancel(context.Background()) - defer cancel() go func() { err := p.page.WaitForLoadState(playwright.PageWaitForLoadStateOptions{