Merge pull request #2855 from MichaelEischer/rclone-exit

Fix rclone subprocess handling
This commit is contained in:
MichaelEischer 2020-07-26 22:55:25 +02:00 committed by GitHub
commit be54ceff66
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 86 additions and 29 deletions

View File

@ -63,6 +63,9 @@ func run(command string, args ...string) (*StdioConn, *exec.Cmd, *sync.WaitGroup
stdout, w, err := os.Pipe() stdout, w, err := os.Pipe()
if err != nil { if err != nil {
// close first pipe
r.Close()
stdin.Close()
return nil, nil, nil, nil, err return nil, nil, nil, nil, err
} }
@ -70,13 +73,23 @@ func run(command string, args ...string) (*StdioConn, *exec.Cmd, *sync.WaitGroup
cmd.Stdout = w cmd.Stdout = w
bg, err := backend.StartForeground(cmd) bg, err := backend.StartForeground(cmd)
// close rclone side of pipes
errR := r.Close()
errW := w.Close()
// return first error
if err == nil {
err = errR
}
if err == nil {
err = errW
}
if err != nil { if err != nil {
return nil, nil, nil, nil, err return nil, nil, nil, nil, err
} }
c := &StdioConn{ c := &StdioConn{
stdin: stdout, receive: stdout,
stdout: stdin, send: stdin,
cmd: cmd, cmd: cmd,
} }
@ -114,7 +127,7 @@ func wrapConn(c *StdioConn, lim limiter.Limiter) wrappedConn {
} }
// New initializes a Backend and starts the process. // New initializes a Backend and starts the process.
func New(cfg Config, lim limiter.Limiter) (*Backend, error) { func newBackend(cfg Config, lim limiter.Limiter) (*Backend, error) {
var ( var (
args []string args []string
err error err error
@ -183,6 +196,8 @@ func New(cfg Config, lim limiter.Limiter) (*Backend, error) {
err := cmd.Wait() err := cmd.Wait()
debug.Log("Wait returned %v", err) debug.Log("Wait returned %v", err)
be.waitResult = err be.waitResult = err
// close our side of the pipes to rclone
stdioConn.CloseAll()
close(waitCh) close(waitCh)
}() }()
@ -234,7 +249,7 @@ func New(cfg Config, lim limiter.Limiter) (*Backend, error) {
// Open starts an rclone process with the given config. // Open starts an rclone process with the given config.
func Open(cfg Config, lim limiter.Limiter) (*Backend, error) { func Open(cfg Config, lim limiter.Limiter) (*Backend, error) {
be, err := New(cfg, lim) be, err := newBackend(cfg, lim)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -260,7 +275,7 @@ func Open(cfg Config, lim limiter.Limiter) (*Backend, error) {
// Create initializes a new restic repo with clone. // Create initializes a new restic repo with clone.
func Create(cfg Config) (*Backend, error) { func Create(cfg Config) (*Backend, error) {
be, err := New(cfg, nil) be, err := newBackend(cfg, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -299,7 +314,7 @@ func (be *Backend) Close() error {
debug.Log("rclone exited") debug.Log("rclone exited")
case <-time.After(waitForExit): case <-time.After(waitForExit):
debug.Log("timeout, closing file descriptors") debug.Log("timeout, closing file descriptors")
err := be.conn.Close() err := be.conn.CloseAll()
if err != nil { if err != nil {
return err return err
} }

View File

@ -0,0 +1,37 @@
package rclone
import (
"context"
"os/exec"
"testing"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
)
// restic should detect rclone exiting.
func TestRcloneExit(t *testing.T) {
dir, cleanup := rtest.TempDir(t)
defer cleanup()
cfg := NewConfig()
cfg.Remote = dir
be, err := Open(cfg, nil)
if e, ok := errors.Cause(err).(*exec.Error); ok && e.Err == exec.ErrNotFound {
t.Skipf("program %q not found", e.Name)
return
}
rtest.OK(t, err)
defer be.Close()
err = be.cmd.Process.Kill()
rtest.OK(t, err)
t.Log("killed rclone")
_, err = be.Stat(context.TODO(), restic.Handle{
Name: "foo",
Type: restic.DataFile,
})
rtest.Assert(t, err != nil, "expected an error")
}

View File

@ -12,37 +12,42 @@ import (
// StdioConn implements a net.Conn via stdin/stdout. // StdioConn implements a net.Conn via stdin/stdout.
type StdioConn struct { type StdioConn struct {
stdin *os.File receive *os.File
stdout *os.File send *os.File
cmd *exec.Cmd cmd *exec.Cmd
close sync.Once closeRecv sync.Once
closeSend sync.Once
} }
func (s *StdioConn) Read(p []byte) (int, error) { func (s *StdioConn) Read(p []byte) (int, error) {
n, err := s.stdin.Read(p) n, err := s.receive.Read(p)
return n, err return n, err
} }
func (s *StdioConn) Write(p []byte) (int, error) { func (s *StdioConn) Write(p []byte) (int, error) {
n, err := s.stdout.Write(p) n, err := s.send.Write(p)
return n, err return n, err
} }
// Close closes both streams. // Close closes the stream to the child process.
func (s *StdioConn) Close() (err error) { func (s *StdioConn) Close() (err error) {
s.close.Do(func() { s.closeSend.Do(func() {
debug.Log("close stdio connection") debug.Log("close stdio send connection")
var errs []error err = s.send.Close()
})
for _, f := range []func() error{s.stdin.Close, s.stdout.Close} { return err
err := f()
if err != nil {
errs = append(errs, err)
}
} }
if len(errs) > 0 { // CloseAll closes both streams.
err = errs[0] func (s *StdioConn) CloseAll() (err error) {
err = s.Close()
s.closeRecv.Do(func() {
debug.Log("close stdio receive connection")
err2 := s.receive.Close()
if err == nil {
err = err2
} }
}) })
@ -61,8 +66,8 @@ func (s *StdioConn) RemoteAddr() net.Addr {
// SetDeadline sets the read/write deadline. // SetDeadline sets the read/write deadline.
func (s *StdioConn) SetDeadline(t time.Time) error { func (s *StdioConn) SetDeadline(t time.Time) error {
err1 := s.stdin.SetReadDeadline(t) err1 := s.receive.SetReadDeadline(t)
err2 := s.stdout.SetWriteDeadline(t) err2 := s.send.SetWriteDeadline(t)
if err1 != nil { if err1 != nil {
return err1 return err1
} }
@ -71,12 +76,12 @@ func (s *StdioConn) SetDeadline(t time.Time) error {
// SetReadDeadline sets the read/write deadline. // SetReadDeadline sets the read/write deadline.
func (s *StdioConn) SetReadDeadline(t time.Time) error { func (s *StdioConn) SetReadDeadline(t time.Time) error {
return s.stdin.SetReadDeadline(t) return s.receive.SetReadDeadline(t)
} }
// SetWriteDeadline sets the read/write deadline. // SetWriteDeadline sets the read/write deadline.
func (s *StdioConn) SetWriteDeadline(t time.Time) error { func (s *StdioConn) SetWriteDeadline(t time.Time) error {
return s.stdout.SetWriteDeadline(t) return s.send.SetWriteDeadline(t)
} }
// make sure StdioConn implements net.Conn // make sure StdioConn implements net.Conn