parent/child mode

This commit is contained in:
Egor Aristov 2025-10-23 18:15:40 +03:00
parent 2b67cd1d01
commit 640a792d74
Signed by: egor3f
GPG Key ID: 40482A264AAEC85F

View File

@ -21,16 +21,6 @@ const ipcSocketArg = "--ipc-socket"
type StdioMode int type StdioMode int
type ipcCommon struct {
localApi any
socketPath string
conn net.Conn
errCh chan error
nextId int64
pendingCalls map[int64]chan callResult
mu sync.Mutex
}
type MsgType int type MsgType int
const ( const (
@ -52,7 +42,17 @@ type callResult struct {
err error err error
} }
func (ipc *ipcCommon) startRcvData() { type ipcCommon struct {
localApi any
socketPath string
conn net.Conn
errCh chan error
nextId int64
pendingCalls map[int64]chan callResult
mu sync.Mutex
}
func (ipc *ipcCommon) readConn() {
scn := bufio.NewScanner(ipc.conn) scn := bufio.NewScanner(ipc.conn)
for scn.Scan() { for scn.Scan() {
var msg Message var msg Message
@ -76,6 +76,22 @@ func (ipc *ipcCommon) processMsg(msg Message) {
} }
} }
func (ipc *ipcCommon) writeMsg(msg Message) error {
data, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("marshal message: %w", err)
}
data = append(data, '\n')
if _, err := ipc.conn.Write(data); err != nil {
return fmt.Errorf("write message: %w", err)
}
return nil
}
func (ipc *ipcCommon) handleCall(msg Message) { func (ipc *ipcCommon) handleCall(msg Message) {
if ipc.localApi == nil { if ipc.localApi == nil {
@ -114,6 +130,22 @@ func (ipc *ipcCommon) handleCall(msg Message) {
ipc.sendResponse(msg.Id, res, resErr.Interface().(error)) ipc.sendResponse(msg.Id, res, resErr.Interface().(error))
} }
func (ipc *ipcCommon) sendResponse(id int64, result []any, err error) {
msg := Message{
Type: MsgResponse,
Id: id,
Result: result,
}
if err != nil {
msg.Error = err.Error()
}
if err := ipc.writeMsg(msg); err != nil {
ipc.raiseErr(fmt.Errorf("send response for id=%d: %w", id, err))
}
}
func (ipc *ipcCommon) handleResponse(msg Message) { func (ipc *ipcCommon) handleResponse(msg Message) {
ipc.mu.Lock() ipc.mu.Lock()
@ -137,38 +169,6 @@ func (ipc *ipcCommon) handleResponse(msg Message) {
close(ch) close(ch)
} }
func (ipc *ipcCommon) sendResponse(id int64, result []any, err error) {
msg := Message{
Type: MsgResponse,
Id: id,
Result: result,
}
if err != nil {
msg.Error = err.Error()
}
if err := ipc.sendMsg(msg); err != nil {
ipc.raiseErr(fmt.Errorf("send response for id=%d: %w", id, err))
}
}
func (ipc *ipcCommon) sendMsg(msg Message) error {
data, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("marshal message: %w", err)
}
data = append(data, '\n')
if _, err := ipc.conn.Write(data); err != nil {
return fmt.Errorf("write message: %w", err)
}
return nil
}
func (ipc *ipcCommon) Call(method string, params ...any) ([]any, error) { func (ipc *ipcCommon) Call(method string, params ...any) ([]any, error) {
ipc.mu.Lock() ipc.mu.Lock()
id := ipc.nextId id := ipc.nextId
@ -184,7 +184,7 @@ func (ipc *ipcCommon) Call(method string, params ...any) ([]any, error) {
Params: params, Params: params,
} }
if err := ipc.sendMsg(msg); err != nil { if err := ipc.writeMsg(msg); err != nil {
ipc.mu.Lock() ipc.mu.Lock()
delete(ipc.pendingCalls, id) delete(ipc.pendingCalls, id)
ipc.mu.Unlock() ipc.mu.Unlock()
@ -277,7 +277,7 @@ func (p *ParentIPC) acceptConn() error {
return fmt.Errorf("accept: %w", res.Error()) return fmt.Errorf("accept: %w", res.Error())
} }
p.conn = res.MustGet() p.conn = res.MustGet()
p.startRcvData() p.readConn()
} }
return nil return nil
} }
@ -340,7 +340,7 @@ func (c *ChildIPC) Start() error {
return fmt.Errorf("connect to parent socket: %w", err) return fmt.Errorf("connect to parent socket: %w", err)
} }
c.conn = conn c.conn = conn
c.startRcvData() c.readConn()
return nil return nil
} }