From af4c94ac24cef8b80c5c462182862fb4e4817ef9 Mon Sep 17 00:00:00 2001 From: Egor Aristov Date: Sat, 8 Feb 2025 23:01:16 +0300 Subject: [PATCH] some refactoring (sol_I_d) --- cmd/webserver/webserver.go | 4 +- internal/adapters/adapters.go | 17 ++-- internal/adapters/natsadapter/natsadapter.go | 87 +++++++++++--------- internal/api/http/handler.go | 30 ++++--- 4 files changed, 81 insertions(+), 57 deletions(-) diff --git a/cmd/webserver/webserver.go b/cmd/webserver/webserver.go index 54ca2e3..cc5a449 100644 --- a/cmd/webserver/webserver.go +++ b/cmd/webserver/webserver.go @@ -40,7 +40,7 @@ func main() { } }() - cq, err := natsadapter.New(natsc, "RENDER_TASKS") + na, err := natsadapter.New(natsc, "RENDER_TASKS") if err != nil { log.Panicf("create nats adapter: %v", err) } @@ -51,7 +51,7 @@ func main() { e.StaticFS("/", echo.MustSubFS(wizard_vue.EmbedFS, wizard_vue.FSPrefix)) - apiHandler := httpApi.New(cq) + apiHandler := httpApi.New(na, na) apiHandler.SetupRoutes(e.Group("/api/v1")) go func() { diff --git a/internal/adapters/adapters.go b/internal/adapters/adapters.go index e1d3dbc..1f0fdb0 100644 --- a/internal/adapters/adapters.go +++ b/internal/adapters/adapters.go @@ -2,16 +2,19 @@ package adapters import ( "context" + "fmt" "time" ) -type CachedWorkQueue interface { - ProcessWorkCached( - ctx context.Context, - cacheLifetime time.Duration, - cacheKey string, - taskPayload []byte, - ) (result []byte, err error) +type WorkQueue interface { + Enqueue(ctx context.Context, key string, payload []byte) (result []byte, err error) +} + +var ErrKeyNotFound = fmt.Errorf("key not found") + +type Cache interface { + Get(key string) (result []byte, ts time.Time, err error) + Set(key string, payload []byte) (err error) } type QueueConsumer interface { diff --git a/internal/adapters/natsadapter/natsadapter.go b/internal/adapters/natsadapter/natsadapter.go index 1d4a650..ed8613a 100644 --- a/internal/adapters/natsadapter/natsadapter.go +++ b/internal/adapters/natsadapter/natsadapter.go @@ -2,7 +2,9 @@ package natsadapter import ( "context" + "errors" "fmt" + "github.com/egor3f/rssalchemy/internal/adapters" "github.com/labstack/gommon/log" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" @@ -56,67 +58,76 @@ func New(natsc *nats.Conn, streamName string) (*NatsAdapter, error) { return &na, nil } -func (na *NatsAdapter) ProcessWorkCached( - ctx context.Context, - cacheLifetime time.Duration, - cacheKey string, - taskPayload []byte, -) (result []byte, err error) { - +func (na *NatsAdapter) Enqueue(ctx context.Context, key string, payload []byte) ([]byte, error) { // prevent resubmitting already running task na.runningMu.Lock() - _, alreadyRunning := na.running[cacheKey] - na.running[cacheKey] = struct{}{} + _, alreadyRunning := na.running[key] + na.running[key] = struct{}{} na.runningMu.Unlock() defer func() { na.runningMu.Lock() - delete(na.running, cacheKey) + delete(na.running, key) na.runningMu.Unlock() }() - watcher, err := na.kv.Watch(ctx, cacheKey) + watcher, err := na.kv.Watch(ctx, key) if err != nil { - return nil, fmt.Errorf("cache watch failed: %w", err) + return nil, fmt.Errorf("nats watch failed: %w", err) } defer watcher.Stop() - var lastUpdate jetstream.KeyValueEntry + var taskEnqueued bool for { select { case upd := <-watcher.Updates(): if upd != nil { - lastUpdate = upd - if time.Since(upd.Created()) <= cacheLifetime { - log.Infof("using cached value for task: %s, payload=%.100s", cacheKey, lastUpdate.Value()) - return lastUpdate.Value(), nil - } - } else { - if alreadyRunning { - log.Infof("already running: %s", cacheKey) - } else { - log.Infof("sending task to queue: %s", cacheKey) - _, err = na.jets.Publish( - ctx, - fmt.Sprintf("%s.%s", na.streamName, cacheKey), - taskPayload, - ) - if err != nil { - return nil, fmt.Errorf("nats publish error: %v", err) - } + if !taskEnqueued { + // old value from cache, skipping + continue } + log.Infof("got value for task: %s, payload=%.100s", key, upd.Value()) + return upd.Value(), nil + } + taskEnqueued = true + if alreadyRunning { + log.Infof("already running: %s", key) + continue + } + log.Infof("sending task to queue: %s", key) + _, err = na.jets.Publish( + ctx, + fmt.Sprintf("%s.%s", na.streamName, key), + payload, + ) + if err != nil { + return nil, fmt.Errorf("nats publish error: %v", err) } case <-ctx.Done(): - log.Warnf("task cancelled by context: %s", cacheKey) - // anyway, using cached lastUpdate - if lastUpdate != nil { - return lastUpdate.Value(), ctx.Err() - } else { - return nil, ctx.Err() - } + log.Warnf("task cancelled by context: %s", key) + return nil, ctx.Err() } } } +func (na *NatsAdapter) Get(key string) (result []byte, ts time.Time, err error) { + entry, err := na.kv.Get(context.TODO(), key) + if err != nil { + if errors.Is(err, jetstream.ErrKeyNotFound) { + return nil, time.Time{}, adapters.ErrKeyNotFound + } + return nil, time.Time{}, fmt.Errorf("nats: %w", err) + } + return entry.Value(), entry.Created(), nil +} + +func (na *NatsAdapter) Set(key string, payload []byte) error { + _, err := na.kv.Put(context.TODO(), key, payload) + if err != nil { + return fmt.Errorf("nats: %w", err) + } + return nil +} + func (na *NatsAdapter) ConsumeQueue( ctx context.Context, taskFunc func(taskPayload []byte) (cacheKey string, result []byte, err error), diff --git a/internal/api/http/handler.go b/internal/api/http/handler.go index e07e09e..0fff5a6 100644 --- a/internal/api/http/handler.go +++ b/internal/api/http/handler.go @@ -6,6 +6,7 @@ import ( "context" "encoding/base64" "encoding/json" + "errors" "fmt" "github.com/egor3f/rssalchemy/internal/adapters" "github.com/egor3f/rssalchemy/internal/models" @@ -23,18 +24,22 @@ import ( ) const ( - taskTimeout = 45 * time.Second + taskTimeout = 1 * time.Minute minLifetime = taskTimeout maxLifetime = 24 * time.Hour ) type Handler struct { - validate *validator.Validate - CachedQueue adapters.CachedWorkQueue + validate *validator.Validate + workQueue adapters.WorkQueue + cache adapters.Cache } -func New(cq adapters.CachedWorkQueue) *Handler { - h := Handler{CachedQueue: cq} +func New(wq adapters.WorkQueue, cache adapters.Cache) *Handler { + if wq == nil || cache == nil { + panic("you fckd up with di again") + } + h := Handler{workQueue: wq, cache: cache} h.validate = validator.New(validator.WithRequiredStructEnabled()) if err := h.validate.RegisterValidation("selector", validators.ValidateSelector); err != nil { log.Panicf("register validation: %v", err) @@ -100,9 +105,15 @@ func (h *Handler) handleRender(c echo.Context) error { return echo.NewHTTPError(500, fmt.Errorf("task marshal error: %v", err)) } - taskResultBytes, err := h.CachedQueue.ProcessWorkCached(timeoutCtx, cacheLifetime, task.CacheKey(), encodedTask) - if err != nil { - return echo.NewHTTPError(500, fmt.Errorf("queued cache failed: %v", err)) + taskResultBytes, cachedTS, err := h.cache.Get(task.CacheKey()) + if err != nil && !errors.Is(err, adapters.ErrKeyNotFound) { + return echo.NewHTTPError(500, fmt.Errorf("cache failed: %v", err)) + } + if errors.Is(err, adapters.ErrKeyNotFound) || time.Since(cachedTS) > cacheLifetime { + taskResultBytes, err = h.workQueue.Enqueue(timeoutCtx, task.CacheKey(), encodedTask) + if err != nil { + return echo.NewHTTPError(500, fmt.Errorf("task enqueue failed: %v", err)) + } } var result models.TaskResult @@ -140,8 +151,7 @@ func (h *Handler) handlePageScreenshot(c echo.Context) error { return echo.NewHTTPError(500, fmt.Errorf("task marshal error: %v", err)) } - cacheLifetime := minLifetime - taskResultBytes, err := h.CachedQueue.ProcessWorkCached(timeoutCtx, cacheLifetime, task.CacheKey(), encodedTask) + taskResultBytes, err := h.workQueue.Enqueue(timeoutCtx, task.CacheKey(), encodedTask) if err != nil { return echo.NewHTTPError(500, fmt.Errorf("queued cache failed: %v", err)) }