Skip to content
This repository has been archived by the owner on Oct 13, 2023. It is now read-only.

[19.03 backport] Handle blocked I/O of exec'd processes #296

Merged
merged 2 commits into from
Sep 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ func (i *rio) Close() error {
}

// Wait blocks on the stream config first and then on the wrapped IO.
// NOTE(review): this diff hunk renders both the pre-change call
// (i.sc.Wait()) and its replacement (i.sc.Wait(context.Background()));
// the hunk header reports one addition and one deletion, so only one of
// the two calls exists on either side of the change.
func (i *rio) Wait() {
i.sc.Wait()
i.sc.Wait(context.Background())

i.IO.Wait()
}
29 changes: 26 additions & 3 deletions container/stream/streams.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stream // import "github.com/docker/docker/container/stream"

import (
"context"
"fmt"
"io"
"io/ioutil"
Expand All @@ -24,11 +25,12 @@ import (
// copied and delivered to all StdoutPipe and StderrPipe consumers, using
// a kind of "broadcaster".
type Config struct {
// NOTE(review): the diff view shows the removed embedded WaitGroup right
// next to its replacement, the unexported wg field below.
sync.WaitGroup
// wg counts the copy goroutines started by CopyToPipe; Wait blocks on it.
wg sync.WaitGroup
stdout *broadcaster.Unbuffered
stderr *broadcaster.Unbuffered
stdin io.ReadCloser
stdinPipe io.WriteCloser
// dio is the DirectIO handed to CopyToPipe; Wait cancels, waits on, and
// closes it when its context is done before the copies have finished.
dio *cio.DirectIO
}

// NewConfig creates a stream config and initializes
Expand Down Expand Up @@ -115,14 +117,15 @@ func (c *Config) CloseStreams() error {

// CopyToPipe connects streamconfig with a libcontainerd.IOPipe
func (c *Config) CopyToPipe(iop *cio.DirectIO) {
c.dio = iop
copyFunc := func(w io.Writer, r io.ReadCloser) {
c.Add(1)
c.wg.Add(1)
go func() {
if _, err := pools.Copy(w, r); err != nil {
logrus.Errorf("stream copy error: %v", err)
}
r.Close()
c.Done()
c.wg.Done()
}()
}

Expand All @@ -144,3 +147,23 @@ func (c *Config) CopyToPipe(iop *cio.DirectIO) {
}
}
}

// Wait blocks until the stream's wait group reports that all copies are
// finished.
//
// The context acts as a timeout: if ctx is done before the copies finish,
// Wait stops waiting and, when a DirectIO has been attached, cancels it,
// waits on it, and closes it to forcefully shut down the io streams.
func (c *Config) Wait(ctx context.Context) {
	copiesFinished := make(chan struct{})
	go func() {
		c.wg.Wait()
		close(copiesFinished)
	}()

	select {
	case <-copiesFinished:
		// Normal path: every copy ended on its own.
		return
	case <-ctx.Done():
		// Timed out or cancelled: fall through to the forced teardown.
	}

	if c.dio == nil {
		return
	}
	c.dio.Cancel()
	c.dio.Wait()
	c.dio.Close()
}
3 changes: 2 additions & 1 deletion daemon/exec/exec.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package exec // import "github.com/docker/docker/daemon/exec"

import (
"context"
"runtime"
"sync"

Expand Down Expand Up @@ -58,7 +59,7 @@ func (i *rio) Close() error {
}

// Wait blocks on the stream config first and then on the wrapped IO.
// NOTE(review): this diff hunk renders both the pre-change call
// (i.sc.Wait()) and its replacement (i.sc.Wait(context.Background()));
// only one of the two calls exists on either side of the change.
func (i *rio) Wait() {
i.sc.Wait()
i.sc.Wait(context.Background())

i.IO.Wait()
}
Expand Down
11 changes: 8 additions & 3 deletions daemon/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei
if err != nil {
logrus.WithError(err).Warnf("failed to delete container %s from containerd", c.ID)
}

c.StreamConfig.Wait()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
c.StreamConfig.Wait(ctx)
cancel()
c.Reset(false)

exitStatus := container.ExitStatus{
Expand Down Expand Up @@ -124,7 +125,11 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei
defer execConfig.Unlock()
execConfig.ExitCode = &ec
execConfig.Running = false
execConfig.StreamConfig.Wait()

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
execConfig.StreamConfig.Wait(ctx)
cancel()

if err := execConfig.CloseStreams(); err != nil {
logrus.Errorf("failed to cleanup exec %s streams: %s", c.ID, err)
}
Expand Down
99 changes: 0 additions & 99 deletions integration-cli/docker_cli_exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@ import (
"reflect"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/docker/docker/client"
"github.com/docker/docker/integration-cli/cli"
"github.com/docker/docker/integration-cli/cli/build"
"github.com/docker/docker/pkg/parsers/kernel"
"github.com/go-check/check"
"gotest.tools/assert"
is "gotest.tools/assert/cmp"
Expand Down Expand Up @@ -534,100 +532,3 @@ func (s *DockerSuite) TestExecEnvLinksHost(c *check.C) {
assert.Check(c, is.Contains(out, "HOSTNAME=myhost"))
assert.Check(c, is.Contains(out, "DB_NAME=/bar/db"))
}

// TestExecWindowsOpenHandles runs only against a Windows daemon. It starts a
// sleeping container, execs `cmd /c start sleep 10` in it, and then checks
// that `docker top` and `docker inspect` keep responding while the exec'd
// cmd.exe has exited but its background sleep is still running. Finally it
// checks that the exec call itself returns once that background sleep exits.
func (s *DockerSuite) TestExecWindowsOpenHandles(c *check.C) {
testRequires(c, DaemonIsWindows)

if runtime.GOOS == "windows" {
v, err := kernel.GetKernelVersion()
assert.NilError(c, err)
// Pull the build number out of the third space-separated field of the
// kernel version string: drop its first character and keep the part
// before the first ".". The Atoi error is ignored, so a parse failure
// yields build == 0 and the test is not skipped.
build, _ := strconv.Atoi(strings.Split(strings.SplitN(v.String(), " ", 3)[2][1:], ".")[0])
if build >= 17743 {
c.Skip("Temporarily disabled on RS5 17743+ builds due to platform bug")

// This is being tracked internally. @jhowardmsft. Summary of failure
// from an email in early July 2018 below:
//
// Platform regression. In cmd.exe by the look of it. I can repro
// it outside of CI. It fails the same on 17681, 17676 and even as
// far back as 17663, over a month old. From investigating, I can see
// what's happening in the container, but not the reason. The test
// starts a long-running container based on the Windows busybox image.
// It then adds another process (docker exec) to that container to
// sleep. It loops waiting for two instances of busybox.exe running,
// and cmd.exe to quit. What's actually happening is that the second
// exec hangs indefinitely, and from docker top, I can see
// "OpenWith.exe" running.

//Manual repro would be
//# Start the first long-running container
//docker run --rm -d --name test busybox sleep 300

//# In another window, docker top test. There should be a single instance of busybox.exe running
//# In a third window, docker exec test cmd /c start sleep 10 NOTE THIS HANGS UNTIL 5 MIN TIMEOUT
//# In the second window, run docker top test. Note that OpenWith.exe is running, one cmd.exe and only one busybox. I would expect no "OpenWith" and two busybox.exe's.
}
}

// Start the container, then run the exec in the background; the exec
// channel is signalled when the `docker exec` command returns.
runSleepingContainer(c, "-d", "--name", "test")
exec := make(chan bool)
go func() {
dockerCmd(c, "exec", "test", "cmd", "/c", "start sleep 10")
exec <- true
}()

// Poll `docker top` up to 30 times, one second apart, allowing each call
// five seconds to answer.
count := 0
for {
top := make(chan string)
var out string
go func() {
// This out is local to the goroutine (declared with :=); the result
// reaches the outer out only through the top channel.
out, _ := dockerCmd(c, "top", "test")
top <- out
}()

select {
case <-time.After(time.Second * 5):
c.Fatal("timed out waiting for top while exec is exiting")
case out = <-top:
// This break only leaves the select, not the enclosing for loop.
break
}

if strings.Count(out, "busybox.exe") == 2 && !strings.Contains(out, "cmd.exe") {
// The initial exec process (cmd.exe) has exited, and both sleeps are currently running
break
}
count++
if count >= 30 {
c.Fatal("too many retries")
}
time.Sleep(1 * time.Second)
}

// `docker inspect` must also answer within five seconds in this state.
inspect := make(chan bool)
go func() {
dockerCmd(c, "inspect", "test")
inspect <- true
}()

select {
case <-time.After(time.Second * 5):
c.Fatal("timed out waiting for inspect while exec is exiting")
case <-inspect:
break
}

// Ensure the background sleep is still running
out, _ := dockerCmd(c, "top", "test")
assert.Equal(c, strings.Count(out, "busybox.exe"), 2)

// The exec should exit when the background sleep exits
select {
case <-time.After(time.Second * 15):
c.Fatal("timed out waiting for async exec to exit")
case <-exec:
// Ensure the background sleep has actually exited
out, _ := dockerCmd(c, "top", "test")
assert.Equal(c, strings.Count(out, "busybox.exe"), 1)
break
}
}
16 changes: 8 additions & 8 deletions libcontainerd/remote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,13 +652,6 @@ func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventTy
}).Error("exit event")
return
}
_, err = p.Delete(context.Background())
if err != nil {
c.logger.WithError(err).WithFields(logrus.Fields{
"container": ei.ContainerID,
"process": ei.ProcessID,
}).Warn("failed to delete process")
}

ctr, err := c.getContainer(ctx, ei.ContainerID)
if err != nil {
Expand All @@ -672,11 +665,18 @@ func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventTy
c.logger.WithFields(logrus.Fields{
"container": ei.ContainerID,
"error": err,
}).Error("failed to find container")
}).Error("failed to get container labels")
return
}
newFIFOSet(labels[DockerContainerBundlePath], ei.ProcessID, true, false).Close()
}
_, err = p.Delete(context.Background())
if err != nil {
c.logger.WithError(err).WithFields(logrus.Fields{
"container": ei.ContainerID,
"process": ei.ProcessID,
}).Warn("failed to delete process")
}
}
})
}
Expand Down