change consumer to durable

This commit is contained in:
Egor Aristov 2025-01-21 19:44:33 +03:00
parent 155cb37735
commit e3ad088dbd
Signed by: egor3f
GPG Key ID: 40482A264AAEC85F
2 changed files with 4 additions and 3 deletions

View File

@ -118,7 +118,9 @@ func (na *NatsAdapter) ConsumeQueue(
ctx context.Context,
taskFunc func(taskPayload []byte) (cacheKey string, result []byte, err error),
) error {
cons, err := na.jstream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{})
cons, err := na.jstream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "worker",
})
if err != nil {
return fmt.Errorf("create js consumer: %w", err)
}
@ -147,7 +149,7 @@ func (na *NatsAdapter) ConsumeQueue(
if taskErr != nil {
log.Errorf("taskFunc seq=%d error, discarding task: %v", seq, taskErr)
if err := msg.Nak(); err != nil {
if err := msg.Term(); err != nil {
log.Errorf("nak %d: %v", seq, err)
}
return

View File

@ -161,7 +161,6 @@ func (p *pageParser) parse() (*models.TaskResult, error) {
func (p *pageParser) waitFullLoad() {
timeout := pwDuration("5s")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
err := p.page.WaitForLoadState(playwright.PageWaitForLoadStateOptions{