import * as net from 'node:net'; import * as readline from 'node:readline'; import {type ChildProcess, spawn} from 'node:child_process'; import * as os from 'node:os'; import * as path from 'node:path'; import * as fs from 'node:fs'; import * as util from 'node:util'; import * as crypto from 'node:crypto'; import {AsyncQueue} from './asyncqueue.js'; const IPC_SOCKET_ARG = 'ipc-socket'; type JSONSerializable = string | number | boolean; enum MsgType { Call = 1, Response = 2, } type Vals = any[]; interface CallMessage { type: MsgType.Call, id: number, method: string; args: Vals; } interface ResponseMessage { type: MsgType.Response, id: number, result?: Vals; error?: string; } type Message = CallMessage | ResponseMessage; interface CallResult { result: Vals; error: Error | null; } abstract class IPCCommon { protected localApis: Record; protected socketPath: string; protected conn: net.Socket | null = null; protected nextId: number = 0; protected pendingCalls: Record void> = {}; protected stopRequested: boolean = false; protected processingCalls: number = 0; protected ready = false; protected errorQueue = new AsyncQueue(); protected onClose?: () => void; protected constructor(localApis: object[], socketPath: string) { this.socketPath = socketPath; this.localApis = {}; for (const localApi of localApis) { this.localApis[localApi.constructor.name] = localApi; } } protected readConn(): void { if (!this.conn) throw new Error('no connection'); const rl = readline.createInterface({ input: this.conn, crlfDelay: Infinity, }); this.conn.on('error', (e) => { this.raiseErr(e); }); this.conn.on('close', (hadError: boolean) => { if (hadError) { this.raiseErr(new Error('connection closed due to error')); } }); rl.on('line', (line) => { try { const msg: Message = JSON.parse(line); this.processMsg(msg); } catch (e) { this.raiseErr(new Error(`${ e }`)); } }); this.ready = true; } protected processMsg(msg: Message): void { switch (msg.type) { case MsgType.Call: this.handleCall(msg).catch((e) => this.errorQueue.put(e)); break; case MsgType.Response: this.handleResponse(msg); break; } } protected sendMsg(msg: Message): void { if (!this.conn) throw new Error('no connection'); try { const data = JSON.stringify(msg) + '\n'; this.conn.write(data); } catch (e) { this.raiseErr(new Error(`send response for ${ msg.id }: ${ e }`)); } } protected async handleCall(msg: CallMessage) { const [endpointName, methodName] = msg.method.split('.'); if (!endpointName || !methodName) { this.sendMsg({type: MsgType.Response, id: msg.id, error: `call malformed: ${ msg.method }`}); return; } const endpoint = this.localApis[endpointName]; if (!endpoint) { this.sendMsg({type: MsgType.Response, id: msg.id, error: `endpoint not found: ${ endpointName }`}); return; } const method: Function = endpoint[methodName]; if (!method || typeof method !== 'function') { this.sendMsg({type: MsgType.Response, id: msg.id, error: `method not found: ${ msg.method }`}); return; } const argsCount = method.length; if (msg.args.length !== argsCount) { this.sendMsg({ type: MsgType.Response, id: msg.id, error: `argument count mismatch: expected ${ argsCount }, got ${ msg.args.length }` }); return; } try { this.processingCalls++; let result = method.apply(endpoint, msg.args.map(this.deserialize)); if (result instanceof Promise) { result = await result; } result = this.serialize(result); this.sendMsg({type: MsgType.Response, id: msg.id, result: [result]}); } catch (err) { this.sendMsg({type: MsgType.Response, id: msg.id, error: `${ err }`}); } finally { this.processingCalls--; } if (this.stopRequested) { if (this.onClose) this.onClose(); } } protected handleResponse(msg: ResponseMessage): void { const callback = this.pendingCalls[msg.id]; if (!callback) { this.raiseErr(new Error(`received response for unknown msgId: ${ msg.id }`)); return; } delete this.pendingCalls[msg.id]; const err = msg.error ? new Error(`remote error: ${ msg.error }`) : null; callback({result: msg.result || [], error: err}); } call(method: string, ...args: Vals): Promise { return new Promise((resolve, reject) => { const id = this.nextId++; this.pendingCalls[id] = (result: CallResult) => { if (result.error) { reject(result.error); } else { resolve(result.result); } }; try { this.sendMsg({type: MsgType.Call, id, method, args: args.map(this.serialize)}); } catch (e) { delete this.pendingCalls[id]; reject(new Error(`send call: ${ e }`)); } }); } public serialize(arg: any): any { // noinspection FallThroughInSwitchStatementJS switch (typeof arg) { case 'string': case 'boolean': case 'number': return arg; case 'object': if(arg instanceof Buffer) { return arg.toString('base64'); } else { throw new Error(`cannot serialize ${arg}`); } default: throw new Error(`cannot serialize ${typeof arg}`); } } public deserialize(arg: any): any { // noinspection FallThroughInSwitchStatementJS switch (typeof arg) { case 'string': case 'boolean': case 'number': return arg; case 'object': const keys = Object.entries(arg).map(p => p[0]).sort(); if(keys[0] === 'd' && keys[1] === 't') { const type = arg['t']; const data = arg['d']; switch (type) { case 'blob': return Buffer.from(data, 'base64'); default: throw new Error(`custom object type ${type} is not supported`); } } else { throw new Error(`cannot deserialize object with keys ${keys}`); } default: throw new Error(`cannot deserialize ${typeof arg}`); } } stop() { if (this.stopRequested) { throw new Error('close already requested'); } if (!this.conn || this.conn.readyState === 'closed') { throw new Error('connection already closed'); } this.stopRequested = true; if (this.onClose) this.onClose(); } protected raiseErr(err: Error): void { this.errorQueue.put(err); } } export class ParentIPC extends IPCCommon { private readonly cmdPath: string; private readonly cmdArgs: string[]; private cmd: ChildProcess | null = null; private readonly listener: net.Server; private cmdExitResult: { code: number | null, signal: string | null } | null = null; private cmdExitCallbacks: ((result: { code: number | null, signal: string | null }) => void)[] = []; constructor(cmdPath: string, cmdArgs: string[], ...localApis: object[]) { const socketPath = path.join(os.tmpdir(), `kitten-ipc-${ process.pid }-${ crypto.randomInt(2**48 - 1) }.sock`); super(localApis, socketPath); this.cmdPath = cmdPath; if (cmdArgs.includes(`--${ IPC_SOCKET_ARG }`)) { throw new Error(`you should not use '--${ IPC_SOCKET_ARG }' argument in your command`); } this.cmdArgs = cmdArgs; this.listener = net.createServer(); } async start(): Promise { try { fs.unlinkSync(this.socketPath); } catch { } await new Promise((resolve, reject) => { this.listener.listen(this.socketPath, () => { resolve(); }); this.listener.on('error', reject); }); const cmdArgs = [...this.cmdArgs, `--${ IPC_SOCKET_ARG }`, this.socketPath]; this.cmd = spawn(this.cmdPath, cmdArgs, {stdio: 'inherit'}); this.cmd.on('error', (err) => { this.raiseErr(err); }); this.cmd.on('close', (code, signal) => { const result = { code, signal }; this.cmdExitResult = result; for (const cb of this.cmdExitCallbacks) cb(result); this.cmdExitCallbacks = []; }); await this.acceptConn(); } private async acceptConn(): Promise { const acceptTimeout = 10000; const acceptPromise = new Promise((resolve, reject) => { this.listener.once('connection', (conn) => { resolve(conn); }); this.listener.once('error', reject); }); const exitPromise = new Promise((_, reject) => { if (this.cmdExitResult) { reject(new Error(`command exited before connection established`)); } else { this.cmdExitCallbacks.push(() => { reject(new Error(`command exited before connection established`)); }); } }); try { this.conn = await timeout(Promise.race([acceptPromise, exitPromise]), acceptTimeout); this.readConn(); } catch (e) { if (this.cmd) this.cmd.kill(); throw e; } } async wait(): Promise { if (!this.cmd) { throw new Error('Command is not started yet'); } const exitPromise = new Promise<{ code: number | null, signal: string | null }>((resolve) => { if (this.cmdExitResult) { resolve(this.cmdExitResult); } else { this.cmdExitCallbacks.push(resolve); } }); try { await Promise.race([ exitPromise.then(({ code, signal }) => { if (signal || code) { if (signal) throw new Error(`Process exited with signal ${ signal }`); else throw new Error(`Process exited with code ${ code }`); } else if (!this.ready) { throw new Error('command exited before connection established'); } }), this.errorQueue.collect().then((errors) => { if (errors.length === 1) { throw errors[0]; } else if (errors.length > 1) { throw new Error(errors.map(e => e.toString()).join(', ')); } }), ]); } finally { try { fs.unlinkSync(this.socketPath); } catch {} } } } export class ChildIPC extends IPCCommon { constructor(...localApis: object[]) { super(localApis, socketPathFromArgs()); } async start(): Promise { return new Promise((resolve, reject) => { this.conn = net.createConnection(this.socketPath, () => { this.readConn(); resolve(); }); this.conn.on('error', reject); }); } async wait(): Promise { const closePromise = new Promise((resolve) => { this.onClose = () => { if (this.processingCalls === 0) { this.conn?.destroy(); resolve(); } }; if (this.stopRequested && this.processingCalls === 0) { this.conn?.destroy(); resolve(); } }); const errorPromise = this.errorQueue.collect().then((errors) => { if (errors.length === 1) { throw errors[0]; } else if (errors.length > 1) { throw new Error(errors.map(e => e.toString()).join(', ')); } }); await Promise.race([closePromise, errorPromise]); } } function socketPathFromArgs(): string { const {values} = util.parseArgs({ options: { [IPC_SOCKET_ARG]: { type: 'string', } } }); if (!values[IPC_SOCKET_ARG]) { throw new Error('ipc socket path is missing'); } return values[IPC_SOCKET_ARG]; } function sleep(ms: number): Promise { return new Promise(resolve => setTimeout(resolve, ms)); } // throws on timeout function timeout(prom: Promise, ms: number): Promise { return Promise.race( [ prom, new Promise((res, reject) => { setTimeout(() => {reject(new Error('timed out'))}, ms) })] ) as Promise; }