kittenipc/lib/ts/lib.ts
2025-10-24 12:18:07 +03:00

311 lines
8.5 KiB
TypeScript

import * as net from 'node:net';
import * as readline from 'node:readline';
import {type ChildProcess, spawn} from 'node:child_process';
import * as os from 'node:os';
import * as path from 'node:path';
import * as fs from 'node:fs';
import * as util from 'node:util';
import {QueuedEvent} from 'ts-events';
const IPC_SOCKET_ARG = '--ipc-socket';
enum MsgType {
Call = 1,
Response = 2,
}
type Vals = any[];
interface CallMessage {
type: MsgType.Call,
id: number,
method: string;
params: Vals;
}
interface ResponseMessage {
type: MsgType.Response,
id: number,
result?: Vals;
error?: string;
}
type Message = CallMessage | ResponseMessage;
interface CallResult {
result: Vals;
error: Error | null;
}
abstract class IPCCommon {
protected localApi: any;
protected socketPath: string;
protected conn: net.Socket | null = null;
protected nextId: number = 0;
protected pendingCalls: Record<number, (result: CallResult) => void> = {};
protected errors: QueuedEvent<Error>;
protected constructor(localApi: any, socketPath: string) {
this.localApi = localApi;
this.socketPath = socketPath;
this.errors = new QueuedEvent();
}
protected readConn(): void {
if (!this.conn) throw new Error('no connection');
const rl = readline.createInterface({
input: this.conn,
crlfDelay: Infinity,
});
rl.on('line', (line) => {
try {
const msg: Message = JSON.parse(line);
this.processMsg(msg);
} catch (e) {
this.raiseErr(new Error(`${e}`));
}
});
rl.on('error', (e) => {
this.raiseErr(e);
});
rl.on('close', () => {
this.raiseErr(new Error('connection closed'));
});
}
protected processMsg(msg: Message): void {
switch (msg.type) {
case MsgType.Call:
this.handleCall(msg);
break;
case MsgType.Response:
this.handleResponse(msg);
break;
}
}
protected handleCall(msg: CallMessage): void {
if (!this.localApi) {
this.sendMsg({type: MsgType.Response, id: msg.id, error: 'remote side does not accept ipc calls'});
return;
}
const method = this.localApi[msg.method];
if (!method || typeof method !== 'function') {
this.sendMsg({type: MsgType.Response, id: msg.id, error: `method not found: ${msg.method}`});
return;
}
const argsCount = method.length;
if (msg.params.length !== argsCount) {
this.sendMsg({
type: MsgType.Response,
id: msg.id,
error: `argument count mismatch: expected ${argsCount}, got ${msg.params.length}`
});
return;
}
try {
const result = method.apply(this.localApi, msg.params);
if (result instanceof Promise) {
result
.then((res) => {
this.sendMsg({type: MsgType.Response, id: msg.id, result: [res]});
})
.catch((err) => {
this.sendMsg({type: MsgType.Response, id: msg.id, error: `${err}`});
});
} else {
this.sendMsg({type: MsgType.Response, id: msg.id, result: [result]});
}
} catch (err) {
this.sendMsg({type: MsgType.Response, id: msg.id, error: `${err}`});
}
}
protected sendMsg(msg: Message): void {
if (!this.conn) throw new Error('no connection');
try {
const data = JSON.stringify(msg) + '\n';
this.conn.write(data);
} catch (e) {
this.raiseErr(new Error(`send response for ${msg.id}: ${e}`));
}
}
protected handleResponse(msg: ResponseMessage): void {
const callback = this.pendingCalls[msg.id];
if (!callback) {
this.raiseErr(new Error(`received response for unknown msgId: ${msg.id}`));
return;
}
delete this.pendingCalls[msg.id];
const err = msg.error ? new Error(`remote error: ${msg.error}`) : null;
callback({ result: msg.result || [], error: err });
}
protected raiseErr(err: Error): void {
this.errors.post(err);
}
call(method: string, ...params: Vals): Promise<Vals> {
return new Promise((resolve, reject) => {
const id = this.nextId++;
this.pendingCalls[id] = (result: CallResult) => {
if (result.error) {
reject(result.error);
} else {
resolve(result.result);
}
};
try {
this.sendMsg({type: MsgType.Call, id, method, params});
} catch (e) {
delete this.pendingCalls[id];
reject(new Error(`send call: ${e}`));
}
});
}
}
class ParentIPC extends IPCCommon {
private readonly cmdPath: string;
private readonly cmdArgs: string[];
private cmd: ChildProcess | null = null;
private readonly listener: net.Server;
constructor(cmdPath: string, cmdArgs: string[], localApi: any) {
const socketPath = path.join(os.tmpdir(), `kitten-ipc-${process.pid}.sock`);
super(localApi, socketPath);
this.cmdPath = cmdPath;
if (cmdArgs.includes(IPC_SOCKET_ARG)) {
throw new Error(`you should not use '${IPC_SOCKET_ARG}' argument in your command`);
}
this.cmdArgs = cmdArgs;
this.listener = net.createServer();
}
async start(): Promise<void> {
try {
fs.unlinkSync(this.socketPath);
} catch {}
await new Promise<void>((resolve, reject) => {
this.listener.listen(this.socketPath, () => {
resolve();
});
this.listener.on('error', reject);
});
const cmdArgs = [...this.cmdArgs, IPC_SOCKET_ARG, this.socketPath];
this.cmd = spawn(this.cmdPath, cmdArgs, {stdio: 'inherit'});
this.cmd.on('error', (err) => {
this.raiseErr(err);
});
this.acceptConn().catch();
}
private async acceptConn(): Promise<void> {
const acceptTimeout = 10000;
const acceptPromise = new Promise<net.Socket>((resolve, reject) => {
this.listener.once('connection', (conn) => {
resolve(conn);
});
this.listener.once('error', reject);
});
try {
this.conn = await timeout(acceptPromise, acceptTimeout);
this.readConn();
} catch (e) {
if (this.cmd) this.cmd.kill();
this.raiseErr(e as Error);
}
}
async wait(): Promise<void> {
return new Promise((resolve, reject) => {
if(!this.cmd) throw new Error('Command is not started yet');
this.cmd.addListener('close', (code, signal) => {
if(signal || code) {
if(signal) reject(new Error(`Process exited with signal ${signal}`));
else reject(new Error(`Process exited with code ${code}`));
} else {
resolve();
}
});
this.errors.attach(err => {
reject(err);
})
})
}
}
export class ChildIPC extends IPCCommon {
constructor(localApi: any) {
super(localApi, socketPathFromArgs());
}
async start(): Promise<void> {
return new Promise((resolve, reject) => {
this.conn = net.createConnection(this.socketPath, () => {
this.readConn();
resolve();
});
this.conn.on('error', reject);
});
}
async wait(): Promise<void> {
return new Promise((resolve, reject) => {
});
}
}
function socketPathFromArgs(): string {
const {values} = util.parseArgs({options: {
IPC_SOCKET_ARG: {
type: 'string',
}
}});
if(!values.IPC_SOCKET_ARG) {
throw new Error('ipc socket path is missing');
}
return values.IPC_SOCKET_ARG;
}
function sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
// throws on timeout
function timeout<T>(prom: Promise<T>, ms: number): Promise<T> {
return Promise.race(
[prom, new Promise((res, reject) => setTimeout(reject, ms))]
) as Promise<T>;
}