From 5416f8bc0a80384bf101748a5156e460efa62bec Mon Sep 17 00:00:00 2001 From: Egor Aristov Date: Wed, 15 Jan 2025 18:46:16 +0300 Subject: [PATCH] initial commit --- .gitignore | 1 + README.md | 1 + cmd/webserver/webserver.go | 231 ++++++++++++++++++ cmd/worker/worker.go | 88 +++++++ config.yml | 3 + frontend/wizard/index.html | 66 +++++ frontend/wizard/main.css | 8 + frontend/wizard/main.js | 70 ++++++ go.mod | 51 ++++ go.sum | 151 ++++++++++++ internal/adapters/adapters.go | 22 ++ internal/adapters/natsadapter/natsadapter.go | 143 +++++++++++ .../extractors/pwextractor/pwextractor.go | 177 ++++++++++++++ internal/models/models.go | 52 ++++ 14 files changed, 1064 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 cmd/webserver/webserver.go create mode 100644 cmd/worker/worker.go create mode 100644 config.yml create mode 100644 frontend/wizard/index.html create mode 100644 frontend/wizard/main.css create mode 100644 frontend/wizard/main.js create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/adapters/adapters.go create mode 100644 internal/adapters/natsadapter/natsadapter.go create mode 100644 internal/extractors/pwextractor/pwextractor.go create mode 100644 internal/models/models.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..85e7c1d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/.idea/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..f3152b2 --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +The project is in the proof-of-concept (POC) stage: the codebase is really dirty, and deployment and usage are not recommended. Pull requests are not accepted yet. The project is published only for convenience. The release date is unknown. 
diff --git a/cmd/webserver/webserver.go b/cmd/webserver/webserver.go new file mode 100644 index 0000000..88e5f09 --- /dev/null +++ b/cmd/webserver/webserver.go @@ -0,0 +1,231 @@ +package main + +import ( + "bytes" + "compress/flate" + "context" + "encoding/base64" + "encoding/json" + "fmt" + "github.com/egor3f/rssalchemy/internal/adapters/natsadapter" + "github.com/egor3f/rssalchemy/internal/models" + "github.com/ericchiang/css" + "github.com/go-playground/validator/v10" + "github.com/gorilla/feeds" + "github.com/ilyakaznacheev/cleanenv" + "github.com/labstack/echo/v4" + "github.com/labstack/echo/v4/middleware" + "github.com/labstack/gommon/log" + "github.com/nats-io/nats.go" + "html" + "io" + "net/http" + "os" + "os/signal" + "reflect" + "time" +) + +type Config struct { + WebserverAddress string `yaml:"webserver_address" env:"WEBSERVER_ADDRESS" env-required:"true"` + NatsUrl string `yaml:"nats_url" env:"NATS_URL" env-required:"true"` + Debug bool `yaml:"debug" env:"DEBUG"` +} + +type Specs struct { + URL string `json:"URL" validate:"url"` + SelectorPost string `json:"selector_post" validate:"selector"` + SelectorTitle string `json:"selector_title" validate:"selector"` + SelectorLink string `json:"selector_link" validate:"selector"` + SelectorDescription string `json:"selector_description" validate:"selector"` + SelectorAuthor string `json:"selector_author" validate:"selector"` + SelectorCreated string `json:"selector_created" validate:"selector"` + SelectorContent string `json:"selector_content" validate:"selector"` + SelectorEnclosure string `json:"selector_enclosure" validate:"selector"` + CacheLifetime string `json:"cache_lifetime"` +} + +func main() { + var cfg Config + err := cleanenv.ReadConfig("config.yml", &cfg) + if err != nil { + log.Panicf("reading config failed: %v", err) + } + + if cfg.Debug { + log.SetLevel(log.DEBUG) + log.SetHeader(`${time_rfc3339_nano} ${level}`) + } + + baseCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt) + 
defer stop() + + validate := validator.New(validator.WithRequiredStructEnabled()) + if err := validate.RegisterValidation("selector", validateSelector); err != nil { + log.Panicf("register validation: %v", err) + } + + natsc, err := nats.Connect(cfg.NatsUrl) + if err != nil { + log.Panicf("nats connect failed: %v", err) + } + defer func() { + if err := natsc.Drain(); err != nil { + log.Errorf("nats drain failed: %v", err) + } + }() + + cq, err := natsadapter.New(natsc) + if err != nil { + log.Panicf("create nats adapter: %v", err) + } + + e := echo.New() + e.Use(middleware.Logger()) + e.Use(middleware.Recover()) + e.Static("/", "frontend/wizard") + e.GET( + "/api/v1/render/:specs", func(c echo.Context) error { + specsParam := c.Param("specs") + specs, err := decodeSpecs(specsParam, validate) + if err != nil { + return echo.NewHTTPError(400, fmt.Errorf("decode specs: %w", err)) + } + + task := models.Task{ + URL: specs.URL, + SelectorPost: specs.SelectorPost, + SelectorTitle: specs.SelectorTitle, + SelectorLink: specs.SelectorLink, + SelectorDescription: specs.SelectorDescription, + SelectorAuthor: specs.SelectorAuthor, + SelectorCreated: specs.SelectorCreated, + SelectorContent: specs.SelectorContent, + SelectorEnclosure: specs.SelectorEnclosure, + } + + taskTimeout, _ := time.ParseDuration("20s") + minLifetime := taskTimeout + maxLifetime, _ := time.ParseDuration("24h") + cacheLifetime, err := time.ParseDuration(specs.CacheLifetime) + if err != nil { + return echo.NewHTTPError(400, "invalid cache lifetime") + } + if cacheLifetime < minLifetime { + cacheLifetime = minLifetime + } + if cacheLifetime > maxLifetime { + cacheLifetime = maxLifetime + } + + timeoutCtx, cancel := context.WithTimeout(baseCtx, taskTimeout) + defer cancel() + + encodedTask, err := json.Marshal(task) + if err != nil { + return echo.NewHTTPError(500, fmt.Errorf("task marshal error: %v", err)) + } + + taskResultBytes, err := cq.ProcessWorkCached(timeoutCtx, cacheLifetime, task.CacheKey(), 
encodedTask) + if err != nil { + return echo.NewHTTPError(500, fmt.Errorf("queued cache failed: %v", err)) + } + + var result models.TaskResult + if err := json.Unmarshal(taskResultBytes, &result); err != nil { + log.Errorf("cached value unmarshal failed: %v", err) + return echo.NewHTTPError(500, fmt.Errorf("cached value unmarshal failed: %v", err)) + } + + atom, err := makeFeed(task, result) + if err != nil { + log.Errorf("make feed failed: %v", err) + return echo.NewHTTPError(500) + } + + c.Response().Header().Set("Content-Type", "text/xml") + return c.String(200, atom) + }, + ) + + go func() { + if err := e.Start(cfg.WebserverAddress); err != nil && err != http.ErrServerClosed { + e.Logger.Errorf("http server error, shutting down: %v", err) + } + }() + <-baseCtx.Done() + log.Infof("stopping webserver gracefully") + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := e.Shutdown(ctx); err != nil { + e.Logger.Errorf("failed to shutdown server: %v", err) + } +} + +func makeFeed(task models.Task, result models.TaskResult) (string, error) { + feedTS := time.Now() + if len(result.Items) > 0 { + feedTS = result.Items[0].Created + } + feed := feeds.Feed{ + Title: html.EscapeString(result.Title), + Link: &feeds.Link{Href: task.URL}, + Updated: feedTS, + } + for _, item := range result.Items { + feed.Items = append(feed.Items, &feeds.Item{ + Title: html.EscapeString(item.Title), + Link: &feeds.Link{Href: item.Link}, + Author: &feeds.Author{Name: item.AuthorName}, + Description: item.Description, + Created: item.Created, + Updated: item.Updated, + Content: item.Content, + }) + } + atomFeed := (&feeds.Atom{Feed: &feed}).AtomFeed() + atomFeed.Icon = result.Icon + for i, entry := range atomFeed.Entries { + entry.Author.Uri = result.Items[i].AuthorLink + } + atom, err := feeds.ToXML(atomFeed) + if err != nil { + return "", fmt.Errorf("feed to xml: %w", err) + } + return atom, nil +} + +func decodeSpecs(specsParam string, 
validate *validator.Validate) (Specs, error) { + decodedSpecsParam, err := base64.StdEncoding.WithPadding(base64.NoPadding).DecodeString(specsParam) + if err != nil { + return Specs{}, fmt.Errorf("failed to decode specs: %w", err) + } + rc := flate.NewReader(bytes.NewReader(decodedSpecsParam)) + decodedSpecsParam, err = io.ReadAll(rc) + if err != nil { + return Specs{}, fmt.Errorf("failed to unzip specs: %w", err) + } + var specs Specs + if err := json.Unmarshal(decodedSpecsParam, &specs); err != nil { + return Specs{}, fmt.Errorf("failed to unmarshal specs: %w", err) + } + if err := validate.Struct(specs); err != nil { + return Specs{}, fmt.Errorf("specs are invalid: %w", err) + } + return specs, nil +} + +func validateSelector(fl validator.FieldLevel) bool { + if fl.Field().Kind() != reflect.String { + return false + } + _, err := css.Parse(fl.Field().String()) + if err != nil { + log.Debugf("selector %s invalid: %v", fl.Field().String(), err) + } + return err == nil +} + +func cdata(s string) string { + return "" +} diff --git a/cmd/worker/worker.go b/cmd/worker/worker.go new file mode 100644 index 0000000..e5e08bf --- /dev/null +++ b/cmd/worker/worker.go @@ -0,0 +1,88 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "github.com/egor3f/rssalchemy/internal/adapters/natsadapter" + "github.com/egor3f/rssalchemy/internal/extractors/pwextractor" + "github.com/egor3f/rssalchemy/internal/models" + "github.com/ilyakaznacheev/cleanenv" + "github.com/labstack/gommon/log" + "github.com/nats-io/nats.go" + "os" + "os/signal" +) + +type Config struct { + NatsUrl string `yaml:"nats_url" env:"NATS_URL" env-required:"true"` + Debug bool `yaml:"debug" env:"DEBUG"` +} + +func main() { + var cfg Config + err := cleanenv.ReadConfig("config.yml", &cfg) + if err != nil { + log.Panicf("reading config failed: %v", err) + } + + if cfg.Debug { + log.SetLevel(log.DEBUG) + log.SetHeader(`${time_rfc3339_nano} ${level}`) + } + + defer func() { + log.Infof("worker 
gracefully stopped") + }() + + baseCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt) + defer stop() + + natsc, err := nats.Connect(cfg.NatsUrl) + if err != nil { + log.Panicf("nats connect failed: %v", err) + } + defer func() { + if err := natsc.Drain(); err != nil { + log.Errorf("nats drain failed: %v", err) + } + }() + + qc, err := natsadapter.New(natsc) + if err != nil { + log.Panicf("create nats adapter: %v", err) + } + + pwe, err := pwextractor.New() + if err != nil { + log.Panicf("create pw extractor: %v", err) + } + defer func() { + if err := pwe.Stop(); err != nil { + log.Errorf("stop pw extractor: %v", err) + } + }() + + err = qc.ConsumeQueue(baseCtx, func(taskPayload []byte) (cacheKey string, resultPayoad []byte, errRet error) { + var task models.Task + if err := json.Unmarshal(taskPayload, &task); err != nil { + errRet = fmt.Errorf("unmarshal task: %w", err) + return + } + cacheKey = task.CacheKey() + result, err := pwe.Extract(task) + if err != nil { + errRet = fmt.Errorf("extract: %w", err) + return + } + resultPayoad, err = json.Marshal(result) + if err != nil { + errRet = fmt.Errorf("marshal result: %w", err) + return + } + return + }) + if err != nil { + log.Panicf("consume queue: %v", err) + } +} diff --git a/config.yml b/config.yml new file mode 100644 index 0000000..ee5202b --- /dev/null +++ b/config.yml @@ -0,0 +1,3 @@ +webserver_address: "0.0.0.0:5000" +nats_url: "nats://localhost:4222" +debug: true diff --git a/frontend/wizard/index.html b/frontend/wizard/index.html new file mode 100644 index 0000000..cd3d80c --- /dev/null +++ b/frontend/wizard/index.html @@ -0,0 +1,66 @@ + + + + + + + + RSS Alchemy + + + +
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ + + +
Preview in browser
+
+
+ + + diff --git a/frontend/wizard/main.css b/frontend/wizard/main.css new file mode 100644 index 0000000..7b19695 --- /dev/null +++ b/frontend/wizard/main.css @@ -0,0 +1,8 @@ +#ready_url_link { + visibility: hidden; +} + +div.field input { + width: 250px; + margin-bottom: 4px; +} diff --git a/frontend/wizard/main.js b/frontend/wizard/main.js new file mode 100644 index 0000000..f4a0910 --- /dev/null +++ b/frontend/wizard/main.js @@ -0,0 +1,70 @@ +function readSpecsForm() { + let specs = {}; + for (let field of document.forms['wizard'].elements) { + specs[field.name] = field.value; + } + return specs; +} + +function writeSpecsToForm(specs) { + for (let [k, v] of Object.entries(specs)) { + document.forms['wizard'].elements[k].value = v; + } +} + +async function encodeSpecs(specs) { + let byteArray = new TextEncoder().encode(JSON.stringify(specs)); + let cs = new CompressionStream('deflate-raw'); + let writer = cs.writable.getWriter(); + writer.write(byteArray); + writer.close(); + let response = new Response(cs.readable); + let respBuffer = await response.arrayBuffer(); + let b64str = btoa(String.fromCharCode.apply(null, new Uint8Array(respBuffer))); + return b64str.replaceAll('=', ''); +} + +async function decodeSpecs(str) { + const byteArray = Uint8Array.from(atob(str), c => c.charCodeAt(0)); + let ds = new DecompressionStream('deflate-raw'); + let writer = ds.writable.getWriter(); + writer.write(byteArray); + writer.close(); + let response = new Response(ds.readable); + let respText = await response.text(); + return JSON.parse(respText); +} + +function displayUrl(url) { + let link = document.getElementById('ready_url_link'); + link.href = url; + link.style.visibility = 'visible'; + let readyUrlInput = document.getElementById('url_input'); + readyUrlInput.value = url; + readyUrlInput.focus(); + readyUrlInput.select(); + document.getElementById('cont_url_len').innerText = `len=${url.length}`; +} + +function baseUrl() { + return document.location.origin + 
'/api/v1/render/'; +} + +async function genUrl() { + let specs = readSpecsForm(); + let encodedSpecs = await encodeSpecs(specs); + let url = baseUrl() + encodedSpecs; + displayUrl(url); +} + +async function editUrl() { + let url = document.getElementById('url_input').value; + let specs = await decodeSpecs(url.replace(baseUrl(), '')); + writeSpecsToForm(specs); + displayUrl(url); +} + +document.addEventListener('DOMContentLoaded', ev => { + document.getElementById('btn_gen_url').addEventListener('click', genUrl); + document.getElementById('btn_edit').addEventListener('click', editUrl); +}); diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..7b4ea13 --- /dev/null +++ b/go.mod @@ -0,0 +1,51 @@ +module github.com/egor3f/rssalchemy + +go 1.23 + +require ( + github.com/ericchiang/css v1.4.0 + github.com/go-playground/validator/v10 v10.23.0 + github.com/gorilla/feeds v1.2.0 + github.com/ilyakaznacheev/cleanenv v1.5.0 + github.com/labstack/echo/v4 v4.13.3 + github.com/labstack/gommon v0.4.2 + github.com/markusmobius/go-dateparser v1.2.3 + github.com/nats-io/nats.go v1.38.0 + github.com/playwright-community/playwright-go v0.4901.0 +) + +require ( + github.com/BurntSushi/toml v1.2.1 // indirect + github.com/deckarep/golang-set/v2 v2.7.0 // indirect + github.com/elliotchance/pie/v2 v2.7.0 // indirect + github.com/gabriel-vasile/mimetype v1.4.3 // indirect + github.com/go-jose/go-jose/v3 v3.0.3 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-stack/stack v1.8.1 // indirect + github.com/hablullah/go-hijri v1.0.2 // indirect + github.com/hablullah/go-juliandays v1.0.0 // indirect + github.com/jalaali/go-jalaali v0.0.0-20210801064154-80525e88d958 // indirect + github.com/joho/godotenv v1.5.1 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/leodido/go-urn v1.4.0 // indirect + github.com/magefile/mage v1.14.0 // indirect + 
github.com/mattn/go-colorable v0.1.14 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/nats-io/nkeys v0.4.9 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/tetratelabs/wazero v1.2.1 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/fasttemplate v1.2.2 // indirect + github.com/wasilibs/go-re2 v1.3.0 // indirect + golang.org/x/crypto v0.32.0 // indirect + golang.org/x/exp v0.0.0-20220321173239-a90fa8a75705 // indirect + golang.org/x/net v0.34.0 // indirect + golang.org/x/sys v0.29.0 // indirect + golang.org/x/text v0.21.0 // indirect + golang.org/x/time v0.8.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + olympos.io/encoding/edn v0.0.0-20201019073823-d3554ca0b0a3 // indirect +) + +replace github.com/ericchiang/css => github.com/egor3f/css v0.0.0-20250115151140-52c8c51084e5 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..79d8928 --- /dev/null +++ b/go.sum @@ -0,0 +1,151 @@ +github.com/BurntSushi/toml v1.2.1 h1:9F2/+DoOYIOksmaJFPw1tGFy1eDnIJXg+UHjuD8lTak= +github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/deckarep/golang-set/v2 v2.7.0 h1:gIloKvD7yH2oip4VLhsv3JyLLFnC0Y2mlusgcvJYW5k= +github.com/deckarep/golang-set/v2 v2.7.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= +github.com/egor3f/css v0.0.0-20250115151140-52c8c51084e5 h1:rqpFTlOasDC5OXOf8NA+XEdjPClBnPGxsQ484OXx6l4= +github.com/egor3f/css v0.0.0-20250115151140-52c8c51084e5/go.mod h1:sVSdL+MFR9Q4cKJMQzpIkHIDOLiK+7Wmjjhq7D+MubA= +github.com/elliotchance/pie/v2 v2.7.0 h1:FqoIKg4uj0G/CrLGuMS9ejnFKa92lxE1dEgBD3pShXg= +github.com/elliotchance/pie/v2 v2.7.0/go.mod 
h1:18t0dgGFH006g4eVdDtWfgFZPQEgl10IoEO8YWEq3Og= +github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= +github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= +github.com/go-jose/go-jose/v3 v3.0.3 h1:fFKWeig/irsp7XD2zBxvnmA/XaRWp5V3CBsZXJF7G7k= +github.com/go-jose/go-jose/v3 v3.0.3/go.mod h1:5b+7YgP7ZICgJDBdfjZaIt+H/9L9T/YQrVfLAMboGkQ= +github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= +github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= +github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= +github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= +github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= +github.com/go-playground/validator/v10 v10.23.0 h1:/PwmTwZhS0dPkav3cdK9kV1FsAmrL8sThn8IHr/sO+o= +github.com/go-playground/validator/v10 v10.23.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= +github.com/go-stack/stack v1.8.1 h1:ntEHSVwIt7PNXNpgPmVfMrNhLtgjlmnZha2kOpuRiDw= +github.com/go-stack/stack v1.8.1/go.mod h1:dcoOX6HbPZSZptuspn9bctJ+N/CnF5gGygcUP3XYfe4= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/gorilla/feeds v1.2.0 h1:O6pBiXJ5JHhPvqy53NsjKOThq+dNFm8+DFrxBEdzSCc= +github.com/gorilla/feeds v1.2.0/go.mod h1:WMib8uJP3BbY+X8Szd1rA5Pzhdfh+HCCAYT2z7Fza6Y= +github.com/hablullah/go-hijri v1.0.2 h1:drT/MZpSZJQXo7jftf5fthArShcaMtsal0Zf/dnmp6k= +github.com/hablullah/go-hijri v1.0.2/go.mod h1:OS5qyYLDjORXzK4O1adFw9Q5WfhOcMdAKglDkcTxgWQ= 
+github.com/hablullah/go-juliandays v1.0.0 h1:A8YM7wIj16SzlKT0SRJc9CD29iiaUzpBLzh5hr0/5p0= +github.com/hablullah/go-juliandays v1.0.0/go.mod h1:0JOYq4oFOuDja+oospuc61YoX+uNEn7Z6uHYTbBzdGc= +github.com/ilyakaznacheev/cleanenv v1.5.0 h1:0VNZXggJE2OYdXE87bfSSwGxeiGt9moSR2lOrsHHvr4= +github.com/ilyakaznacheev/cleanenv v1.5.0/go.mod h1:a5aDzaJrLCQZsazHol1w8InnDcOX0OColm64SlIi6gk= +github.com/jalaali/go-jalaali v0.0.0-20210801064154-80525e88d958 h1:qxLoi6CAcXVzjfvu+KXIXJOAsQB62LXjsfbOaErsVzE= +github.com/jalaali/go-jalaali v0.0.0-20210801064154-80525e88d958/go.mod h1:Wqfu7mjUHj9WDzSSPI5KfBclTTEnLveRUFr/ujWnTgE= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/labstack/echo/v4 v4.13.3 h1:pwhpCPrTl5qry5HRdM5FwdXnhXSLSY+WE+YQSeCaafY= +github.com/labstack/echo/v4 v4.13.3/go.mod h1:o90YNEeQWjDozo584l7AwhJMHN0bOC4tAfg+Xox9q5g= +github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0= +github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU= +github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= +github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/magefile/mage v1.14.0 h1:6QDX3g6z1YvJ4olPhT1wksUcSa/V0a1B+pJb73fBjyo= +github.com/magefile/mage v1.14.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= 
+github.com/markusmobius/go-dateparser v1.2.3 h1:TvrsIvr5uk+3v6poDjaicnAFJ5IgtFHgLiuMY2Eb7Nw= +github.com/markusmobius/go-dateparser v1.2.3/go.mod h1:cMwQRrBUQlK1UI5TIFHEcvpsMbkWrQLXuaPNMFzuYLk= +github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= +github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mitchellh/go-ps v1.0.0 h1:i6ampVEEF4wQFF+bkYfwYgY+F/uYJDktmvLPf7qIgjc= +github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg= +github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA= +github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw= +github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= +github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/playwright-community/playwright-go v0.4901.0 h1:d+1KxF5PNAHZ0gTMQ9bPSyYRWii8soJ7Rt0gLWDejc4= +github.com/playwright-community/playwright-go v0.4901.0/go.mod h1:kBNWs/w2aJ2ZUp1wEOOFLXgOqvppFngM5OS+qyhl+ZM= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= 
+github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tetratelabs/wazero v1.2.1 h1:J4X2hrGzJvt+wqltuvcSjHQ7ujQxA9gb6PeMs4qlUWs= +github.com/tetratelabs/wazero v1.2.1/go.mod h1:wYx2gNRg8/WihJfSDxA1TIL8H+GkfLYm+bIfbblu9VQ= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo= +github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= +github.com/wasilibs/go-re2 v1.3.0 h1:LFhBNzoStM3wMie6rN2slD1cuYH2CGiHpvNL3UtcsMw= +github.com/wasilibs/go-re2 v1.3.0/go.mod h1:AafrCXVvGRJJOImMajgJ2M7rVmWyisVK7sFshbxnVrg= +github.com/wasilibs/nottinygc v0.4.0 h1:h1TJMihMC4neN6Zq+WKpLxgd9xCFMw7O9ETLwY2exJQ= +github.com/wasilibs/nottinygc v0.4.0/go.mod h1:oDcIotskuYNMpqMF23l7Z8uzD4TC0WXHK8jetlB3HIo= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= +golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= +golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= +golang.org/x/exp v0.0.0-20220321173239-a90fa8a75705 h1:ba9YlqfDGTTQ5aZ2fwOoQ1hf32QySyQkR6ODGDzHlnE= +golang.org/x/exp v0.0.0-20220321173239-a90fa8a75705/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod 
h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20211216030914-fe4d6282115f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= +golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 
+golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= +golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors 
v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +olympos.io/encoding/edn v0.0.0-20201019073823-d3554ca0b0a3 h1:slmdOY3vp8a7KQbHkL+FLbvbkgMqmXojpFUO/jENuqQ= +olympos.io/encoding/edn v0.0.0-20201019073823-d3554ca0b0a3/go.mod h1:oVgVk4OWVDi43qWBEyGhXgYxt7+ED4iYNpTngSLX2Iw= diff --git a/internal/adapters/adapters.go b/internal/adapters/adapters.go new file mode 100644 index 0000000..169a4f7 --- /dev/null +++ b/internal/adapters/adapters.go @@ -0,0 +1,22 @@ +package adapters + +import ( + "context" + "time" +) + +type CachedWorkQueue interface { + ProcessWorkCached( + ctx context.Context, + cacheLifetime time.Duration, + cacheKey string, + taskPayload []byte, + ) ([]byte, error) +} + +type QueueConsumer interface { + ConsumeQueue( + ctx context.Context, + taskFunc func(taskPayload []byte) (cacheKey string, result []byte, err error), + ) error +} diff --git a/internal/adapters/natsadapter/natsadapter.go b/internal/adapters/natsadapter/natsadapter.go new file mode 100644 index 0000000..9064d3b --- /dev/null +++ b/internal/adapters/natsadapter/natsadapter.go @@ -0,0 +1,143 @@ +package natsadapter + +import ( + "context" + "fmt" + "github.com/labstack/gommon/log" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "time" +) + 
+const StreamName = "RENDER_TASKS" +const SubjectPrefix = "render_tasks" + +var DedupWindow, _ = time.ParseDuration("10s") + +type NatsAdapter struct { + jets jetstream.JetStream + jstream jetstream.Stream + kv jetstream.KeyValue +} + +func New(natsc *nats.Conn) (*NatsAdapter, error) { + jets, err := jetstream.New(natsc) + if err != nil { + return nil, fmt.Errorf("create jetstream: %w", err) + } + jstream, err := jets.CreateStream(context.TODO(), jetstream.StreamConfig{ + Name: StreamName, + Subjects: []string{fmt.Sprintf("%s.>", SubjectPrefix)}, + Retention: jetstream.WorkQueuePolicy, + Duplicates: DedupWindow, + }) + if err != nil { + return nil, fmt.Errorf("create js stream: %w", err) + } + kv, err := jets.CreateKeyValue(context.TODO(), jetstream.KeyValueConfig{ + Bucket: "render_cache", + }) + if err != nil { + return nil, fmt.Errorf("create nats kv: %w", err) + } + return &NatsAdapter{jets: jets, jstream: jstream, kv: kv}, nil +} + +func (na *NatsAdapter) ProcessWorkCached( + ctx context.Context, + cacheLifetime time.Duration, + cacheKey string, + taskPayload []byte, +) (result []byte, err error) { + if cacheLifetime < DedupWindow { + // if cache lifetime is less than dedup window, we can run into situation + // when cache already expired, but new task will be considered duplicate + // so client will neither trigger new task nor retrieve cached value + cacheLifetime = DedupWindow + } + + watcher, err := na.kv.Watch(ctx, cacheKey) + if err != nil { + return nil, fmt.Errorf("cache watch failed: %w", err) + } + defer watcher.Stop() + + var lastUpdate jetstream.KeyValueEntry + for { + select { + case upd := <-watcher.Updates(): + if upd != nil { + lastUpdate = upd + if time.Since(upd.Created()) <= cacheLifetime { + log.Infof("using cached value for task: %s, payload=%.100s", cacheKey, lastUpdate.Value()) + return lastUpdate.Value(), nil + } + } else { + log.Infof("sending task to queue: %s", cacheKey) + _, err = na.jets.Publish( + ctx, + fmt.Sprintf("%s.%s", 
SubjectPrefix, cacheKey), + taskPayload, + jetstream.WithMsgID(cacheKey), + ) + if err != nil { + return nil, fmt.Errorf("nats publish error: %v", err) + } + } + case <-ctx.Done(): + log.Warnf("task cancelled by context: %s", cacheKey) + // anyway, using cached lastUpdate + if lastUpdate != nil { + return lastUpdate.Value(), ctx.Err() + } else { + return nil, ctx.Err() + } + } + } +} + +func (na *NatsAdapter) ConsumeQueue( + ctx context.Context, + taskFunc func(taskPayload []byte) (cacheKey string, result []byte, err error), +) error { + cons, err := na.jstream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{}) + if err != nil { + return fmt.Errorf("create js consumer: %w", err) + } + consCtx, err := cons.Consume(func(msg jetstream.Msg) { + metadata, err := msg.Metadata() + if err != nil { + log.Errorf("msg metadata: %v", err) + return + } + seq := metadata.Sequence.Stream + if err := msg.InProgress(); err != nil { + log.Errorf("task seq=%d inProgress: %v", seq, err) + } + log.Infof("got task seq=%d payload=%s", seq, msg.Data()) + cacheKey, resultPayload, taskErr := taskFunc(msg.Data()) + if err := msg.DoubleAck(ctx); err != nil { + log.Errorf("double ack seq=%d: %v", seq, err) + } + if taskErr != nil { + log.Errorf("taskFunc seq=%d error, discarding task: %v", seq, taskErr) + if err := msg.Nak(); err != nil { + log.Errorf("nak %d: %v", seq, err) + } + return + } + log.Infof("task seq=%d cachekey=%s finished, payload=%.100s", seq, cacheKey, resultPayload) + if _, err := na.kv.Put(ctx, cacheKey, resultPayload); err != nil { + log.Errorf("put seq=%d to cache: %v", seq, err) + return + } + }) + if err != nil { + return fmt.Errorf("consume context: %w", err) + } + log.Infof("ready to consume tasks") + <-ctx.Done() + log.Infof("stopping consumer") + consCtx.Stop() + return nil +} diff --git a/internal/extractors/pwextractor/pwextractor.go b/internal/extractors/pwextractor/pwextractor.go new file mode 100644 index 0000000..86d62b3 --- /dev/null +++ 
b/internal/extractors/pwextractor/pwextractor.go @@ -0,0 +1,177 @@ +package pwextractor + +import ( + "fmt" + "github.com/egor3f/rssalchemy/internal/models" + "github.com/labstack/gommon/log" + "github.com/markusmobius/go-dateparser" + "github.com/playwright-community/playwright-go" + "net/url" + "strings" + "time" +) + +type PwExtractor struct { + pw *playwright.Playwright + chrome playwright.Browser +} + +func New() (*PwExtractor, error) { + e := PwExtractor{} + var err error + e.pw, err = playwright.Run() + if err != nil { + return nil, fmt.Errorf("run playwright: %w", err) + } + e.chrome, err = e.pw.Chromium.Launch(playwright.BrowserTypeLaunchOptions{ + ChromiumSandbox: playwright.Bool(true), + HandleSIGINT: playwright.Bool(false), + Timeout: pwDuration("5s"), + }) + if err != nil { + return nil, fmt.Errorf("run chromium: %w", err) + } + return &e, nil +} + +func (e *PwExtractor) Stop() error { + if err := e.chrome.Close(); err != nil { + return fmt.Errorf("closing chrome: %w", err) + } + if err := e.pw.Stop(); err != nil { + return fmt.Errorf("stopping playwright: %w", err) + } + return nil +} + +func (e *PwExtractor) Extract(task models.Task) (result *models.TaskResult, errRet error) { + page, err := e.chrome.NewPage() + if err != nil { + return nil, fmt.Errorf("browser new page: %w", err) + } + defer func() { + err := page.Close() + if err != nil { + errRet = fmt.Errorf("close page: %w; other error=%w", err, errRet) + } + }() + log.Debugf("Page opened") + + if _, err := page.Goto(task.URL); err != nil { + return nil, fmt.Errorf("goto page: %w", err) + } + log.Debugf("Url %s visited", task.URL) + + if err := page.WaitForLoadState(playwright.PageWaitForLoadStateOptions{ + State: playwright.LoadStateNetworkidle, + Timeout: pwDuration("5s"), + }); err != nil { + log.Warnf("waiting for page load: %v", err) + } + + result = &models.TaskResult{} + + result.Title, err = page.Title() + if err != nil { + return nil, fmt.Errorf("page title: %w", err) + } + + iconUrl, 
err := page.Locator("link[rel=apple-touch-icon]").First(). + GetAttribute("href", playwright.LocatorGetAttributeOptions{Timeout: pwDuration("100ms")}) + if err != nil { + log.Warnf("page icon url: %v", err) + } else { + result.Icon = absUrl(iconUrl, page) + } + + posts, err := page.Locator(task.SelectorPost).All() + if err != nil { + return nil, fmt.Errorf("post locator: %w", err) + } + if len(posts) == 0 { + return nil, fmt.Errorf("no posts on page") + } + for _, post := range posts { + item, err := e.extractPost(task, post) + if err != nil { + log.Errorf("extract post fields: %v", err) + continue + } + if len(item.Title) == 0 || len(item.Link) == 0 { + log.Warnf("post has no required fields, skip") + continue + } + result.Items = append(result.Items, item) + } + if len(result.Items) == 0 { + return nil, fmt.Errorf("extract failed for all posts") + } + + return result, nil +} + +func (e *PwExtractor) extractPost(task models.Task, post playwright.Locator) (models.FeedItem, error) { + fieldIdx := 0 + must := func(s string, err error) string { + fieldIdx++ + if err != nil { + log.Errorf("extract post field %d: %v", fieldIdx, err) + return "" + } + log.Debugf("field=%d res=%.100s", fieldIdx, s) + return s + } + var item models.FeedItem + const defTimeout = "100ms" + defOpt := playwright.LocatorTextContentOptions{Timeout: pwDuration(defTimeout)} + defOptAttr := playwright.LocatorGetAttributeOptions{Timeout: pwDuration(defTimeout)} + log.Debugf("---- POST: ----") + + item.Title = must(post.Locator(task.SelectorTitle).First().TextContent(defOpt)) + + item.Link = must(post.Locator(task.SelectorLink).First().GetAttribute("href", defOptAttr)) + page, _ := post.Page() + item.Link = absUrl(item.Link, page) + + item.Description = must(post.Locator(task.SelectorDescription).First().TextContent(defOpt)) + + item.AuthorName = must(post.Locator(task.SelectorAuthor).First().TextContent(defOpt)) + + item.AuthorLink = 
must(post.Locator(task.SelectorAuthor).First().GetAttribute("href", defOptAttr)) + item.AuthorLink = absUrl(item.AuthorLink, page) + + item.Content = must(post.Locator(task.SelectorContent).First().TextContent(defOpt)) + + item.Enclosure = must(post.Locator(task.SelectorEnclosure).First().GetAttribute("src", defOptAttr)) + + createdDateStr := must(post.Locator(task.SelectorCreated).First().TextContent(defOpt)) + log.Debugf("date=%s", createdDateStr) + createdDate, err := dateparser.Parse(nil, createdDateStr) + if err != nil { + log.Errorf("dateparser: %v", err) + } else { + item.Created = createdDate.Time + } + + return item, nil +} + +func absUrl(link string, page playwright.Page) string { + if strings.HasPrefix(link, "/") { + pageUrl, _ := url.Parse(page.URL()) + link = fmt.Sprintf("%s://%s%s", pageUrl.Scheme, pageUrl.Host, link) + } + log.Debugf("link=%s", link) + return link +} + +// pwDuration converts string like "10s" to milliseconds float64 pointer +// needed for Playwright timeouts (wtf? why they don't use normal Durations?) 
func pwDuration(s string) *float64 {
	// Every call site passes a literal, so a parse failure is a programming
	// error and is reported by panicking.
	dur, err := time.ParseDuration(s)
	if err != nil {
		panic(fmt.Errorf("failed to parse duration %s: %w", s, err))
	}
	ms := float64(dur.Milliseconds())
	return &ms
}

// Task describes a single extraction job: the page to open and the selectors
// locating the posts and their fields on it.
type Task struct {
	// While adding new fields, dont forget to alter caching func
	URL                 string
	SelectorPost        string
	SelectorTitle       string
	SelectorLink        string
	SelectorDescription string
	SelectorAuthor      string
	SelectorCreated     string
	SelectorContent     string
	SelectorEnclosure   string
}

// CacheKey returns a hex-encoded SHA-256 digest identifying the task.
//
// Every field is hashed with a length prefix, so moving characters from one
// field to the neighbouring one (URL "ab" + post selector "c" versus URL "a"
// + post selector "bc") yields a different key.
func (t Task) CacheKey() string {
	h := sha256.New()
	fields := [...]string{
		t.URL,
		t.SelectorPost,
		t.SelectorTitle,
		t.SelectorLink,
		t.SelectorDescription,
		t.SelectorAuthor,
		t.SelectorCreated,
		t.SelectorContent,
		t.SelectorEnclosure,
	}
	for _, f := range fields {
		// Writing to a hash.Hash never fails, so the result is not checked.
		fmt.Fprintf(h, "%d:%s", len(f), f)
	}
	return fmt.Sprintf("%x", h.Sum(nil))
}

// FeedItem is a single post extracted from a page.
type FeedItem struct {
	Title       string
	Created     time.Time
	Updated     time.Time
	AuthorName  string
	Link        string
	Description string
	Content     string
	Enclosure   string
	AuthorLink  string
}

// TaskResult is the outcome of a Task: the page title, the page icon URL and
// the extracted posts.
type TaskResult struct {
	Title string
	Items []FeedItem
	Icon  string
}