prevent resubmitting already running task
This commit is contained in:
parent
031b062277
commit
87ceeb4376
@ -6,42 +6,51 @@ import (
|
||||
"github.com/labstack/gommon/log"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const StreamName = "RENDER_TASKS"
|
||||
const SubjectPrefix = "render_tasks"
|
||||
|
||||
var DedupWindow, _ = time.ParseDuration("10s")
|
||||
|
||||
type NatsAdapter struct {
|
||||
jets jetstream.JetStream
|
||||
jstream jetstream.Stream
|
||||
kv jetstream.KeyValue
|
||||
|
||||
runningMu sync.Mutex
|
||||
running map[string]struct{}
|
||||
}
|
||||
|
||||
func New(natsc *nats.Conn) (*NatsAdapter, error) {
|
||||
jets, err := jetstream.New(natsc)
|
||||
na := NatsAdapter{}
|
||||
var err error
|
||||
|
||||
na.jets, err = jetstream.New(natsc)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create jetstream: %w", err)
|
||||
}
|
||||
jstream, err := jets.CreateStream(context.TODO(), jetstream.StreamConfig{
|
||||
|
||||
na.jstream, err = na.jets.CreateStream(context.TODO(), jetstream.StreamConfig{
|
||||
Name: StreamName,
|
||||
Subjects: []string{fmt.Sprintf("%s.>", SubjectPrefix)},
|
||||
Retention: jetstream.WorkQueuePolicy,
|
||||
Duplicates: DedupWindow,
|
||||
AllowDirect: true,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create js stream: %w", err)
|
||||
}
|
||||
kv, err := jets.CreateKeyValue(context.TODO(), jetstream.KeyValueConfig{
|
||||
|
||||
na.kv, err = na.jets.CreateKeyValue(context.TODO(), jetstream.KeyValueConfig{
|
||||
Bucket: "render_cache",
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create nats kv: %w", err)
|
||||
}
|
||||
return &NatsAdapter{jets: jets, jstream: jstream, kv: kv}, nil
|
||||
|
||||
na.running = make(map[string]struct{})
|
||||
|
||||
return &na, nil
|
||||
}
|
||||
|
||||
func (na *NatsAdapter) ProcessWorkCached(
|
||||
@ -50,12 +59,17 @@ func (na *NatsAdapter) ProcessWorkCached(
|
||||
cacheKey string,
|
||||
taskPayload []byte,
|
||||
) (result []byte, err error) {
|
||||
if cacheLifetime < DedupWindow {
|
||||
// if cache lifetime is less than dedup window, we can run into situation
|
||||
// when cache already expired, but new task will be considered duplicate
|
||||
// so client will neither trigger new task nor retrieve cached value
|
||||
cacheLifetime = DedupWindow
|
||||
}
|
||||
|
||||
// prevent resubmitting already running task
|
||||
na.runningMu.Lock()
|
||||
_, alreadyRunning := na.running[cacheKey]
|
||||
na.running[cacheKey] = struct{}{}
|
||||
na.runningMu.Unlock()
|
||||
defer func() {
|
||||
na.runningMu.Lock()
|
||||
delete(na.running, cacheKey)
|
||||
na.runningMu.Unlock()
|
||||
}()
|
||||
|
||||
watcher, err := na.kv.Watch(ctx, cacheKey)
|
||||
if err != nil {
|
||||
@ -74,15 +88,18 @@ func (na *NatsAdapter) ProcessWorkCached(
|
||||
return lastUpdate.Value(), nil
|
||||
}
|
||||
} else {
|
||||
log.Infof("sending task to queue: %s", cacheKey)
|
||||
_, err = na.jets.Publish(
|
||||
ctx,
|
||||
fmt.Sprintf("%s.%s", SubjectPrefix, cacheKey),
|
||||
taskPayload,
|
||||
//jetstream.WithMsgID(cacheKey),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("nats publish error: %v", err)
|
||||
if alreadyRunning {
|
||||
log.Infof("already running: %s", cacheKey)
|
||||
} else {
|
||||
log.Infof("sending task to queue: %s", cacheKey)
|
||||
_, err = na.jets.Publish(
|
||||
ctx,
|
||||
fmt.Sprintf("%s.%s", SubjectPrefix, cacheKey),
|
||||
taskPayload,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("nats publish error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
case <-ctx.Done():
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user