diff --git a/lib/ts/src/asyncqueue.ts b/lib/ts/src/asyncqueue.ts new file mode 100644 index 0000000..fe109af --- /dev/null +++ b/lib/ts/src/asyncqueue.ts @@ -0,0 +1,24 @@ +export class AsyncQueue { + private store: T[] = []; + private collectors: ((val: T[]) => void)[] = []; + + put(val: T) { + this.store.push(val); + for(const collector of this.collectors) { + collector(this.store); + } + this.collectors = []; + } + + async collect(): Promise { + if(this.store.length > 0) { + const store = this.store; + this.store = []; + return new Promise(resolve => resolve(store)); + } else { + return new Promise(resolve => { + this.collectors.push(resolve); + }) + } + } +} diff --git a/lib/ts/src/lib.ts b/lib/ts/src/lib.ts index 695d119..6ee531d 100644 --- a/lib/ts/src/lib.ts +++ b/lib/ts/src/lib.ts @@ -5,6 +5,7 @@ import * as os from 'node:os'; import * as path from 'node:path'; import * as fs from 'node:fs'; import * as util from 'node:util'; +import {AsyncQueue} from './asyncqueue.js'; const IPC_SOCKET_ARG = 'ipc-socket'; @@ -44,7 +45,8 @@ abstract class IPCCommon { protected pendingCalls: Record void> = {}; protected stopRequested: boolean = false; protected processingCalls: number = 0; - protected onError?: (err: Error) => void; + + protected errorQueue = new AsyncQueue(); protected onClose?: () => void; protected constructor(localApis: object[], socketPath: string) { @@ -87,7 +89,7 @@ abstract class IPCCommon { protected processMsg(msg: Message): void { switch (msg.type) { case MsgType.Call: - this.handleCall(msg).catch(e => this.onError?(e) : null); + this.handleCall(msg).catch(this.errorQueue.put); break; case MsgType.Response: this.handleResponse(msg); @@ -196,7 +198,7 @@ abstract class IPCCommon { } protected raiseErr(err: Error): void { - if (this.onError) this.onError(err); + this.errorQueue.put(err); } } @@ -226,7 +228,6 @@ export class ParentIPC extends IPCCommon { } catch { } - await new Promise((resolve, reject) => { this.listener.listen(this.socketPath, () => { resolve(); @@ -264,7 +265,7 @@ export class ParentIPC extends IPCCommon { } async wait(): Promise { - return new Promise((resolve, reject) => { + return new Promise(async (resolve, reject) => { if (!this.cmd) throw new Error('Command is not started yet'); this.cmd.addListener('close', (code, signal) => { if (signal || code) { @@ -274,9 +275,12 @@ export class ParentIPC extends IPCCommon { resolve(); } }); - this.onError = (err) => { - reject(err); - }; + const errors = await this.errorQueue.collect(); + if(errors.length === 1) { + reject(errors[0]); + } else if(errors.length > 1) { + reject(new Error(errors.map(Error.toString).join(', '))); + } }); } } @@ -298,10 +302,13 @@ export class ChildIPC extends IPCCommon { } async wait(): Promise { - return new Promise((resolve, reject) => { - this.onError = (err) => { - reject(err); - }; + return new Promise(async (resolve, reject) => { + const errors = await this.errorQueue.collect(); + if(errors.length === 1) { + reject(errors[0]); + } else if(errors.length > 1) { + reject(new Error(errors.map(Error.toString).join(', '))); + } this.onClose = () => { if (this.processingCalls === 0) { this.conn?.destroy();