diff --git a/lib/golang/lib.go b/lib/golang/lib.go index 791f15d..ae4081c 100644 --- a/lib/golang/lib.go +++ b/lib/golang/lib.go @@ -49,13 +49,18 @@ type Callable interface { Call(method string, params ...any) (Vals, error) } +type pendingCall struct { + resultChan chan mo.Result[Vals] + resultType reflect.Type +} + type ipcCommon struct { localApis map[string]any socketPath string conn net.Conn errCh chan error nextId int64 - pendingCalls map[int64]chan mo.Result[Vals] + pendingCalls map[int64]*pendingCall processingCalls atomic.Int64 stopRequested atomic.Bool mu sync.Mutex @@ -218,7 +223,7 @@ func (ipc *ipcCommon) sendResponse(id int64, result []any, err error) { func (ipc *ipcCommon) handleResponse(msg Message) { ipc.mu.Lock() - ch, ok := ipc.pendingCalls[msg.Id] + call, ok := ipc.pendingCalls[msg.Id] if ok { delete(ipc.pendingCalls, msg.Id) } @@ -235,8 +240,8 @@ func (ipc *ipcCommon) handleResponse(msg Message) { } else { res = mo.Err[Vals](fmt.Errorf("remote error: %s", msg.Error)) } - ch <- res - close(ch) + call.resultChan <- res + close(call.resultChan) } func (ipc *ipcCommon) Call(method string, params ...any) (Vals, error) { @@ -247,8 +252,10 @@ func (ipc *ipcCommon) Call(method string, params ...any) (Vals, error) { ipc.mu.Lock() id := ipc.nextId ipc.nextId++ - resChan := make(chan mo.Result[Vals], 1) - ipc.pendingCalls[id] = resChan + call := &pendingCall{ + resultChan: make(chan mo.Result[Vals], 1), + } + ipc.pendingCalls[id] = call ipc.mu.Unlock() msg := Message{ @@ -265,7 +272,7 @@ func (ipc *ipcCommon) Call(method string, params ...any) (Vals, error) { return nil, fmt.Errorf("send call: %w", err) } - result := <-resChan + result := <-call.resultChan return result.Get() } @@ -281,7 +288,7 @@ func (ipc *ipcCommon) closeConn() { defer ipc.mu.Unlock() _ = ipc.conn.Close() for _, call := range ipc.pendingCalls { - call <- mo.Err[Vals](fmt.Errorf("call cancelled due to ipc termination")) + call.resultChan <- mo.Err[Vals](fmt.Errorf("call cancelled due to ipc termination")) } } @@ -295,7 +302,7 @@ func NewParent(cmd *exec.Cmd, localApis ...any) (*ParentIPC, error) { p := ParentIPC{ ipcCommon: &ipcCommon{ localApis: mapTypeNames(localApis), - pendingCalls: make(map[int64]chan mo.Result[Vals]), + pendingCalls: make(map[int64]*pendingCall), errCh: make(chan error, 1), socketPath: filepath.Join(os.TempDir(), fmt.Sprintf("kitten-ipc-%d.sock", os.Getpid())), }, @@ -427,7 +434,7 @@ func NewChild(localApis ...any) (*ChildIPC, error) { c := ChildIPC{ ipcCommon: &ipcCommon{ localApis: mapTypeNames(localApis), - pendingCalls: make(map[int64]chan mo.Result[Vals]), + pendingCalls: make(map[int64]*pendingCall), errCh: make(chan error, 1), }, }