debug msg
This commit is contained in:
parent
c374bb9784
commit
96d4a5015c
@ -47,7 +47,7 @@ func main() {
|
|||||||
|
|
||||||
cmd := exec.Command("node", path.Join(cwd, "ts/dist/index.js"))
|
cmd := exec.Command("node", path.Join(cwd, "ts/dist/index.js"))
|
||||||
|
|
||||||
ipc, err := kittenipc.NewParent(cmd, &localApi)
|
ipc, err := kittenipc.NewParent(cmd, nil, &localApi)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panic(err)
|
log.Panic(err)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -28,7 +28,7 @@ class TsIpcApi {
|
|||||||
|
|
||||||
async function main() {
|
async function main() {
|
||||||
const localApi = new TsIpcApi();
|
const localApi = new TsIpcApi();
|
||||||
const ipc = new ChildIPC(localApi);
|
const ipc = new ChildIPC(undefined, localApi);
|
||||||
const remoteApi = new GoIpcApi(ipc);
|
const remoteApi = new GoIpcApi(ipc);
|
||||||
|
|
||||||
await ipc.start();
|
await ipc.start();
|
||||||
|
|||||||
@ -11,13 +11,17 @@ type ChildIPC struct {
|
|||||||
*ipcCommon
|
*ipcCommon
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewChild(localApis ...any) (*ChildIPC, error) {
|
func NewChild(opts *Options, localApis ...any) (*ChildIPC, error) {
|
||||||
|
if opts == nil {
|
||||||
|
opts = &Options{}
|
||||||
|
}
|
||||||
c := ChildIPC{
|
c := ChildIPC{
|
||||||
ipcCommon: &ipcCommon{
|
ipcCommon: &ipcCommon{
|
||||||
localApis: mapTypeNames(localApis),
|
localApis: mapTypeNames(localApis),
|
||||||
pendingCalls: make(map[int64]*pendingCall),
|
pendingCalls: make(map[int64]*pendingCall),
|
||||||
errCh: make(chan error, 1),
|
errCh: make(chan error, 1),
|
||||||
ctx: context.Background(),
|
ctx: context.Background(),
|
||||||
|
debugMessages: opts.DebugMessages,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -5,6 +5,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
@ -26,6 +27,10 @@ type pendingCall struct {
|
|||||||
resultChan chan callResult
|
resultChan chan callResult
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Options struct {
|
||||||
|
DebugMessages bool
|
||||||
|
}
|
||||||
|
|
||||||
type ipcCommon struct {
|
type ipcCommon struct {
|
||||||
localApis map[string]any
|
localApis map[string]any
|
||||||
socketPath string
|
socketPath string
|
||||||
@ -38,6 +43,7 @@ type ipcCommon struct {
|
|||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
writeMu sync.Mutex
|
writeMu sync.Mutex
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
debugMessages bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ipc *ipcCommon) readConn() {
|
func (ipc *ipcCommon) readConn() {
|
||||||
@ -46,6 +52,9 @@ func (ipc *ipcCommon) readConn() {
|
|||||||
for scn.Scan() {
|
for scn.Scan() {
|
||||||
var msg Message
|
var msg Message
|
||||||
msgBytes := scn.Bytes()
|
msgBytes := scn.Bytes()
|
||||||
|
if ipc.debugMessages {
|
||||||
|
log.Printf("[ipc recv] %s", string(msgBytes))
|
||||||
|
}
|
||||||
if err := json.Unmarshal(msgBytes, &msg); err != nil {
|
if err := json.Unmarshal(msgBytes, &msg); err != nil {
|
||||||
ipc.raiseErr(fmt.Errorf("unmarshal message: %w", err))
|
ipc.raiseErr(fmt.Errorf("unmarshal message: %w", err))
|
||||||
break
|
break
|
||||||
@ -71,6 +80,9 @@ func (ipc *ipcCommon) sendMsg(msg Message) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("marshal message: %w", err)
|
return fmt.Errorf("marshal message: %w", err)
|
||||||
}
|
}
|
||||||
|
if ipc.debugMessages {
|
||||||
|
log.Printf("[ipc send] %s", string(data))
|
||||||
|
}
|
||||||
data = append(data, '\n')
|
data = append(data, '\n')
|
||||||
|
|
||||||
ipc.writeMu.Lock()
|
ipc.writeMu.Lock()
|
||||||
|
|||||||
@ -11,27 +11,27 @@ import (
|
|||||||
func TestNewParent(t *testing.T) {
|
func TestNewParent(t *testing.T) {
|
||||||
t.Run("socket argument in command", func(t *testing.T) {
|
t.Run("socket argument in command", func(t *testing.T) {
|
||||||
cmd := exec.Command("/bin/sh", ipcSocketArg, "/tmp/kek")
|
cmd := exec.Command("/bin/sh", ipcSocketArg, "/tmp/kek")
|
||||||
_, err := NewParent(cmd)
|
_, err := NewParent(cmd, nil)
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("nonexistent binary", func(t *testing.T) {
|
t.Run("nonexistent binary", func(t *testing.T) {
|
||||||
cmd := exec.Command("/nonexistent/binary")
|
cmd := exec.Command("/nonexistent/binary")
|
||||||
p, err := NewParent(cmd)
|
p, err := NewParent(cmd, nil)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Error(t, p.Start())
|
assert.Error(t, p.Start())
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("connection timeout", func(t *testing.T) {
|
t.Run("connection timeout", func(t *testing.T) {
|
||||||
cmd := exec.Command("../testdata/sleep15.sh")
|
cmd := exec.Command("../testdata/sleep15.sh")
|
||||||
p, err := NewParent(cmd)
|
p, err := NewParent(cmd, nil)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Error(t, p.Start())
|
assert.Error(t, p.Start())
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("child finished before accepting connection", func(t *testing.T) {
|
t.Run("child finished before accepting connection", func(t *testing.T) {
|
||||||
cmd := exec.Command("../testdata/sleep3.sh")
|
cmd := exec.Command("../testdata/sleep3.sh")
|
||||||
p, err := NewParent(cmd)
|
p, err := NewParent(cmd, nil)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
assert.Error(t, p.Start())
|
assert.Error(t, p.Start())
|
||||||
|
|||||||
@ -22,18 +22,22 @@ type ParentIPC struct {
|
|||||||
cmdErr error
|
cmdErr error
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewParent(cmd *exec.Cmd, localApis ...any) (*ParentIPC, error) {
|
func NewParent(cmd *exec.Cmd, opts *Options, localApis ...any) (*ParentIPC, error) {
|
||||||
return NewParentWithContext(context.Background(), cmd, localApis...)
|
return NewParentWithContext(context.Background(), cmd, opts, localApis...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewParentWithContext(ctx context.Context, cmd *exec.Cmd, localApis ...any) (*ParentIPC, error) {
|
func NewParentWithContext(ctx context.Context, cmd *exec.Cmd, opts *Options, localApis ...any) (*ParentIPC, error) {
|
||||||
|
if opts == nil {
|
||||||
|
opts = &Options{}
|
||||||
|
}
|
||||||
p := ParentIPC{
|
p := ParentIPC{
|
||||||
ipcCommon: &ipcCommon{
|
ipcCommon: &ipcCommon{
|
||||||
localApis: mapTypeNames(localApis),
|
localApis: mapTypeNames(localApis),
|
||||||
pendingCalls: make(map[int64]*pendingCall),
|
pendingCalls: make(map[int64]*pendingCall),
|
||||||
errCh: make(chan error, 1),
|
errCh: make(chan error, 1),
|
||||||
socketPath: filepath.Join(os.TempDir(), fmt.Sprintf("kitten-ipc-%d-%d.sock", os.Getpid(), rand.Int63())),
|
socketPath: filepath.Join(os.TempDir(), fmt.Sprintf("kitten-ipc-%d-%d.sock", os.Getpid(), rand.Int63())),
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
|
debugMessages: opts.DebugMessages,
|
||||||
},
|
},
|
||||||
cmd: cmd,
|
cmd: cmd,
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,10 +1,10 @@
|
|||||||
import * as net from 'node:net';
|
import * as net from 'node:net';
|
||||||
import {IPCCommon} from './common.js';
|
import {IPCCommon, type IPCOptions} from './common.js';
|
||||||
import {socketPathFromArgs} from './util.js';
|
import {socketPathFromArgs} from './util.js';
|
||||||
|
|
||||||
export class ChildIPC extends IPCCommon {
|
export class ChildIPC extends IPCCommon {
|
||||||
constructor(...localApis: object[]) {
|
constructor(opts?: IPCOptions, ...localApis: object[]) {
|
||||||
super(localApis, socketPathFromArgs());
|
super(localApis, socketPathFromArgs(), opts);
|
||||||
}
|
}
|
||||||
|
|
||||||
async start(): Promise<void> {
|
async start(): Promise<void> {
|
||||||
|
|||||||
@ -4,6 +4,10 @@ import {AsyncQueue} from './asyncqueue.js';
|
|||||||
import type {CallMessage, CallResult, Message, ResponseMessage, Vals} from './protocol.js';
|
import type {CallMessage, CallResult, Message, ResponseMessage, Vals} from './protocol.js';
|
||||||
import {MsgType} from './protocol.js';
|
import {MsgType} from './protocol.js';
|
||||||
|
|
||||||
|
export interface IPCOptions {
|
||||||
|
debugMessages?: boolean;
|
||||||
|
}
|
||||||
|
|
||||||
export abstract class IPCCommon {
|
export abstract class IPCCommon {
|
||||||
protected localApis: Record<string, any>;
|
protected localApis: Record<string, any>;
|
||||||
protected socketPath: string;
|
protected socketPath: string;
|
||||||
@ -13,12 +17,14 @@ export abstract class IPCCommon {
|
|||||||
protected stopRequested: boolean = false;
|
protected stopRequested: boolean = false;
|
||||||
protected processingCalls: number = 0;
|
protected processingCalls: number = 0;
|
||||||
protected ready = false;
|
protected ready = false;
|
||||||
|
protected debugMessages: boolean;
|
||||||
|
|
||||||
protected errorQueue = new AsyncQueue<Error>();
|
protected errorQueue = new AsyncQueue<Error>();
|
||||||
protected onClose?: () => void;
|
protected onClose?: () => void;
|
||||||
|
|
||||||
protected constructor(localApis: object[], socketPath: string) {
|
protected constructor(localApis: object[], socketPath: string, opts?: IPCOptions) {
|
||||||
this.socketPath = socketPath;
|
this.socketPath = socketPath;
|
||||||
|
this.debugMessages = opts?.debugMessages ?? false;
|
||||||
|
|
||||||
this.localApis = {};
|
this.localApis = {};
|
||||||
for (const localApi of localApis) {
|
for (const localApi of localApis) {
|
||||||
@ -47,6 +53,9 @@ export abstract class IPCCommon {
|
|||||||
|
|
||||||
rl.on('line', (line) => {
|
rl.on('line', (line) => {
|
||||||
try {
|
try {
|
||||||
|
if (this.debugMessages) {
|
||||||
|
console.log(`[ipc recv] ${line}`);
|
||||||
|
}
|
||||||
const msg: Message = JSON.parse(line);
|
const msg: Message = JSON.parse(line);
|
||||||
this.processMsg(msg);
|
this.processMsg(msg);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
@ -73,6 +82,9 @@ export abstract class IPCCommon {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
const data = JSON.stringify(msg) + '\n';
|
const data = JSON.stringify(msg) + '\n';
|
||||||
|
if (this.debugMessages) {
|
||||||
|
console.log(`[ipc send] ${JSON.stringify(msg)}`);
|
||||||
|
}
|
||||||
this.conn.write(data);
|
this.conn.write(data);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
this.raiseErr(new Error(`send response for ${ msg.id }: ${ e }`));
|
this.raiseErr(new Error(`send response for ${ msg.id }: ${ e }`));
|
||||||
|
|||||||
@ -1,2 +1,3 @@
|
|||||||
export {ParentIPC} from './parent.js';
|
export {ParentIPC} from './parent.js';
|
||||||
export {ChildIPC} from './child.js';
|
export {ChildIPC} from './child.js';
|
||||||
|
export type {IPCOptions} from './common.js';
|
||||||
|
|||||||
@ -4,7 +4,7 @@ import * as path from 'node:path';
|
|||||||
import * as fs from 'node:fs';
|
import * as fs from 'node:fs';
|
||||||
import * as crypto from 'node:crypto';
|
import * as crypto from 'node:crypto';
|
||||||
import {type ChildProcess, spawn} from 'node:child_process';
|
import {type ChildProcess, spawn} from 'node:child_process';
|
||||||
import {IPCCommon} from './common.js';
|
import {IPCCommon, type IPCOptions} from './common.js';
|
||||||
import {timeout} from './util.js';
|
import {timeout} from './util.js';
|
||||||
|
|
||||||
const IPC_SOCKET_ARG = 'ipc-socket';
|
const IPC_SOCKET_ARG = 'ipc-socket';
|
||||||
@ -18,9 +18,9 @@ export class ParentIPC extends IPCCommon {
|
|||||||
private cmdExitResult: { code: number | null, signal: string | null } | null = null;
|
private cmdExitResult: { code: number | null, signal: string | null } | null = null;
|
||||||
private cmdExitCallbacks: ((result: { code: number | null, signal: string | null }) => void)[] = [];
|
private cmdExitCallbacks: ((result: { code: number | null, signal: string | null }) => void)[] = [];
|
||||||
|
|
||||||
constructor(cmdPath: string, cmdArgs: string[], ...localApis: object[]) {
|
constructor(cmdPath: string, cmdArgs: string[], opts?: IPCOptions, ...localApis: object[]) {
|
||||||
const socketPath = path.join(os.tmpdir(), `kitten-ipc-${ process.pid }-${ crypto.randomInt(2**48 - 1) }.sock`);
|
const socketPath = path.join(os.tmpdir(), `kitten-ipc-${ process.pid }-${ crypto.randomInt(2**48 - 1) }.sock`);
|
||||||
super(localApis, socketPath);
|
super(localApis, socketPath, opts);
|
||||||
|
|
||||||
this.cmdPath = cmdPath;
|
this.cmdPath = cmdPath;
|
||||||
if (cmdArgs.includes(`--${ IPC_SOCKET_ARG }`)) {
|
if (cmdArgs.includes(`--${ IPC_SOCKET_ARG }`)) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user