stream name parameter

This commit is contained in:
Egor Aristov 2025-02-07 16:38:53 +03:00
parent 5dc056df23
commit 8203955494
Signed by: egor3f
GPG Key ID: 40482A264AAEC85F
4 changed files with 19 additions and 13 deletions

3
.githooks/pre-commit Normal file
View File

@ -0,0 +1,3 @@
#!/usr/bin/env bash
go vet ./...

View File

@ -40,7 +40,7 @@ func main() {
} }
}() }()
cq, err := natsadapter.New(natsc) cq, err := natsadapter.New(natsc, "RENDER_TASKS")
if err != nil { if err != nil {
log.Panicf("create nats adapter: %v", err) log.Panicf("create nats adapter: %v", err)
} }

View File

@ -45,7 +45,7 @@ func main() {
} }
}() }()
qc, err := natsadapter.New(natsc) qc, err := natsadapter.New(natsc, "RENDER_TASKS")
if err != nil { if err != nil {
log.Panicf("create nats adapter: %v", err) log.Panicf("create nats adapter: %v", err)
} }

View File

@ -10,30 +10,33 @@ import (
"time" "time"
) )
const StreamName = "RENDER_TASKS"
const SubjectPrefix = "render_tasks"
type NatsAdapter struct { type NatsAdapter struct {
jets jetstream.JetStream jets jetstream.JetStream
jstream jetstream.Stream jstream jetstream.Stream
kv jetstream.KeyValue kv jetstream.KeyValue
streamName string
runningMu sync.Mutex runningMu sync.Mutex
running map[string]struct{} running map[string]struct{}
} }
func New(natsc *nats.Conn) (*NatsAdapter, error) { func New(natsc *nats.Conn, streamName string) (*NatsAdapter, error) {
na := NatsAdapter{} na := NatsAdapter{}
var err error var err error
if len(streamName) == 0 {
return nil, fmt.Errorf("stream name is empty")
}
na.streamName = streamName
na.jets, err = jetstream.New(natsc) na.jets, err = jetstream.New(natsc)
if err != nil { if err != nil {
return nil, fmt.Errorf("create jetstream: %w", err) return nil, fmt.Errorf("create jetstream: %w", err)
} }
na.jstream, err = na.jets.CreateStream(context.TODO(), jetstream.StreamConfig{ na.jstream, err = na.jets.CreateOrUpdateStream(context.TODO(), jetstream.StreamConfig{
Name: StreamName, Name: streamName,
Subjects: []string{fmt.Sprintf("%s.>", SubjectPrefix)}, Subjects: []string{fmt.Sprintf("%s.>", streamName)},
Retention: jetstream.WorkQueuePolicy, Retention: jetstream.WorkQueuePolicy,
AllowDirect: true, AllowDirect: true,
}) })
@ -94,7 +97,7 @@ func (na *NatsAdapter) ProcessWorkCached(
log.Infof("sending task to queue: %s", cacheKey) log.Infof("sending task to queue: %s", cacheKey)
_, err = na.jets.Publish( _, err = na.jets.Publish(
ctx, ctx,
fmt.Sprintf("%s.%s", SubjectPrefix, cacheKey), fmt.Sprintf("%s.%s", na.streamName, cacheKey),
taskPayload, taskPayload,
) )
if err != nil { if err != nil {