import * as net from 'node:net';
import * as readline from 'node:readline';
import {type ChildProcess, spawn} from 'node:child_process';
import * as os from 'node:os';
import * as path from 'node:path';
import * as fs from 'node:fs';
import * as util from 'node:util';
import {QueuedEvent} from 'ts-events';

/** Command-line flag through which the parent hands the socket path to the child. */
const IPC_SOCKET_ARG = '--ipc-socket';

/** Discriminator of the two message kinds exchanged over the socket. */
enum MsgType {
  Call = 1,
  Response = 2,
}

type Vals = any[];

/** Request to invoke `method` on the remote side's local API with `params`. */
interface CallMessage {
  type: MsgType.Call;
  id: number;
  method: string;
  params: Vals;
}

/** Reply to the call with the same `id`; carries either `result` or `error`. */
interface ResponseMessage {
  type: MsgType.Response;
  id: number;
  result?: Vals;
  error?: string;
}

type Message = CallMessage | ResponseMessage;

/** Outcome handed to the pending-call callback once a response arrives. */
interface CallResult {
  result: Vals;
  error: Error | null;
}

/**
 * Logic shared by both ends of the connection: reading newline-delimited JSON
 * messages from the socket, dispatching incoming calls to `localApi`, and
 * matching incoming responses to calls issued through {@link IPCCommon.call}.
 */
abstract class IPCCommon {
  protected localApi: any;
  protected socketPath: string;
  protected conn: net.Socket | null = null;
  protected nextId: number = 0;
  /** Callbacks of calls that were sent and are still awaiting a response, keyed by message id. */
  protected pendingCalls: Record<number, (result: CallResult) => void> = {};
  /** Every error raised via `raiseErr` is posted here. */
  protected errors: QueuedEvent<Error>;

  protected constructor(localApi: any, socketPath: string) {
    this.localApi = localApi;
    this.socketPath = socketPath;
    this.errors = new QueuedEvent<Error>();
  }

  /**
   * Starts consuming the connection line by line. Each line is parsed as JSON
   * and dispatched; any exception thrown while parsing or processing a line is
   * posted to `errors`, as are stream errors and the closing of the stream.
   *
   * @throws Error if no connection has been established yet.
   */
  protected readConn(): void {
    if (!this.conn) throw new Error('no connection');

    const rl = readline.createInterface({
      input: this.conn,
      crlfDelay: Infinity,
    });

    rl.on('line', (line) => {
      try {
        const msg: Message = JSON.parse(line);
        this.processMsg(msg);
      } catch (e) {
        this.raiseErr(new Error(`${e}`));
      }
    });

    rl.on('error', (e: Error) => {
      this.raiseErr(e);
    });

    rl.on('close', () => {
      this.raiseErr(new Error('connection closed'));
    });
  }

  /** Routes a decoded message to the call or response handler; other types are ignored. */
  protected processMsg(msg: Message): void {
    switch (msg.type) {
      case MsgType.Call:
        this.handleCall(msg);
        break;
      case MsgType.Response:
        this.handleResponse(msg);
        break;
    }
  }

  /**
   * Executes an incoming call against `localApi` and sends back a response.
   * An error response is sent when there is no local API, the method does not
   * exist, the number of params differs from the method's declared arity, or
   * the method throws / its returned promise rejects.
   */
  protected handleCall(msg: CallMessage): void {
    if (!this.localApi) {
      this.sendMsg({type: MsgType.Response, id: msg.id, error: 'remote side does not accept ipc calls'});
      return;
    }

    const method = this.localApi[msg.method];
    if (!method || typeof method !== 'function') {
      this.sendMsg({type: MsgType.Response, id: msg.id, error: `method not found: ${msg.method}`});
      return;
    }

    // Function.length is the number of declared parameters of the target method.
    const argsCount = method.length;
    if (msg.params.length !== argsCount) {
      this.sendMsg({
        type: MsgType.Response,
        id: msg.id,
        error: `argument count mismatch: expected ${argsCount}, got ${msg.params.length}`
      });
      return;
    }

    try {
      const result = method.apply(this.localApi, msg.params);
      if (result instanceof Promise) {
        result
          .then((res) => {
            this.sendMsg({type: MsgType.Response, id: msg.id, result: [res]});
          })
          .catch((err) => {
            this.sendMsg({type: MsgType.Response, id: msg.id, error: `${err}`});
          });
      } else {
        this.sendMsg({type: MsgType.Response, id: msg.id, result: [result]});
      }
    } catch (err) {
      this.sendMsg({type: MsgType.Response, id: msg.id, error: `${err}`});
    }
  }

  /**
   * Serializes `msg` as one JSON line and writes it to the connection.
   * Serialization/write exceptions are posted to `errors` instead of thrown.
   *
   * @throws Error if no connection has been established yet.
   */
  protected sendMsg(msg: Message): void {
    if (!this.conn) throw new Error('no connection');

    try {
      const data = JSON.stringify(msg) + '\n';
      this.conn.write(data);
    } catch (e) {
      this.raiseErr(new Error(`send response for ${msg.id}: ${e}`));
    }
  }

  /**
   * Completes the pending call that matches the response id. A response with
   * an unknown id is reported through `errors`.
   */
  protected handleResponse(msg: ResponseMessage): void {
    const callback = this.pendingCalls[msg.id];
    if (!callback) {
      this.raiseErr(new Error(`received response for unknown msgId: ${msg.id}`));
      return;
    }

    delete this.pendingCalls[msg.id];

    const err = msg.error ? new Error(`remote error: ${msg.error}`) : null;
    callback({result: msg.result || [], error: err});
  }

  /** Posts `err` to the `errors` event. */
  protected raiseErr(err: Error): void {
    this.errors.post(err);
  }

  /**
   * Invokes `method` on the remote side.
   *
   * @param method Name of the method on the remote side's local API.
   * @param params Arguments forwarded to the remote method.
   * @returns Promise resolving to the result array of the response, or
   *          rejecting with the remote error / a send failure.
   */
  call(method: string, ...params: Vals): Promise<Vals> {
    return new Promise<Vals>((resolve, reject) => {
      const id = this.nextId++;

      this.pendingCalls[id] = (result: CallResult) => {
        if (result.error) {
          reject(result.error);
        } else {
          resolve(result.result);
        }
      };

      try {
        this.sendMsg({
          type: MsgType.Call,
          id,
          method,
          params,
        });
      } catch (e) {
        // The call never left this process, so nothing will ever answer it.
        delete this.pendingCalls[id];
        reject(new Error(`send call: ${e}`));
      }
    });
  }
}

/**
 * Parent end of the connection: listens on a socket in the OS temp directory,
 * spawns the child command with the socket path appended to its arguments and
 * accepts the child's connection.
 */
class ParentIPC extends IPCCommon {
  private readonly cmdPath: string;
  private readonly cmdArgs: string[];
  private cmd: ChildProcess | null = null;
  private readonly listener: net.Server;

  /**
   * @param cmdPath Executable to spawn.
   * @param cmdArgs Arguments for the executable; must not contain the IPC socket flag.
   * @param localApi Object whose methods the child may call.
   * @throws Error if `cmdArgs` already contains the IPC socket flag.
   */
  constructor(cmdPath: string, cmdArgs: string[], localApi: any) {
    const socketPath = path.join(os.tmpdir(), `kitten-ipc-${process.pid}.sock`);
    super(localApi, socketPath);

    this.cmdPath = cmdPath;
    if (cmdArgs.includes(IPC_SOCKET_ARG)) {
      throw new Error(`you should not use '${IPC_SOCKET_ARG}' argument in your command`);
    }
    this.cmdArgs = cmdArgs;

    this.listener = net.createServer();
  }

  /**
   * Starts listening on the socket path, spawns the child and begins waiting
   * for its connection in the background. Resolves once the listener is up and
   * the child has been spawned; rejects if listening fails.
   */
  async start(): Promise<void> {
    // Best effort: remove a stale socket file; failure to unlink is ignored.
    try {
      fs.unlinkSync(this.socketPath);
    } catch {}

    await new Promise<void>((resolve, reject) => {
      this.listener.listen(this.socketPath, () => {
        resolve();
      });
      this.listener.on('error', reject);
    });

    const cmdArgs = [...this.cmdArgs, IPC_SOCKET_ARG, this.socketPath];
    this.cmd = spawn(this.cmdPath, cmdArgs, {stdio: 'inherit'});

    this.cmd.on('error', (err) => {
      this.raiseErr(err);
    });

    // acceptConn reports its own failures through `errors`, so it is
    // deliberately not awaited here.
    void this.acceptConn();
  }

  /**
   * Waits up to 10 seconds for the child to connect and then starts reading
   * from the connection. On timeout or listener error the child is killed and
   * the error is posted to `errors`.
   */
  private async acceptConn(): Promise<void> {
    const acceptTimeout = 10000;

    const acceptPromise = new Promise<net.Socket>((resolve, reject) => {
      this.listener.once('connection', (conn) => {
        resolve(conn);
      });
      this.listener.once('error', reject);
    });

    try {
      this.conn = await timeout(acceptPromise, acceptTimeout);
      this.readConn();
    } catch (e) {
      if (this.cmd) this.cmd.kill();
      // Only real Error objects are posted, whatever value was thrown.
      this.raiseErr(e instanceof Error ? e : new Error(String(e)));
    }
  }

  /**
   * Resolves when the child process closes with exit code 0 and no signal.
   * Rejects when it closes with a non-zero code or a signal, when an error is
   * delivered through `errors`, or when the command has not been started.
   */
  async wait(): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      if (!this.cmd) throw new Error('Command is not started yet');

      this.cmd.addListener('close', (code, signal) => {
        if (signal || code) {
          if (signal) reject(new Error(`Process exited with signal ${signal}`));
          else reject(new Error(`Process exited with code ${code}`));
        } else {
          resolve();
        }
      });

      this.errors.attach(err => {
        reject(err);
      });
    });
  }
}

/**
 * Child end of the connection: connects to the socket whose path was passed
 * on the command line.
 */
export class ChildIPC extends IPCCommon {
  /**
   * @param localApi Object whose methods the parent may call.
   * @throws Error if the socket path is absent from the command line.
   */
  constructor(localApi: any) {
    super(localApi, socketPathFromArgs());
  }

  /**
   * Connects to the socket and starts reading messages. Resolves once
   * connected; rejects if the socket emits an error before that.
   */
  async start(): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      this.conn = net.createConnection(this.socketPath, () => {
        this.readConn();
        resolve();
      });
      this.conn.on('error', reject);
    });
  }

  /**
   * Returns a promise that never settles.
   * NOTE(review): unlike ParentIPC.wait this does not observe `errors`;
   * confirm whether it should reject when the connection fails or closes.
   */
  async wait(): Promise<void> {
    return new Promise<void>(() => {});
  }
}

/**
 * Extracts the socket path that follows the IPC socket flag in the process
 * arguments.
 *
 * Parsing is non-strict so that the user's own options and positional
 * arguments, which share the command line with the IPC flag, are tolerated.
 *
 * @throws Error if the flag is absent or has no value.
 */
function socketPathFromArgs(): string {
  // parseArgs expects the long option name without the leading dashes.
  const optionName = IPC_SOCKET_ARG.slice(2);

  const {values} = util.parseArgs({
    options: {
      [optionName]: {
        type: 'string',
      },
    },
    strict: false,
  });

  // In non-strict mode a flag given without a value is not a string.
  const socketPath = values[optionName];
  if (typeof socketPath !== 'string' || socketPath === '') {
    throw new Error('ipc socket path is missing');
  }

  return socketPath;
}

/** Resolves after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise<void>(resolve => setTimeout(resolve, ms));
}

/**
 * Races `prom` against a timer.
 *
 * @returns A promise that settles like `prom` if it settles within `ms`
 *          milliseconds and rejects with an Error otherwise.
 */
function timeout<T>(prom: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;

  const expiry = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => reject(new Error(`timed out after ${ms} ms`)), ms);
  });

  // Clear the timer once the race is decided so it does not keep the event
  // loop alive after `prom` has already settled.
  return Promise.race([prom, expiry]).finally(() => clearTimeout(timer));
}