diff --git a/.githooks/pre-commit b/.githooks/pre-commit new file mode 100644 index 0000000..3459c53 --- /dev/null +++ b/.githooks/pre-commit @@ -0,0 +1,3 @@ +#!/usr/bin/env bash + +go vet ./... diff --git a/cmd/webserver/webserver.go b/cmd/webserver/webserver.go index e132acd..54ca2e3 100644 --- a/cmd/webserver/webserver.go +++ b/cmd/webserver/webserver.go @@ -40,7 +40,7 @@ func main() { } }() - cq, err := natsadapter.New(natsc) + cq, err := natsadapter.New(natsc, "RENDER_TASKS") if err != nil { log.Panicf("create nats adapter: %v", err) } diff --git a/cmd/worker/worker.go b/cmd/worker/worker.go index ab84a3b..8d88e68 100644 --- a/cmd/worker/worker.go +++ b/cmd/worker/worker.go @@ -45,7 +45,7 @@ func main() { } }() - qc, err := natsadapter.New(natsc) + qc, err := natsadapter.New(natsc, "RENDER_TASKS") if err != nil { log.Panicf("create nats adapter: %v", err) } diff --git a/internal/adapters/natsadapter/natsadapter.go b/internal/adapters/natsadapter/natsadapter.go index 7b8ce6a..1d4a650 100644 --- a/internal/adapters/natsadapter/natsadapter.go +++ b/internal/adapters/natsadapter/natsadapter.go @@ -10,30 +10,33 @@ import ( "time" ) -const StreamName = "RENDER_TASKS" -const SubjectPrefix = "render_tasks" - type NatsAdapter struct { - jets jetstream.JetStream - jstream jetstream.Stream - kv jetstream.KeyValue + jets jetstream.JetStream + jstream jetstream.Stream + kv jetstream.KeyValue + streamName string runningMu sync.Mutex running map[string]struct{} } -func New(natsc *nats.Conn) (*NatsAdapter, error) { +func New(natsc *nats.Conn, streamName string) (*NatsAdapter, error) { na := NatsAdapter{} var err error + if len(streamName) == 0 { + return nil, fmt.Errorf("stream name is empty") + } + na.streamName = streamName + na.jets, err = jetstream.New(natsc) if err != nil { return nil, fmt.Errorf("create jetstream: %w", err) } - na.jstream, err = na.jets.CreateStream(context.TODO(), jetstream.StreamConfig{ - Name: StreamName, - Subjects: []string{fmt.Sprintf("%s.>", SubjectPrefix)}, + na.jstream, err = na.jets.CreateOrUpdateStream(context.TODO(), jetstream.StreamConfig{ + Name: streamName, + Subjects: []string{fmt.Sprintf("%s.>", streamName)}, Retention: jetstream.WorkQueuePolicy, AllowDirect: true, }) @@ -94,7 +97,7 @@ func (na *NatsAdapter) ProcessWorkCached( log.Infof("sending task to queue: %s", cacheKey) _, err = na.jets.Publish( ctx, - fmt.Sprintf("%s.%s", SubjectPrefix, cacheKey), + fmt.Sprintf("%s.%s", na.streamName, cacheKey), taskPayload, ) if err != nil {