diff --git a/cmd/extractor/extractor.go b/cmd/extractor/extractor.go
index 828360c..b9f2bb1 100644
--- a/cmd/extractor/extractor.go
+++ b/cmd/extractor/extractor.go
@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"flag"
 	"github.com/egor3f/rssalchemy/internal/config"
+	dummycookies "github.com/egor3f/rssalchemy/internal/cookiemgr/dummy"
 	"github.com/egor3f/rssalchemy/internal/dateparser"
 	"github.com/egor3f/rssalchemy/internal/extractors/pwextractor"
 	"github.com/egor3f/rssalchemy/internal/models"
@@ -63,6 +64,7 @@ func main() {
 				return time.Date(2025, 01, 10, 10, 00, 00, 00, time.UTC)
 			},
 		},
+		CookieManager: dummycookies.New(),
 	})
 	if err != nil {
 		log.Panicf("create pw extractor: %v", err)
diff --git a/cmd/worker/worker.go b/cmd/worker/worker.go
index 2fdc748..ab84a3b 100644
--- a/cmd/worker/worker.go
+++ b/cmd/worker/worker.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"github.com/egor3f/rssalchemy/internal/adapters/natsadapter"
 	"github.com/egor3f/rssalchemy/internal/config"
+	natscookies "github.com/egor3f/rssalchemy/internal/cookiemgr/nats"
 	"github.com/egor3f/rssalchemy/internal/dateparser"
 	"github.com/egor3f/rssalchemy/internal/extractors/pwextractor"
 	"github.com/egor3f/rssalchemy/internal/models"
@@ -49,11 +50,17 @@ func main() {
 		log.Panicf("create nats adapter: %v", err)
 	}
 
+	cookieManager, err := natscookies.New(natsc)
+	if err != nil {
+		log.Panicf("create cookie manager: %v", err)
+	}
+
 	pwe, err := pwextractor.New(pwextractor.Config{
 		Proxy: cfg.Proxy,
 		DateParser: &dateparser.DateParser{
 			CurrentTimeFunc: time.Now,
 		},
+		CookieManager: cookieManager,
 	})
 	if err != nil {
 		log.Panicf("create pw extractor: %v", err)
diff --git a/internal/cookiemgr/dummy/dummycookies.go b/internal/cookiemgr/dummy/dummycookies.go
new file mode 100644
index 0000000..e37f8f1
--- /dev/null
+++ b/internal/cookiemgr/dummy/dummycookies.go
@@ -0,0 +1,19 @@
+package dummy
+
+import "github.com/egor3f/rssalchemy/internal/cookiemgr"
+
+type CookieManager struct {
+}
+
+func New() *CookieManager {
+	m := CookieManager{}
+	return &m
+}
+
+func (m *CookieManager) GetCookies(key string, cookieHeader string) ([][2]string, error) {
+	return cookiemgr.ParseCookieHeader(cookieHeader)
+}
+
+func (m *CookieManager) UpdateCookies(key string, cookieHeader string, cookies [][2]string) error {
+	return nil
+}
diff --git a/internal/cookiemgr/nats/natscookies.go b/internal/cookiemgr/nats/natscookies.go
new file mode 100644
index 0000000..3a6dd40
--- /dev/null
+++ b/internal/cookiemgr/nats/natscookies.go
@@ -0,0 +1,79 @@
+package nats
+
+import (
+	"context"
+	"crypto/sha256"
+	"errors"
+	"fmt"
+	"github.com/egor3f/rssalchemy/internal/cookiemgr"
+	"github.com/labstack/gommon/log"
+	"github.com/nats-io/nats.go"
+	"github.com/nats-io/nats.go/jetstream"
+)
+
+type CookieManager struct {
+	kv jetstream.KeyValue
+}
+
+func New(natsc *nats.Conn) (*CookieManager, error) {
+	m := CookieManager{}
+
+	jets, err := jetstream.New(natsc)
+	if err != nil {
+		return nil, fmt.Errorf("create jetstream: %w", err)
+	}
+
+	m.kv, err = jets.CreateKeyValue(context.TODO(), jetstream.KeyValueConfig{
+		Bucket: "cookie_manager_store",
+	})
+	if err != nil {
+		return nil, fmt.Errorf("create nats kv: %w", err)
+	}
+
+	return &m, nil
+}
+
+func (m *CookieManager) GetCookies(key string, cookieHeader string) ([][2]string, error) {
+	cookies, err := cookiemgr.ParseCookieHeader(cookieHeader)
+	if err != nil {
+		return nil, fmt.Errorf("parse cookie header: %w", err)
+	}
+	storeKey := m.storeKey(key, cookies)
+	log.Debugf("Store key = %s", storeKey)
+	value, err := m.kv.Get(context.TODO(), storeKey)
+	if err != nil {
+		if errors.Is(err, jetstream.ErrKeyNotFound) {
+			return cookies, nil
+		}
+		return nil, fmt.Errorf("kv: %w", err)
+	}
+	cookies, err = cookiemgr.ParseCookieHeader(string(value.Value()))
+	if err != nil {
+		return nil, fmt.Errorf("parse cookies from kv: %w", err)
+	}
+	return cookies, nil
+}
+
+func (m *CookieManager) UpdateCookies(key string, oldCookieHeader string, cookies [][2]string) error {
+	if len(cookies) == 0 {
+		return nil
+	}
+	newCookieValue := cookiemgr.EncodeCookieHeader(cookies)
+	oldCookies, err := cookiemgr.ParseCookieHeader(oldCookieHeader)
+	if err != nil {
+		return fmt.Errorf("parse cookie header: %w", err)
+	}
+	storeKey := m.storeKey(key, oldCookies)
+	_, err = m.kv.PutString(context.TODO(), storeKey, newCookieValue)
+	if err != nil {
+		return fmt.Errorf("kv: %w", err)
+	}
+	return nil
+}
+
+func (m *CookieManager) storeKey(key string, cookies [][2]string) string {
+	hash := cookiemgr.CookiesHash(cookies)
+	keyHash := sha256.New()
+	keyHash.Write([]byte(key))
+	return fmt.Sprintf("%x_%s", keyHash.Sum(nil), hash)
+}
diff --git a/internal/cookiemgr/utils.go b/internal/cookiemgr/utils.go
new file mode 100644
index 0000000..65a6eac
--- /dev/null
+++ b/internal/cookiemgr/utils.go
@@ -0,0 +1,41 @@
+package cookiemgr
+
+import (
+	"crypto/sha256"
+	"fmt"
+	"net/url"
+	"strings"
+)
+
+func ParseCookieHeader(cookieStr string) ([][2]string, error) {
+	var result [][2]string
+
+	for _, cook := range strings.Split(cookieStr, ";") {
+		kv := strings.Split(cook, "=")
+		if len(kv) < 2 {
+			return nil, fmt.Errorf("failed to parse cookies: split by =: count<2")
+		}
+		k, err1 := url.QueryUnescape(kv[0])
+		v, err2 := url.QueryUnescape(strings.Join(kv[1:], "="))
+		if err1 != nil || err2 != nil {
+			return nil, fmt.Errorf("failed to parse cookies: unescape k=%w v=%w", err1, err2)
+		}
+		result = append(result, [2]string{strings.TrimSpace(k), strings.TrimSpace(v)})
+	}
+
+	return result, nil
+}
+
+func EncodeCookieHeader(cookies [][2]string) string {
+	result := make([]string, len(cookies))
+	for i, cook := range cookies {
+		result[i] = fmt.Sprintf("%s=%s", url.QueryEscape(cook[0]), url.QueryEscape(cook[1]))
+	}
+	return strings.Join(result, "; ")
+}
+
+func CookiesHash(cookies [][2]string) string {
+	hash := sha256.New()
+	hash.Write([]byte(fmt.Sprintf("%v", cookies)))
+	return fmt.Sprintf("%x", hash.Sum(nil))
+}
diff --git a/internal/extractors/pwextractor/pwextractor.go b/internal/extractors/pwextractor/pwextractor.go
index 2d9cb69..087873f 100644
--- a/internal/extractors/pwextractor/pwextractor.go
+++ b/internal/extractors/pwextractor/pwextractor.go
@@ -28,15 +28,22 @@ type DateParser interface {
 	ParseDate(string) (time.Time, error)
 }
 
+type CookieManager interface {
+	GetCookies(key string, cookieHeader string) ([][2]string, error)
+	UpdateCookies(key string, cookieHeader string, cookies [][2]string) error
+}
+
 type PwExtractor struct {
-	pw         *playwright.Playwright
-	chrome     playwright.Browser
-	dateParser DateParser
+	pw            *playwright.Playwright
+	chrome        playwright.Browser
+	dateParser    DateParser
+	cookieManager CookieManager
 }
 
 type Config struct {
-	Proxy      string
-	DateParser DateParser
+	Proxy         string
+	DateParser    DateParser
+	CookieManager CookieManager
 }
 
 func New(cfg Config) (*PwExtractor, error) {
@@ -59,7 +66,13 @@ func New(cfg Config) (*PwExtractor, error) {
 
 	if err != nil {
 		return nil, fmt.Errorf("run chromium: %w", err)
 	}
+	e.dateParser = cfg.DateParser
+	e.cookieManager = cfg.CookieManager
+	if e.dateParser == nil || e.cookieManager == nil {
+		panic("you fckd up with di again")
+	}
+
 	return &e, nil
 }
 
@@ -74,11 +87,21 @@ func (e *PwExtractor) Stop() error {
 }
 
 func (e *PwExtractor) visitPage(task models.Task, cb func(page playwright.Page) error) (errRet error) {
+
 	headers := maps.Clone(task.Headers)
 	headers["Sec-Ch-Ua"] = secChUa
+
 	var cookieStr string
+	var cookies [][2]string
 	if v, ok := headers["Cookie"]; ok {
 		cookieStr = v
+		var err error
+		cookies, err = e.cookieManager.GetCookies(task.URL, v)
+		if err != nil {
+			log.Errorf("cookie manager get: %v", err)
+			cookies = make([][2]string, 0)
+		}
+		log.Debugf("Found cookies: %v", cookies)
 		delete(headers, "Cookie")
 	}
 
@@ -95,17 +118,12 @@ func (e *PwExtractor) visitPage(task models.Task, cb func(page playwright.Page)
 		}
 	}()
 
-	if len(cookieStr) > 0 {
-		cookies, err := parseCookieString(cookieStr)
-		if err != nil {
-			return fmt.Errorf("parsing cookies: %w", err)
-		}
-
-		baseDomain, err := parseBaseDomain(task.URL)
-		if err != nil {
-			return fmt.Errorf("parse base domain: %w", err)
-		}
+	baseDomain, scheme, err := parseBaseDomain(task.URL)
+	if err != nil {
+		return fmt.Errorf("parse base domain: %w", err)
+	}
 
+	if len(cookies) > 0 {
 		var pwCookies []playwright.OptionalCookie
 		for _, cook := range cookies {
 			pwCookies = append(pwCookies, playwright.OptionalCookie{
@@ -131,7 +149,7 @@ func (e *PwExtractor) visitPage(task models.Task, cb func(page playwright.Page)
 			errRet = fmt.Errorf("close page: %w; other error=%w", err, errRet)
 		}
 	}()
-	log.Debugf("Page opened")
+	log.Debugf("Page created")
 
 	if len(task.Headers) > 0 {
 		if err := page.SetExtraHTTPHeaders(task.Headers); err != nil {
@@ -142,10 +160,30 @@ func (e *PwExtractor) visitPage(task models.Task, cb func(page playwright.Page)
 	if _, err := page.Goto(task.URL, playwright.PageGotoOptions{Timeout: pwDuration("10s")}); err != nil {
 		return fmt.Errorf("goto page: %w", err)
 	}
-	log.Debugf("Url %s visited", task.URL)
-	defer log.Debugf("Visiting page %s finished", task.URL)
+	log.Debugf("Url %s visited, starting cb", task.URL)
 
-	return cb(page)
+	start := time.Now()
+	err = cb(page)
+	log.Debugf("Visiting page %s finished, time=%f secs, err=%v", task.URL, time.Since(start).Seconds(), err)
+
+	if len(cookies) > 0 {
+		bCookies, err := bCtx.Cookies(fmt.Sprintf("%s://%s", scheme, baseDomain))
+		if err != nil {
+			log.Errorf("browser context get cookies: %v", err)
+		} else {
+			newCookies := make([][2]string, len(bCookies))
+			for i, cook := range bCookies {
+				newCookies[i] = [2]string{cook.Name, cook.Value}
+			}
+			log.Debugf("Updating cookies: %v", newCookies)
+			err = e.cookieManager.UpdateCookies(task.URL, cookieStr, newCookies)
+			if err != nil {
+				log.Errorf("cookie manager update: %v", err)
+			}
+		}
+	}
+
+	return err
 }
 
 func (e *PwExtractor) Extract(task models.Task) (result *models.TaskResult, errRet error) {
@@ -175,7 +213,7 @@ func (e *PwExtractor) Screenshot(task models.Task) (result *models.ScreenshotTas
 		Timeout: pwDuration("5s"),
 	})
 	if err != nil {
-		log.Debugf("Wait for network idle: %w", err)
+		log.Debugf("Wait for network idle: %v", err)
 	}
 	if err := page.SetViewportSize(1280, 800); err != nil {
 		return fmt.Errorf("set viewport size: %w", err)
diff --git a/internal/extractors/pwextractor/utils.go b/internal/extractors/pwextractor/utils.go
index 071b795..7a83762 100644
--- a/internal/extractors/pwextractor/utils.go
+++ b/internal/extractors/pwextractor/utils.go
@@ -53,31 +53,16 @@ func parseProxy(s string) (*playwright.Proxy, error) {
 	return proxy, nil
 }
 
-func parseBaseDomain(urlStr string) (string, error) {
+func parseBaseDomain(urlStr string) (domain string, scheme string, err error) {
 	pageUrl, err := url.Parse(urlStr)
 	if err != nil {
-		return "", fmt.Errorf("task url parsing: %w", err)
+		return "", "", fmt.Errorf("task url parsing: %w", err)
 	}
 	domainParts := strings.Split(pageUrl.Host, ".")
 	slices.Reverse(domainParts) // com, example, www
-	return fmt.Sprintf("%s.%s", domainParts[1], domainParts[0]), nil
-}
-
-func parseCookieString(cookieStr string) ([][2]string, error) {
-	var result [][2]string
-
-	for _, cook := range strings.Split(cookieStr, ";") {
-		kv := strings.Split(cook, "=")
-		if len(kv) < 2 {
-			return nil, fmt.Errorf("failed to parse cookies: split by =: count<2")
-		}
-		k, err1 := url.QueryUnescape(kv[0])
-		v, err2 := url.QueryUnescape(strings.Join(kv[1:], "="))
-		if err1 != nil || err2 != nil {
-			return nil, fmt.Errorf("failed to parse cookies: unescape k=%w v=%w", err1, err2)
-		}
-		result = append(result, [2]string{strings.TrimSpace(k), strings.TrimSpace(v)})
+	scheme = pageUrl.Scheme
+	if !slices.Contains([]string{"https", "http"}, scheme) {
+		return "", "", fmt.Errorf("bad scheme: %s", scheme)
 	}
-
-	return result, nil
+	return fmt.Sprintf("%s.%s", domainParts[1], domainParts[0]), scheme, nil
 }