This commit is contained in:
Egor Aristov 2025-10-24 12:09:55 +03:00
parent fd2a413b12
commit c926bfa00c
Signed by: egor3f
GPG Key ID: 40482A264AAEC85F

View File

@ -73,7 +73,7 @@ func (ipc *ipcCommon) processMsg(msg Message) {
} }
} }
func (ipc *ipcCommon) writeMsg(msg Message) error { func (ipc *ipcCommon) sendMsg(msg Message) error {
data, err := json.Marshal(msg) data, err := json.Marshal(msg)
if err != nil { if err != nil {
@ -138,13 +138,12 @@ func (ipc *ipcCommon) sendResponse(id int64, result []any, err error) {
msg.Error = err.Error() msg.Error = err.Error()
} }
if err := ipc.writeMsg(msg); err != nil { if err := ipc.sendMsg(msg); err != nil {
ipc.raiseErr(fmt.Errorf("send response for id=%d: %w", id, err)) ipc.raiseErr(fmt.Errorf("send response for id=%d: %w", id, err))
} }
} }
func (ipc *ipcCommon) handleResponse(msg Message) { func (ipc *ipcCommon) handleResponse(msg Message) {
ipc.mu.Lock() ipc.mu.Lock()
ch, ok := ipc.pendingCalls[msg.Id] ch, ok := ipc.pendingCalls[msg.Id]
if ok { if ok {
@ -157,20 +156,21 @@ func (ipc *ipcCommon) handleResponse(msg Message) {
return return
} }
var err error var res mo.Result[Vals]
if msg.Error != "" { if msg.Error == "" {
err = fmt.Errorf("remote error: %s", msg.Error) res = mo.Ok[Vals](msg.Result)
} else {
res = mo.Err[Vals](fmt.Errorf("remote error: %s", msg.Error))
} }
ch <- res
ch <- callResult{result: msg.Result, err: err}
close(ch) close(ch)
} }
func (ipc *ipcCommon) Call(method string, params ...any) ([]any, error) { func (ipc *ipcCommon) Call(method string, params ...any) (Vals, error) {
ipc.mu.Lock() ipc.mu.Lock()
id := ipc.nextId id := ipc.nextId
ipc.nextId++ ipc.nextId++
resChan := make(chan callResult, 1) resChan := make(chan mo.Result[Vals], 1)
ipc.pendingCalls[id] = resChan ipc.pendingCalls[id] = resChan
ipc.mu.Unlock() ipc.mu.Unlock()
@ -181,7 +181,7 @@ func (ipc *ipcCommon) Call(method string, params ...any) ([]any, error) {
Params: params, Params: params,
} }
if err := ipc.writeMsg(msg); err != nil { if err := ipc.sendMsg(msg); err != nil {
ipc.mu.Lock() ipc.mu.Lock()
delete(ipc.pendingCalls, id) delete(ipc.pendingCalls, id)
ipc.mu.Unlock() ipc.mu.Unlock()
@ -189,7 +189,7 @@ func (ipc *ipcCommon) Call(method string, params ...any) ([]any, error) {
} }
result := <-resChan result := <-resChan
return result.result, result.err return result.Get()
} }
func (ipc *ipcCommon) raiseErr(err error) { func (ipc *ipcCommon) raiseErr(err error) {
@ -218,7 +218,7 @@ func NewParent(cmd *exec.Cmd, localApi any) (*ParentIPC, error) {
p := ParentIPC{ p := ParentIPC{
ipcCommon: &ipcCommon{ ipcCommon: &ipcCommon{
localApi: localApi, localApi: localApi,
pendingCalls: make(map[int64]chan callResult), pendingCalls: make(map[int64]chan mo.Result[Vals]),
errCh: make(chan error, 1), errCh: make(chan error, 1),
socketPath: filepath.Join(os.TempDir(), fmt.Sprintf("kitten-ipc-%d.sock", os.Getpid())), socketPath: filepath.Join(os.TempDir(), fmt.Sprintf("kitten-ipc-%d.sock", os.Getpid())),
}, },
@ -252,11 +252,7 @@ func (p *ParentIPC) Start() error {
return fmt.Errorf("cmd start: %w", err) return fmt.Errorf("cmd start: %w", err)
} }
if err := p.acceptConn(); err != nil { return p.acceptConn()
return fmt.Errorf("accept connection: %w", err)
}
return nil
} }
func (p *ParentIPC) acceptConn() error { func (p *ParentIPC) acceptConn() error {
@ -295,7 +291,7 @@ func (p *ParentIPC) closeSock() error {
return nil return nil
} }
func (p *ParentIPC) Wait() error { func (p *ParentIPC) Wait() (retErr error) {
waitErrCh := make(chan error, 1) waitErrCh := make(chan error, 1)
go func() { go func() {
@ -304,16 +300,16 @@ func (p *ParentIPC) Wait() error {
select { select {
case err := <-p.errCh: case err := <-p.errCh:
runtimeErr := fmt.Errorf("runtime error: %w", err) retErr = fmt.Errorf("ipc internal error: %w", err)
killErr := p.cmd.Process.Kill()
return mergeErr(runtimeErr, killErr)
case err := <-waitErrCh: case err := <-waitErrCh:
if err != nil { retErr = fmt.Errorf("cmd wait: %w", err)
return fmt.Errorf("cmd wait: %w", err)
}
} }
return nil killErr := p.cmd.Process.Kill()
retErr = mergeErr(retErr, killErr)
p.cleanup()
return
} }
type ChildIPC struct { type ChildIPC struct {
@ -324,7 +320,7 @@ func NewChild(localApi any) (*ChildIPC, error) {
c := ChildIPC{ c := ChildIPC{
ipcCommon: &ipcCommon{ ipcCommon: &ipcCommon{
localApi: localApi, localApi: localApi,
pendingCalls: make(map[int64]chan callResult), pendingCalls: make(map[int64]chan mo.Result[Vals]),
errCh: make(chan error, 1), errCh: make(chan error, 1),
}, },
} }