multithread

This commit is contained in:
Egor Aristov 2025-03-23 13:11:21 +03:00
parent 31bbc97f9b
commit c8c7affdd5
Signed by: egor3f
GPG Key ID: 40482A264AAEC85F
2 changed files with 36 additions and 18 deletions

View File

@ -18,7 +18,7 @@ import (
) )
func main() { func main() {
log.SetLevel(log.DEBUG) //log.SetLevel(log.DEBUG)
log.SetHeader(`${time_rfc3339_nano} ${level}`) log.SetHeader(`${time_rfc3339_nano} ${level}`)
outFile := flag.String("o", "", "Output file name") outFile := flag.String("o", "", "Output file name")

View File

@ -7,6 +7,7 @@ import (
"github.com/egor3f/rssalchemy/internal/models" "github.com/egor3f/rssalchemy/internal/models"
"github.com/labstack/gommon/log" "github.com/labstack/gommon/log"
"github.com/playwright-community/playwright-go" "github.com/playwright-community/playwright-go"
"golang.org/x/sync/errgroup"
) )
// Timeouts // Timeouts
@ -18,12 +19,10 @@ type pageParser struct {
task models.Task task models.Task
page playwright.Page page playwright.Page
dateParser DateParser dateParser DateParser
// next fields only for debugging. Shit code, to do better later
postIdx int
fieldIdx int
} }
const MAX_CONCURRENT_POSTS = 1 // todo: config
func (p *pageParser) parse() (*models.TaskResult, error) { func (p *pageParser) parse() (*models.TaskResult, error) {
var result models.TaskResult var result models.TaskResult
var err error var err error
@ -52,18 +51,39 @@ func (p *pageParser) parse() (*models.TaskResult, error) {
} }
log.Debugf("Posts count=%d", len(posts)) log.Debugf("Posts count=%d", len(posts))
eg := errgroup.Group{}
eg.SetLimit(MAX_CONCURRENT_POSTS)
itemsChan := make(chan models.FeedItem)
go func() {
for item := range itemsChan {
result.Items = append(result.Items, item)
}
}()
for _, post := range posts { for _, post := range posts {
// todo: post order
eg.Go(func() (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("recovered panic: %v", e)
}
}()
item, err := p.extractPost(post) item, err := p.extractPost(post)
if err != nil { if err != nil {
log.Errorf("extract post fields: %v", err) log.Errorf("extract post fields: %v", err)
continue
} }
if len(item.Title) == 0 || len(item.Link) == 0 || item.Created.IsZero() { if len(item.Title) == 0 || len(item.Link) == 0 || item.Created.IsZero() {
log.Warnf("post has no required fields, skip") log.Warnf("post has no required fields, skip")
continue
} }
result.Items = append(result.Items, item) itemsChan <- item
return nil
})
} }
if err := eg.Wait(); err != nil {
return nil, fmt.Errorf("extract posts: %w", err)
}
if len(result.Items) == 0 { if len(result.Items) == 0 {
return nil, fmt.Errorf("extract failed for all posts") return nil, fmt.Errorf("extract failed for all posts")
} }
@ -88,8 +108,6 @@ func (p *pageParser) waitFullLoad() {
} }
func (p *pageParser) extractPost(post playwright.Locator) (models.FeedItem, error) { func (p *pageParser) extractPost(post playwright.Locator) (models.FeedItem, error) {
p.fieldIdx = 0
p.postIdx++
var item models.FeedItem var item models.FeedItem
item.Title = newLocator(post, p.task.SelectorTitle).First().InnerText() item.Title = newLocator(post, p.task.SelectorTitle).First().InnerText()
@ -170,7 +188,7 @@ func (l *locator) checkVisible() bool {
return false return false
} }
if !visible { if !visible {
log.Warnf("locator %s is not visible", l) log.Debugf("locator %s is not visible", l)
} }
return visible return visible
} }