asyncqueue for ts

This commit is contained in:
Egor Aristov 2025-11-08 13:35:07 +03:00
parent ede31be47c
commit d32060fbe9
Signed by: egor3f
GPG Key ID: 40482A264AAEC85F
2 changed files with 43 additions and 12 deletions

24
lib/ts/src/asyncqueue.ts Normal file
View File

@ -0,0 +1,24 @@
export class AsyncQueue<T> {
private store: T[] = [];
private collectors: ((val: T[]) => void)[] = [];
put(val: T) {
this.store.push(val);
for(const collector of this.collectors) {
collector(this.store);
}
this.collectors = [];
}
async collect(): Promise<T[]> {
if(this.store.length > 0) {
const store = this.store;
this.store = [];
return new Promise(resolve => resolve(store));
} else {
return new Promise(resolve => {
this.collectors.push(resolve);
})
}
}
}

View File

@ -5,6 +5,7 @@ import * as os from 'node:os';
import * as path from 'node:path';
import * as fs from 'node:fs';
import * as util from 'node:util';
import {AsyncQueue} from './asyncqueue.js';
const IPC_SOCKET_ARG = 'ipc-socket';
@ -44,7 +45,8 @@ abstract class IPCCommon {
protected pendingCalls: Record<number, (result: CallResult) => void> = {};
protected stopRequested: boolean = false;
protected processingCalls: number = 0;
protected onError?: (err: Error) => void;
protected errorQueue = new AsyncQueue<Error>();
protected onClose?: () => void;
protected constructor(localApis: object[], socketPath: string) {
@ -87,7 +89,7 @@ abstract class IPCCommon {
protected processMsg(msg: Message): void {
switch (msg.type) {
case MsgType.Call:
this.handleCall(msg).catch(e => this.onError?(e) : null);
this.handleCall(msg).catch(this.errorQueue.put);
break;
case MsgType.Response:
this.handleResponse(msg);
@ -196,7 +198,7 @@ abstract class IPCCommon {
}
protected raiseErr(err: Error): void {
if (this.onError) this.onError(err);
this.errorQueue.put(err);
}
}
@ -226,7 +228,6 @@ export class ParentIPC extends IPCCommon {
} catch {
}
await new Promise<void>((resolve, reject) => {
this.listener.listen(this.socketPath, () => {
resolve();
@ -264,7 +265,7 @@ export class ParentIPC extends IPCCommon {
}
async wait(): Promise<void> {
return new Promise((resolve, reject) => {
return new Promise(async (resolve, reject) => {
if (!this.cmd) throw new Error('Command is not started yet');
this.cmd.addListener('close', (code, signal) => {
if (signal || code) {
@ -274,9 +275,12 @@ export class ParentIPC extends IPCCommon {
resolve();
}
});
this.onError = (err) => {
reject(err);
};
const errors = await this.errorQueue.collect();
if(errors.length === 1) {
reject(errors[0]);
} else if(errors.length > 1) {
reject(new Error(errors.map(Error.toString).join(', ')));
}
});
}
}
@ -298,10 +302,13 @@ export class ChildIPC extends IPCCommon {
}
async wait(): Promise<void> {
return new Promise((resolve, reject) => {
this.onError = (err) => {
reject(err);
};
return new Promise(async (resolve, reject) => {
const errors = await this.errorQueue.collect();
if(errors.length === 1) {
reject(errors[0]);
} else if(errors.length > 1) {
reject(new Error(errors.map(Error.toString).join(', ')));
}
this.onClose = () => {
if (this.processingCalls === 0) {
this.conn?.destroy();