From 5d45ec67c33950dbb878894f9c5de58d852a9e1d Mon Sep 17 00:00:00 2001 From: Egor Aristov Date: Sun, 19 Jan 2025 18:19:45 +0300 Subject: [PATCH] prevent resubmitting already running task --- internal/adapters/natsadapter/natsadapter.go | 61 +++++++++++++------- 1 file changed, 39 insertions(+), 22 deletions(-) diff --git a/internal/adapters/natsadapter/natsadapter.go b/internal/adapters/natsadapter/natsadapter.go index 3c0c99e..c0be658 100644 --- a/internal/adapters/natsadapter/natsadapter.go +++ b/internal/adapters/natsadapter/natsadapter.go @@ -6,42 +6,51 @@ import ( "github.com/labstack/gommon/log" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" + "sync" "time" ) const StreamName = "RENDER_TASKS" const SubjectPrefix = "render_tasks" -var DedupWindow, _ = time.ParseDuration("10s") - type NatsAdapter struct { jets jetstream.JetStream jstream jetstream.Stream kv jetstream.KeyValue + + runningMu sync.Mutex + running map[string]struct{} } func New(natsc *nats.Conn) (*NatsAdapter, error) { - jets, err := jetstream.New(natsc) + na := NatsAdapter{} + var err error + + na.jets, err = jetstream.New(natsc) if err != nil { return nil, fmt.Errorf("create jetstream: %w", err) } - jstream, err := jets.CreateStream(context.TODO(), jetstream.StreamConfig{ + + na.jstream, err = na.jets.CreateStream(context.TODO(), jetstream.StreamConfig{ Name: StreamName, Subjects: []string{fmt.Sprintf("%s.>", SubjectPrefix)}, Retention: jetstream.WorkQueuePolicy, - Duplicates: DedupWindow, AllowDirect: true, }) if err != nil { return nil, fmt.Errorf("create js stream: %w", err) } - kv, err := jets.CreateKeyValue(context.TODO(), jetstream.KeyValueConfig{ + + na.kv, err = na.jets.CreateKeyValue(context.TODO(), jetstream.KeyValueConfig{ Bucket: "render_cache", }) if err != nil { return nil, fmt.Errorf("create nats kv: %w", err) } - return &NatsAdapter{jets: jets, jstream: jstream, kv: kv}, nil + + na.running = make(map[string]struct{}) + + return &na, nil } func (na *NatsAdapter) ProcessWorkCached( @@ -50,12 +59,17 @@ func (na *NatsAdapter) ProcessWorkCached( cacheKey string, taskPayload []byte, ) (result []byte, err error) { - if cacheLifetime < DedupWindow { - // if cache lifetime is less than dedup window, we can run into situation - // when cache already expired, but new task will be considered duplicate - // so client will neither trigger new task nor retrieve cached value - cacheLifetime = DedupWindow - } + + // prevent resubmitting already running task + na.runningMu.Lock() + _, alreadyRunning := na.running[cacheKey] + na.running[cacheKey] = struct{}{} + na.runningMu.Unlock() + defer func() { + na.runningMu.Lock() + delete(na.running, cacheKey) + na.runningMu.Unlock() + }() watcher, err := na.kv.Watch(ctx, cacheKey) if err != nil { @@ -74,15 +88,18 @@ func (na *NatsAdapter) ProcessWorkCached( return lastUpdate.Value(), nil } } else { - log.Infof("sending task to queue: %s", cacheKey) - _, err = na.jets.Publish( - ctx, - fmt.Sprintf("%s.%s", SubjectPrefix, cacheKey), - taskPayload, - //jetstream.WithMsgID(cacheKey), - ) - if err != nil { - return nil, fmt.Errorf("nats publish error: %v", err) + if alreadyRunning { + log.Infof("already running: %s", cacheKey) + } else { + log.Infof("sending task to queue: %s", cacheKey) + _, err = na.jets.Publish( + ctx, + fmt.Sprintf("%s.%s", SubjectPrefix, cacheKey), + taskPayload, + ) + if err != nil { + return nil, fmt.Errorf("nats publish error: %v", err) + } } } case <-ctx.Done():