change consumer to durable
This commit is contained in:
parent
97731f4c37
commit
6913733ad5
@ -118,7 +118,9 @@ func (na *NatsAdapter) ConsumeQueue(
|
|||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
taskFunc func(taskPayload []byte) (cacheKey string, result []byte, err error),
|
taskFunc func(taskPayload []byte) (cacheKey string, result []byte, err error),
|
||||||
) error {
|
) error {
|
||||||
cons, err := na.jstream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{})
|
cons, err := na.jstream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
|
||||||
|
Durable: "worker",
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("create js consumer: %w", err)
|
return fmt.Errorf("create js consumer: %w", err)
|
||||||
}
|
}
|
||||||
@ -147,7 +149,7 @@ func (na *NatsAdapter) ConsumeQueue(
|
|||||||
|
|
||||||
if taskErr != nil {
|
if taskErr != nil {
|
||||||
log.Errorf("taskFunc seq=%d error, discarding task: %v", seq, taskErr)
|
log.Errorf("taskFunc seq=%d error, discarding task: %v", seq, taskErr)
|
||||||
if err := msg.Nak(); err != nil {
|
if err := msg.Term(); err != nil {
|
||||||
log.Errorf("nak %d: %v", seq, err)
|
log.Errorf("nak %d: %v", seq, err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
|||||||
@ -161,7 +161,6 @@ func (p *pageParser) parse() (*models.TaskResult, error) {
|
|||||||
func (p *pageParser) waitFullLoad() {
|
func (p *pageParser) waitFullLoad() {
|
||||||
timeout := pwDuration("5s")
|
timeout := pwDuration("5s")
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
err := p.page.WaitForLoadState(playwright.PageWaitForLoadStateOptions{
|
err := p.page.WaitForLoadState(playwright.PageWaitForLoadStateOptions{
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user