prevent resubmitting already running task
This commit is contained in:
parent
bf88ba1a36
commit
5d45ec67c3
@ -6,42 +6,51 @@ import (
|
|||||||
"github.com/labstack/gommon/log"
|
"github.com/labstack/gommon/log"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
"github.com/nats-io/nats.go/jetstream"
|
"github.com/nats-io/nats.go/jetstream"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const StreamName = "RENDER_TASKS"
|
const StreamName = "RENDER_TASKS"
|
||||||
const SubjectPrefix = "render_tasks"
|
const SubjectPrefix = "render_tasks"
|
||||||
|
|
||||||
var DedupWindow, _ = time.ParseDuration("10s")
|
|
||||||
|
|
||||||
type NatsAdapter struct {
|
type NatsAdapter struct {
|
||||||
jets jetstream.JetStream
|
jets jetstream.JetStream
|
||||||
jstream jetstream.Stream
|
jstream jetstream.Stream
|
||||||
kv jetstream.KeyValue
|
kv jetstream.KeyValue
|
||||||
|
|
||||||
|
runningMu sync.Mutex
|
||||||
|
running map[string]struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(natsc *nats.Conn) (*NatsAdapter, error) {
|
func New(natsc *nats.Conn) (*NatsAdapter, error) {
|
||||||
jets, err := jetstream.New(natsc)
|
na := NatsAdapter{}
|
||||||
|
var err error
|
||||||
|
|
||||||
|
na.jets, err = jetstream.New(natsc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("create jetstream: %w", err)
|
return nil, fmt.Errorf("create jetstream: %w", err)
|
||||||
}
|
}
|
||||||
jstream, err := jets.CreateStream(context.TODO(), jetstream.StreamConfig{
|
|
||||||
|
na.jstream, err = na.jets.CreateStream(context.TODO(), jetstream.StreamConfig{
|
||||||
Name: StreamName,
|
Name: StreamName,
|
||||||
Subjects: []string{fmt.Sprintf("%s.>", SubjectPrefix)},
|
Subjects: []string{fmt.Sprintf("%s.>", SubjectPrefix)},
|
||||||
Retention: jetstream.WorkQueuePolicy,
|
Retention: jetstream.WorkQueuePolicy,
|
||||||
Duplicates: DedupWindow,
|
|
||||||
AllowDirect: true,
|
AllowDirect: true,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("create js stream: %w", err)
|
return nil, fmt.Errorf("create js stream: %w", err)
|
||||||
}
|
}
|
||||||
kv, err := jets.CreateKeyValue(context.TODO(), jetstream.KeyValueConfig{
|
|
||||||
|
na.kv, err = na.jets.CreateKeyValue(context.TODO(), jetstream.KeyValueConfig{
|
||||||
Bucket: "render_cache",
|
Bucket: "render_cache",
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("create nats kv: %w", err)
|
return nil, fmt.Errorf("create nats kv: %w", err)
|
||||||
}
|
}
|
||||||
return &NatsAdapter{jets: jets, jstream: jstream, kv: kv}, nil
|
|
||||||
|
na.running = make(map[string]struct{})
|
||||||
|
|
||||||
|
return &na, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (na *NatsAdapter) ProcessWorkCached(
|
func (na *NatsAdapter) ProcessWorkCached(
|
||||||
@ -50,12 +59,17 @@ func (na *NatsAdapter) ProcessWorkCached(
|
|||||||
cacheKey string,
|
cacheKey string,
|
||||||
taskPayload []byte,
|
taskPayload []byte,
|
||||||
) (result []byte, err error) {
|
) (result []byte, err error) {
|
||||||
if cacheLifetime < DedupWindow {
|
|
||||||
// if cache lifetime is less than dedup window, we can run into situation
|
// prevent resubmitting already running task
|
||||||
// when cache already expired, but new task will be considered duplicate
|
na.runningMu.Lock()
|
||||||
// so client will neither trigger new task nor retrieve cached value
|
_, alreadyRunning := na.running[cacheKey]
|
||||||
cacheLifetime = DedupWindow
|
na.running[cacheKey] = struct{}{}
|
||||||
}
|
na.runningMu.Unlock()
|
||||||
|
defer func() {
|
||||||
|
na.runningMu.Lock()
|
||||||
|
delete(na.running, cacheKey)
|
||||||
|
na.runningMu.Unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
watcher, err := na.kv.Watch(ctx, cacheKey)
|
watcher, err := na.kv.Watch(ctx, cacheKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -74,15 +88,18 @@ func (na *NatsAdapter) ProcessWorkCached(
|
|||||||
return lastUpdate.Value(), nil
|
return lastUpdate.Value(), nil
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Infof("sending task to queue: %s", cacheKey)
|
if alreadyRunning {
|
||||||
_, err = na.jets.Publish(
|
log.Infof("already running: %s", cacheKey)
|
||||||
ctx,
|
} else {
|
||||||
fmt.Sprintf("%s.%s", SubjectPrefix, cacheKey),
|
log.Infof("sending task to queue: %s", cacheKey)
|
||||||
taskPayload,
|
_, err = na.jets.Publish(
|
||||||
//jetstream.WithMsgID(cacheKey),
|
ctx,
|
||||||
)
|
fmt.Sprintf("%s.%s", SubjectPrefix, cacheKey),
|
||||||
if err != nil {
|
taskPayload,
|
||||||
return nil, fmt.Errorf("nats publish error: %v", err)
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("nats publish error: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user