stream name parameter

This commit is contained in:
Egor Aristov 2025-02-07 16:38:53 +03:00
parent b271c8573e
commit c69e7ca2ba
4 changed files with 19 additions and 13 deletions

3
.githooks/pre-commit Normal file
View File

@ -0,0 +1,3 @@
#!/usr/bin/env bash
go vet ./...

View File

@ -40,7 +40,7 @@ func main() {
}
}()
cq, err := natsadapter.New(natsc)
cq, err := natsadapter.New(natsc, "RENDER_TASKS")
if err != nil {
log.Panicf("create nats adapter: %v", err)
}

View File

@ -45,7 +45,7 @@ func main() {
}
}()
qc, err := natsadapter.New(natsc)
qc, err := natsadapter.New(natsc, "RENDER_TASKS")
if err != nil {
log.Panicf("create nats adapter: %v", err)
}

View File

@ -10,30 +10,33 @@ import (
"time"
)
const StreamName = "RENDER_TASKS"
const SubjectPrefix = "render_tasks"
type NatsAdapter struct {
jets jetstream.JetStream
jstream jetstream.Stream
kv jetstream.KeyValue
streamName string
runningMu sync.Mutex
running map[string]struct{}
}
func New(natsc *nats.Conn) (*NatsAdapter, error) {
func New(natsc *nats.Conn, streamName string) (*NatsAdapter, error) {
na := NatsAdapter{}
var err error
if len(streamName) == 0 {
return nil, fmt.Errorf("stream name is empty")
}
na.streamName = streamName
na.jets, err = jetstream.New(natsc)
if err != nil {
return nil, fmt.Errorf("create jetstream: %w", err)
}
na.jstream, err = na.jets.CreateStream(context.TODO(), jetstream.StreamConfig{
Name: StreamName,
Subjects: []string{fmt.Sprintf("%s.>", SubjectPrefix)},
na.jstream, err = na.jets.CreateOrUpdateStream(context.TODO(), jetstream.StreamConfig{
Name: streamName,
Subjects: []string{fmt.Sprintf("%s.>", streamName)},
Retention: jetstream.WorkQueuePolicy,
AllowDirect: true,
})
@ -94,7 +97,7 @@ func (na *NatsAdapter) ProcessWorkCached(
log.Infof("sending task to queue: %s", cacheKey)
_, err = na.jets.Publish(
ctx,
fmt.Sprintf("%s.%s", SubjectPrefix, cacheKey),
fmt.Sprintf("%s.%s", na.streamName, cacheKey),
taskPayload,
)
if err != nil {