diff --git a/lib/golang/lib.go b/lib/golang/lib.go index 41d6282..e6a0b93 100644 --- a/lib/golang/lib.go +++ b/lib/golang/lib.go @@ -73,7 +73,7 @@ func (ipc *ipcCommon) processMsg(msg Message) { } } -func (ipc *ipcCommon) writeMsg(msg Message) error { +func (ipc *ipcCommon) sendMsg(msg Message) error { data, err := json.Marshal(msg) if err != nil { @@ -138,13 +138,12 @@ func (ipc *ipcCommon) sendResponse(id int64, result []any, err error) { msg.Error = err.Error() } - if err := ipc.writeMsg(msg); err != nil { + if err := ipc.sendMsg(msg); err != nil { ipc.raiseErr(fmt.Errorf("send response for id=%d: %w", id, err)) } } func (ipc *ipcCommon) handleResponse(msg Message) { - ipc.mu.Lock() ch, ok := ipc.pendingCalls[msg.Id] if ok { @@ -157,20 +156,21 @@ func (ipc *ipcCommon) handleResponse(msg Message) { return } - var err error - if msg.Error != "" { - err = fmt.Errorf("remote error: %s", msg.Error) + var res mo.Result[Vals] + if msg.Error == "" { + res = mo.Ok[Vals](msg.Result) + } else { + res = mo.Err[Vals](fmt.Errorf("remote error: %s", msg.Error)) } - - ch <- callResult{result: msg.Result, err: err} + ch <- res close(ch) } -func (ipc *ipcCommon) Call(method string, params ...any) ([]any, error) { +func (ipc *ipcCommon) Call(method string, params ...any) (Vals, error) { ipc.mu.Lock() id := ipc.nextId ipc.nextId++ - resChan := make(chan callResult, 1) + resChan := make(chan mo.Result[Vals], 1) ipc.pendingCalls[id] = resChan ipc.mu.Unlock() @@ -181,7 +181,7 @@ func (ipc *ipcCommon) Call(method string, params ...any) ([]any, error) { Params: params, } - if err := ipc.writeMsg(msg); err != nil { + if err := ipc.sendMsg(msg); err != nil { ipc.mu.Lock() delete(ipc.pendingCalls, id) ipc.mu.Unlock() @@ -189,7 +189,7 @@ func (ipc *ipcCommon) Call(method string, params ...any) ([]any, error) { } result := <-resChan - return result.result, result.err + return result.Get() } func (ipc *ipcCommon) raiseErr(err error) { @@ -218,7 +218,7 @@ func NewParent(cmd *exec.Cmd, localApi any) (*ParentIPC, error) { p := ParentIPC{ ipcCommon: &ipcCommon{ localApi: localApi, - pendingCalls: make(map[int64]chan callResult), + pendingCalls: make(map[int64]chan mo.Result[Vals]), errCh: make(chan error, 1), socketPath: filepath.Join(os.TempDir(), fmt.Sprintf("kitten-ipc-%d.sock", os.Getpid())), }, @@ -252,11 +252,7 @@ func (p *ParentIPC) Start() error { return fmt.Errorf("cmd start: %w", err) } - if err := p.acceptConn(); err != nil { - return fmt.Errorf("accept connection: %w", err) - } - - return nil + return p.acceptConn() } func (p *ParentIPC) acceptConn() error { @@ -295,7 +291,7 @@ func (p *ParentIPC) closeSock() error { return nil } -func (p *ParentIPC) Wait() error { +func (p *ParentIPC) Wait() (retErr error) { waitErrCh := make(chan error, 1) go func() { @@ -304,16 +300,16 @@ func (p *ParentIPC) Wait() error { select { case err := <-p.errCh: - runtimeErr := fmt.Errorf("runtime error: %w", err) - killErr := p.cmd.Process.Kill() - return mergeErr(runtimeErr, killErr) + retErr = fmt.Errorf("ipc internal error: %w", err) case err := <-waitErrCh: - if err != nil { - return fmt.Errorf("cmd wait: %w", err) - } + retErr = fmt.Errorf("cmd wait: %w", err) } - return nil + killErr := p.cmd.Process.Kill() + retErr = mergeErr(retErr, killErr) + p.cleanup() + + return } type ChildIPC struct { @@ -324,7 +320,7 @@ func NewChild(localApi any) (*ChildIPC, error) { c := ChildIPC{ ipcCommon: &ipcCommon{ localApi: localApi, - pendingCalls: make(map[int64]chan callResult), + pendingCalls: make(map[int64]chan mo.Result[Vals]), errCh: make(chan error, 1), }, }