go pendingcall chan -> struct

This commit is contained in:
Egor Aristov 2025-12-05 22:41:26 +03:00
parent 388e36721e
commit ae7ebb8127
Signed by: egor3f
GPG Key ID: 40482A264AAEC85F

View File

@ -49,13 +49,18 @@ type Callable interface {
Call(method string, params ...any) (Vals, error) Call(method string, params ...any) (Vals, error)
} }
type pendingCall struct {
resultChan chan mo.Result[Vals]
resultType reflect.Type
}
type ipcCommon struct { type ipcCommon struct {
localApis map[string]any localApis map[string]any
socketPath string socketPath string
conn net.Conn conn net.Conn
errCh chan error errCh chan error
nextId int64 nextId int64
pendingCalls map[int64]chan mo.Result[Vals] pendingCalls map[int64]*pendingCall
processingCalls atomic.Int64 processingCalls atomic.Int64
stopRequested atomic.Bool stopRequested atomic.Bool
mu sync.Mutex mu sync.Mutex
@ -218,7 +223,7 @@ func (ipc *ipcCommon) sendResponse(id int64, result []any, err error) {
func (ipc *ipcCommon) handleResponse(msg Message) { func (ipc *ipcCommon) handleResponse(msg Message) {
ipc.mu.Lock() ipc.mu.Lock()
ch, ok := ipc.pendingCalls[msg.Id] call, ok := ipc.pendingCalls[msg.Id]
if ok { if ok {
delete(ipc.pendingCalls, msg.Id) delete(ipc.pendingCalls, msg.Id)
} }
@ -235,8 +240,8 @@ func (ipc *ipcCommon) handleResponse(msg Message) {
} else { } else {
res = mo.Err[Vals](fmt.Errorf("remote error: %s", msg.Error)) res = mo.Err[Vals](fmt.Errorf("remote error: %s", msg.Error))
} }
ch <- res call.resultChan <- res
close(ch) close(call.resultChan)
} }
func (ipc *ipcCommon) Call(method string, params ...any) (Vals, error) { func (ipc *ipcCommon) Call(method string, params ...any) (Vals, error) {
@ -247,8 +252,10 @@ func (ipc *ipcCommon) Call(method string, params ...any) (Vals, error) {
ipc.mu.Lock() ipc.mu.Lock()
id := ipc.nextId id := ipc.nextId
ipc.nextId++ ipc.nextId++
resChan := make(chan mo.Result[Vals], 1) call := &pendingCall{
ipc.pendingCalls[id] = resChan resultChan: make(chan mo.Result[Vals], 1),
}
ipc.pendingCalls[id] = call
ipc.mu.Unlock() ipc.mu.Unlock()
msg := Message{ msg := Message{
@ -265,7 +272,7 @@ func (ipc *ipcCommon) Call(method string, params ...any) (Vals, error) {
return nil, fmt.Errorf("send call: %w", err) return nil, fmt.Errorf("send call: %w", err)
} }
result := <-resChan result := <-call.resultChan
return result.Get() return result.Get()
} }
@ -281,7 +288,7 @@ func (ipc *ipcCommon) closeConn() {
defer ipc.mu.Unlock() defer ipc.mu.Unlock()
_ = ipc.conn.Close() _ = ipc.conn.Close()
for _, call := range ipc.pendingCalls { for _, call := range ipc.pendingCalls {
call <- mo.Err[Vals](fmt.Errorf("call cancelled due to ipc termination")) call.resultChan <- mo.Err[Vals](fmt.Errorf("call cancelled due to ipc termination"))
} }
} }
@ -295,7 +302,7 @@ func NewParent(cmd *exec.Cmd, localApis ...any) (*ParentIPC, error) {
p := ParentIPC{ p := ParentIPC{
ipcCommon: &ipcCommon{ ipcCommon: &ipcCommon{
localApis: mapTypeNames(localApis), localApis: mapTypeNames(localApis),
pendingCalls: make(map[int64]chan mo.Result[Vals]), pendingCalls: make(map[int64]*pendingCall),
errCh: make(chan error, 1), errCh: make(chan error, 1),
socketPath: filepath.Join(os.TempDir(), fmt.Sprintf("kitten-ipc-%d.sock", os.Getpid())), socketPath: filepath.Join(os.TempDir(), fmt.Sprintf("kitten-ipc-%d.sock", os.Getpid())),
}, },
@ -427,7 +434,7 @@ func NewChild(localApis ...any) (*ChildIPC, error) {
c := ChildIPC{ c := ChildIPC{
ipcCommon: &ipcCommon{ ipcCommon: &ipcCommon{
localApis: mapTypeNames(localApis), localApis: mapTypeNames(localApis),
pendingCalls: make(map[int64]chan mo.Result[Vals]), pendingCalls: make(map[int64]*pendingCall),
errCh: make(chan error, 1), errCh: make(chan error, 1),
}, },
} }