more correct open/close code

This commit is contained in:
Egor Aristov 2025-10-27 13:36:03 +03:00
parent 214ec97078
commit dfd0bf381f
Signed by: egor3f
GPG Key ID: 40482A264AAEC85F
5 changed files with 56 additions and 17 deletions

View File

@ -3,6 +3,7 @@ package kittenipc
import (
"bufio"
"encoding/json"
"errors"
"flag"
"fmt"
"net"
@ -12,6 +13,8 @@ import (
"reflect"
"slices"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/samber/mo"
@ -44,13 +47,15 @@ type Callable interface {
}
type ipcCommon struct {
localApi any
socketPath string
conn net.Conn
errCh chan error
nextId int64
pendingCalls map[int64]chan mo.Result[Vals]
mu sync.Mutex
localApi any
socketPath string
conn net.Conn
errCh chan error
nextId int64
pendingCalls map[int64]chan mo.Result[Vals]
processingCalls atomic.Int64
stopRequested atomic.Bool
mu sync.Mutex
}
func (ipc *ipcCommon) readConn() {
@ -94,6 +99,12 @@ func (ipc *ipcCommon) sendMsg(msg Message) error {
}
func (ipc *ipcCommon) handleCall(msg Message) {
if ipc.stopRequested.Load() {
return
}
ipc.processingCalls.Add(1)
defer ipc.processingCalls.Add(-1)
if ipc.localApi == nil {
ipc.sendResponse(msg.Id, nil, fmt.Errorf("remote side does not accept ipc calls"))
@ -171,6 +182,10 @@ func (ipc *ipcCommon) handleResponse(msg Message) {
}
func (ipc *ipcCommon) Call(method string, params ...any) (Vals, error) {
if ipc.stopRequested.Load() {
return nil, fmt.Errorf("ipc is stopping")
}
ipc.mu.Lock()
id := ipc.nextId
ipc.nextId++
@ -203,7 +218,7 @@ func (ipc *ipcCommon) raiseErr(err error) {
}
}
func (ipc *ipcCommon) cleanup() {
func (ipc *ipcCommon) closeConn() {
ipc.mu.Lock()
defer ipc.mu.Unlock()
_ = ipc.conn.Close()
@ -288,6 +303,20 @@ func (p *ParentIPC) acceptConn() error {
return nil
}
func (p *ParentIPC) Stop() error {
if len(p.pendingCalls) > 0 {
return fmt.Errorf("there are calls pending")
}
if p.processingCalls.Load() > 0 {
return fmt.Errorf("there are calls processing")
}
p.stopRequested.Store(true)
if err := p.cmd.Process.Signal(syscall.SIGINT); err != nil {
return fmt.Errorf("send SIGTERM: %w", err)
}
return p.Wait()
}
func (p *ParentIPC) Wait() error {
waitErrCh := make(chan error, 1)
@ -301,11 +330,21 @@ func (p *ParentIPC) Wait() error {
retErr = fmt.Errorf("ipc internal error: %w", err)
case err := <-waitErrCh:
if err != nil {
retErr = fmt.Errorf("cmd wait: %w", err)
var exitErr *exec.ExitError
if ok := errors.As(err, &exitErr); ok {
if !exitErr.Success() {
ws, ok := exitErr.Sys().(syscall.WaitStatus)
if !(ok && ws.Signaled() && ws.Signal() == syscall.SIGINT && p.stopRequested.Load()) {
retErr = fmt.Errorf("cmd wait: %w", err)
}
}
} else {
retErr = fmt.Errorf("cmd wait: %w", err)
}
}
}
p.cleanup()
p.closeConn()
return retErr
}

View File

@ -27,7 +27,7 @@ declare abstract class IPCCommon {
protected conn: net.Socket | null;
protected nextId: number;
protected pendingCalls: Record<number, (result: CallResult) => void>;
protected closeRequested: boolean;
protected stopRequested: boolean;
protected processingCalls: number;
protected onError?: (err: Error) => void;
protected onClose?: () => void;

View File

@ -1 +1 @@
{"version":3,"file":"lib.d.ts","sourceRoot":"","sources":["../src/lib.ts"],"names":[],"mappings":"AAAA,OAAO,KAAK,GAAG,MAAM,UAAU,CAAC;AAUhC,aAAK,OAAO;IACR,IAAI,IAAI;IACR,QAAQ,IAAI;CACf;AAED,KAAK,IAAI,GAAG,GAAG,EAAE,CAAC;AAElB,UAAU,WAAW;IACjB,IAAI,EAAE,OAAO,CAAC,IAAI,CAAC;IACnB,EAAE,EAAE,MAAM,CAAC;IACX,MAAM,EAAE,MAAM,CAAC;IACf,MAAM,EAAE,IAAI,CAAC;CAChB;AAED,UAAU,eAAe;IACrB,IAAI,EAAE,OAAO,CAAC,QAAQ,CAAC;IACvB,EAAE,EAAE,MAAM,CAAC;IACX,MAAM,CAAC,EAAE,IAAI,CAAC;IACd,KAAK,CAAC,EAAE,MAAM,CAAC;CAClB;AAED,KAAK,OAAO,GAAG,WAAW,GAAG,eAAe,CAAC;AAE7C,UAAU,UAAU;IAChB,MAAM,EAAE,IAAI,CAAC;IACb,KAAK,EAAE,KAAK,GAAG,IAAI,CAAC;CACvB;AAED,uBAAe,SAAS;IACpB,SAAS,CAAC,SAAS,EAAE,MAAM,CAAC,MAAM,EAAE,GAAG,CAAC,CAAC;IACzC,SAAS,CAAC,UAAU,EAAE,MAAM,CAAC;IAC7B,SAAS,CAAC,IAAI,EAAE,GAAG,CAAC,MAAM,GAAG,IAAI,CAAQ;IACzC,SAAS,CAAC,MAAM,EAAE,MAAM,CAAK;IAC7B,SAAS,CAAC,YAAY,EAAE,MAAM,CAAC,MAAM,EAAE,CAAC,MAAM,EAAE,UAAU,KAAK,IAAI,CAAC,CAAM;IAC1E,SAAS,CAAC,cAAc,EAAE,OAAO,CAAS;IAC1C,SAAS,CAAC,eAAe,EAAE,MAAM,CAAK;IACtC,SAAS,CAAC,OAAO,CAAC,EAAE,CAAC,GAAG,EAAE,KAAK,KAAK,IAAI,CAAC;IACzC,SAAS,CAAC,OAAO,CAAC,EAAE,MAAM,IAAI,CAAC;IAE/B,SAAS,aAAa,SAAS,EAAE,MAAM,EAAE,EAAE,UAAU,EAAE,MAAM;IAS7D,SAAS,CAAC,QAAQ,IAAI,IAAI;IA4B1B,SAAS,CAAC,UAAU,CAAC,GAAG,EAAE,OAAO,GAAG,IAAI;IAWxC,SAAS,CAAC,OAAO,CAAC,GAAG,EAAE,OAAO,GAAG,IAAI;IAWrC,SAAS,CAAC,UAAU,CAAC,GAAG,EAAE,WAAW,GAAG,IAAI;IAsD5C,SAAS,CAAC,cAAc,CAAC,GAAG,EAAE,eAAe,GAAG,IAAI;IAapD,IAAI;IAWJ,IAAI,CAAC,MAAM,EAAE,MAAM,EAAE,GAAG,MAAM,EAAE,IAAI,GAAG,OAAO,CAAC,IAAI,CAAC;IAoBpD,SAAS,CAAC,QAAQ,CAAC,GAAG,EAAE,KAAK,GAAG,IAAI;CAGvC;AAGD,qBAAa,SAAU,SAAQ,SAAS;IACpC,OAAO,CAAC,QAAQ,CAAC,OAAO,CAAS;IACjC,OAAO,CAAC,QAAQ,CAAC,OAAO,CAAW;IACnC,OAAO,CAAC,GAAG,CAA6B;IACxC,OAAO,CAAC,QAAQ,CAAC,QAAQ,CAAa;gBAE1B,OAAO,EAAE,MAAM,EAAE,OAAO,EAAE,MAAM,EAAE,EAAE,GAAG,SAAS,EAAE,MAAM,EAAE;IAahE,KAAK,IAAI,OAAO,CAAC,IAAI,CAAC;YAuBd,UAAU;IAmBlB,IAAI,IAAI,OAAO,CAAC,IAAI,CAAC;CAgB9B;AAGD,qBAAa,QAAS,SAAQ,SAAS;gBACvB,GAAG,SAAS,EAAE,MAAM,EAAE;IAI5B,KAAK,IAAI,OAAO,CAAC,IAAI,CAAC;IAUtB,IAAI,IAAI,OAAO,CAAC,IAAI,CAAC;CAa9B"}
{"version":3,"file":"lib.d.ts","sourceRoot":"","sources":["../src/lib.ts"],"names":[],"mappings":"AAAA,OAAO,KAAK,GAAG,MAAM,UAAU,CAAC;AAUhC,aAAK,OAAO;IACR,IAAI,IAAI;IACR,QAAQ,IAAI;CACf;AAED,KAAK,IAAI,GAAG,GAAG,EAAE,CAAC;AAElB,UAAU,WAAW;IACjB,IAAI,EAAE,OAAO,CAAC,IAAI,CAAC;IACnB,EAAE,EAAE,MAAM,CAAC;IACX,MAAM,EAAE,MAAM,CAAC;IACf,MAAM,EAAE,IAAI,CAAC;CAChB;AAED,UAAU,eAAe;IACrB,IAAI,EAAE,OAAO,CAAC,QAAQ,CAAC;IACvB,EAAE,EAAE,MAAM,CAAC;IACX,MAAM,CAAC,EAAE,IAAI,CAAC;IACd,KAAK,CAAC,EAAE,MAAM,CAAC;CAClB;AAED,KAAK,OAAO,GAAG,WAAW,GAAG,eAAe,CAAC;AAE7C,UAAU,UAAU;IAChB,MAAM,EAAE,IAAI,CAAC;IACb,KAAK,EAAE,KAAK,GAAG,IAAI,CAAC;CACvB;AAED,uBAAe,SAAS;IACpB,SAAS,CAAC,SAAS,EAAE,MAAM,CAAC,MAAM,EAAE,GAAG,CAAC,CAAC;IACzC,SAAS,CAAC,UAAU,EAAE,MAAM,CAAC;IAC7B,SAAS,CAAC,IAAI,EAAE,GAAG,CAAC,MAAM,GAAG,IAAI,CAAQ;IACzC,SAAS,CAAC,MAAM,EAAE,MAAM,CAAK;IAC7B,SAAS,CAAC,YAAY,EAAE,MAAM,CAAC,MAAM,EAAE,CAAC,MAAM,EAAE,UAAU,KAAK,IAAI,CAAC,CAAM;IAC1E,SAAS,CAAC,aAAa,EAAE,OAAO,CAAS;IACzC,SAAS,CAAC,eAAe,EAAE,MAAM,CAAK;IACtC,SAAS,CAAC,OAAO,CAAC,EAAE,CAAC,GAAG,EAAE,KAAK,KAAK,IAAI,CAAC;IACzC,SAAS,CAAC,OAAO,CAAC,EAAE,MAAM,IAAI,CAAC;IAE/B,SAAS,aAAa,SAAS,EAAE,MAAM,EAAE,EAAE,UAAU,EAAE,MAAM;IAS7D,SAAS,CAAC,QAAQ,IAAI,IAAI;IA4B1B,SAAS,CAAC,UAAU,CAAC,GAAG,EAAE,OAAO,GAAG,IAAI;IAWxC,SAAS,CAAC,OAAO,CAAC,GAAG,EAAE,OAAO,GAAG,IAAI;IAWrC,SAAS,CAAC,UAAU,CAAC,GAAG,EAAE,WAAW,GAAG,IAAI;IAsD5C,SAAS,CAAC,cAAc,CAAC,GAAG,EAAE,eAAe,GAAG,IAAI;IAapD,IAAI;IAWJ,IAAI,CAAC,MAAM,EAAE,MAAM,EAAE,GAAG,MAAM,EAAE,IAAI,GAAG,OAAO,CAAC,IAAI,CAAC;IAoBpD,SAAS,CAAC,QAAQ,CAAC,GAAG,EAAE,KAAK,GAAG,IAAI;CAGvC;AAGD,qBAAa,SAAU,SAAQ,SAAS;IACpC,OAAO,CAAC,QAAQ,CAAC,OAAO,CAAS;IACjC,OAAO,CAAC,QAAQ,CAAC,OAAO,CAAW;IACnC,OAAO,CAAC,GAAG,CAA6B;IACxC,OAAO,CAAC,QAAQ,CAAC,QAAQ,CAAa;gBAE1B,OAAO,EAAE,MAAM,EAAE,OAAO,EAAE,MAAM,EAAE,EAAE,GAAG,SAAS,EAAE,MAAM,EAAE;IAahE,KAAK,IAAI,OAAO,CAAC,IAAI,CAAC;YAuBd,UAAU;IAmBlB,IAAI,IAAI,OAAO,CAAC,IAAI,CAAC;CAgB9B;AAGD,qBAAa,QAAS,SAAQ,SAAS;gBACvB,GAAG,SAAS,EAAE,MAAM,EAAE;IAI5B,KAAK,IAAI,OAAO,CAAC,IAAI,CAAC;IAUtB,IAAI,IAAI,OAAO,CAAC,IAAI,CAAC;CAa9B"}

File diff suppressed because one or more lines are too long

View File

@ -42,7 +42,7 @@ abstract class IPCCommon {
protected conn: net.Socket | null = null;
protected nextId: number = 0;
protected pendingCalls: Record<number, (result: CallResult) => void> = {};
protected closeRequested: boolean = false;
protected stopRequested: boolean = false;
protected processingCalls: number = 0;
protected onError?: (err: Error) => void;
protected onClose?: () => void;
@ -155,7 +155,7 @@ abstract class IPCCommon {
this.processingCalls--;
}
if(this.closeRequested) {
if(this.stopRequested) {
if(this.onClose) this.onClose();
}
}
@ -174,13 +174,13 @@ abstract class IPCCommon {
}
stop() {
if (this.closeRequested) {
if (this.stopRequested) {
throw new Error('close already requested');
}
if(!this.conn || this.conn.readyState === "closed") {
throw new Error('connection already closed');
}
this.closeRequested = true;
this.stopRequested = true;
if(this.onClose) this.onClose();
}