diff --git a/cmd/extractor/extractor.go b/cmd/extractor/extractor.go index 9b9ffca..fbf2573 100644 --- a/cmd/extractor/extractor.go +++ b/cmd/extractor/extractor.go @@ -18,7 +18,7 @@ import ( ) func main() { - log.SetLevel(log.DEBUG) + //log.SetLevel(log.DEBUG) log.SetHeader(`${time_rfc3339_nano} ${level}`) outFile := flag.String("o", "", "Output file name") diff --git a/internal/extractors/pwextractor/pageparser.go b/internal/extractors/pwextractor/pageparser.go index ef1a99e..e3d2e89 100644 --- a/internal/extractors/pwextractor/pageparser.go +++ b/internal/extractors/pwextractor/pageparser.go @@ -7,6 +7,7 @@ import ( "github.com/egor3f/rssalchemy/internal/models" "github.com/labstack/gommon/log" "github.com/playwright-community/playwright-go" + "golang.org/x/sync/errgroup" ) // Timeouts @@ -18,12 +19,10 @@ type pageParser struct { task models.Task page playwright.Page dateParser DateParser - - // next fields only for debugging. Shit code, to do better later - postIdx int - fieldIdx int } +const MAX_CONCURRENT_POSTS = 1 // todo: config + func (p *pageParser) parse() (*models.TaskResult, error) { var result models.TaskResult var err error @@ -52,18 +51,39 @@ func (p *pageParser) parse() (*models.TaskResult, error) { } log.Debugf("Posts count=%d", len(posts)) + eg := errgroup.Group{} + eg.SetLimit(MAX_CONCURRENT_POSTS) + itemsChan := make(chan models.FeedItem) + + go func() { + for item := range itemsChan { + result.Items = append(result.Items, item) + } + }() + for _, post := range posts { - item, err := p.extractPost(post) - if err != nil { - log.Errorf("extract post fields: %v", err) - continue - } - if len(item.Title) == 0 || len(item.Link) == 0 || item.Created.IsZero() { - log.Warnf("post has no required fields, skip") - continue - } - result.Items = append(result.Items, item) + // todo: post order + eg.Go(func() (err error) { + defer func() { + if e := recover(); e != nil { + err = fmt.Errorf("recovered panic: %v", e) + } + }() + item, err := p.extractPost(post) + if err != nil { + log.Errorf("extract post fields: %v", err) + } + if len(item.Title) == 0 || len(item.Link) == 0 || item.Created.IsZero() { + log.Warnf("post has no required fields, skip") + } + itemsChan <- item + return nil + }) } + if err := eg.Wait(); err != nil { + return nil, fmt.Errorf("extract posts: %w", err) + } + if len(result.Items) == 0 { return nil, fmt.Errorf("extract failed for all posts") } @@ -88,8 +108,6 @@ func (p *pageParser) waitFullLoad() { } func (p *pageParser) extractPost(post playwright.Locator) (models.FeedItem, error) { - p.fieldIdx = 0 - p.postIdx++ var item models.FeedItem item.Title = newLocator(post, p.task.SelectorTitle).First().InnerText() @@ -170,7 +188,7 @@ func (l *locator) checkVisible() bool { return false } if !visible { - log.Warnf("locator %s is not visible", l) + log.Debugf("locator %s is not visible", l) } return visible }