Skip to content

Commit

Permalink
Move stdio attach from libcontainerd backend to callback
Browse files Browse the repository at this point in the history
Signed-off-by: Tonis Tiigi <[email protected]>
  • Loading branch information
tonistiigi committed Oct 24, 2016
1 parent b2786a3 commit 37a3be2
Show file tree
Hide file tree
Showing 16 changed files with 145 additions and 151 deletions.
44 changes: 44 additions & 0 deletions container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/docker/docker/daemon/network"
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/promise"
Expand Down Expand Up @@ -1012,3 +1013,46 @@ func (container *Container) CancelAttachContext() {
}
container.attachContext.mu.Unlock()
}

func (container *Container) startLogging() error {
if container.HostConfig.LogConfig.Type == "none" {
return nil // do not start logging routines
}

l, err := container.StartLogger(container.HostConfig.LogConfig)
if err != nil {
return fmt.Errorf("Failed to initialize logging driver: %v", err)
}

copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l)
container.LogCopier = copier
copier.Run()
container.LogDriver = l

// set LogPath field only for json-file logdriver
if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok {
container.LogPath = jl.LogPath()
}

return nil
}

// InitializeStdio is called by libcontainerd to connect the stdio.
func (container *Container) InitializeStdio(iop libcontainerd.IOPipe) error {
if err := container.startLogging(); err != nil {
container.Reset(false)
return err
}

container.StreamConfig.CopyToPipe(iop)

if container.Stdin() == nil && !container.Config.Tty {
if iop.Stdin != nil {
if err := iop.Stdin.Close(); err != nil {
logrus.Error("error closing stdin: %+v", err)
}
}
}

return nil
}
2 changes: 1 addition & 1 deletion daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (daemon *Daemon) restore() error {

if c.IsRunning() || c.IsPaused() {
c.RestartManager().Cancel() // manually start containers because some need to wait for swarm networking
if err := daemon.containerd.Restore(c.ID); err != nil {
if err := daemon.containerd.Restore(c.ID, c.InitializeStdio); err != nil {
logrus.Errorf("Failed to restore %s with containerd: %s", c.ID, err)
return
}
Expand Down
2 changes: 1 addition & 1 deletion daemon/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R

attachErr := container.AttachStreams(ctx, ec.StreamConfig, ec.OpenStdin, true, ec.Tty, cStdin, cStdout, cStderr, ec.DetachKeys)

systemPid, err := d.containerd.AddProcess(ctx, c.ID, name, p)
systemPid, err := d.containerd.AddProcess(ctx, c.ID, name, p, ec.InitializeStdio)
if err != nil {
return err
}
Expand Down
18 changes: 18 additions & 0 deletions daemon/exec/exec.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package exec

import (
"runtime"
"sync"

"github.com/Sirupsen/logrus"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/runconfig"
)
Expand Down Expand Up @@ -39,6 +42,21 @@ func NewConfig() *Config {
}
}

// InitializeStdio is called by libcontainerd to connect the stdio.
func (c *Config) InitializeStdio(iop libcontainerd.IOPipe) error {
c.StreamConfig.CopyToPipe(iop)

if c.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" {
if iop.Stdin != nil {
if err := iop.Stdin.Close(); err != nil {
logrus.Error("error closing exec stdin: %+v", err)
}
}
}

return nil
}

// Store keeps track of the exec configurations.
type Store struct {
commands map[string]*Config
Expand Down
25 changes: 0 additions & 25 deletions daemon/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
timetypes "github.com/docker/docker/api/types/time"
"github.com/docker/docker/container"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/jsonfilelog"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/stdcopy"
)
Expand Down Expand Up @@ -121,30 +120,6 @@ func (daemon *Daemon) getLogger(container *container.Container) (logger.Logger,
return container.StartLogger(container.HostConfig.LogConfig)
}

// StartLogging initializes and starts the container logging stream.
func (daemon *Daemon) StartLogging(container *container.Container) error {
if container.HostConfig.LogConfig.Type == "none" {
return nil // do not start logging routines
}

l, err := container.StartLogger(container.HostConfig.LogConfig)
if err != nil {
return fmt.Errorf("Failed to initialize logging driver: %v", err)
}

copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l)
container.LogCopier = copier
copier.Run()
container.LogDriver = l

// set LogPath field only for json-file logdriver
if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok {
container.LogPath = jl.LogPath()
}

return nil
}

// mergeLogConfig merges the daemon log config to the container's log config if the container's log driver is not specified.
func (daemon *Daemon) mergeAndVerifyLogConfig(cfg *containertypes.LogConfig) error {
if cfg.Type == "" {
Expand Down
69 changes: 0 additions & 69 deletions daemon/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,14 @@ package daemon
import (
"errors"
"fmt"
"io"
"runtime"
"strconv"
"time"

"github.com/Sirupsen/logrus"
"github.com/docker/docker/api/types"
"github.com/docker/docker/daemon/exec"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/restartmanager"
"github.com/docker/docker/runconfig"
)

// StateChanged updates daemon state changes from containerd
Expand Down Expand Up @@ -133,69 +130,3 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error {

return nil
}

// AttachStreams is called by libcontainerd to connect the stdio.
func (daemon *Daemon) AttachStreams(id string, iop libcontainerd.IOPipe) error {
var (
s *runconfig.StreamConfig
ec *exec.Config
)

c := daemon.containers.Get(id)
if c == nil {
var err error
ec, err = daemon.getExecConfig(id)
if err != nil {
return fmt.Errorf("no such exec/container: %s", id)
}
s = ec.StreamConfig
} else {
s = c.StreamConfig
if err := daemon.StartLogging(c); err != nil {
c.Reset(false)
return err
}
}

copyFunc := func(w io.Writer, r io.Reader) {
s.Add(1)
go func() {
if _, err := io.Copy(w, r); err != nil {
logrus.Errorf("%v stream copy error: %v", id, err)
}
s.Done()
}()
}

if iop.Stdout != nil {
copyFunc(s.Stdout(), iop.Stdout)
}
if iop.Stderr != nil {
copyFunc(s.Stderr(), iop.Stderr)
}

if stdin := s.Stdin(); stdin != nil {
if iop.Stdin != nil {
go func() {
io.Copy(iop.Stdin, stdin)
if err := iop.Stdin.Close(); err != nil {
logrus.Error(err)
}
}()
}
} else {
//TODO(swernli): On Windows, not closing stdin when no tty is requested by the exec Config
// results in a hang. We should re-evaluate generalizing this fix for all OSes if
// we can determine that is the right thing to do more generally.
if (c != nil && !c.Config.Tty) || (ec != nil && !ec.Tty && runtime.GOOS == "windows") {
// tty is enabled, so dont close containerd's iopipe stdin.
if iop.Stdin != nil {
if err := iop.Stdin.Close(); err != nil {
logrus.Error(err)
}
}
}
}

return nil
}
2 changes: 1 addition & 1 deletion daemon/monitor_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (daemon *Daemon) postRunProcessing(container *container.Container, e libcon

// Create a new servicing container, which will start, complete the update, and merge back the
// results if it succeeded, all as part of the below function call.
if err := daemon.containerd.Create((container.ID + "_servicing"), "", "", *spec, newOpts...); err != nil {
if err := daemon.containerd.Create((container.ID + "_servicing"), "", "", *spec, container.InitializeStdio, newOpts...); err != nil {
container.SetExitCode(-1)
return fmt.Errorf("Post-run update servicing failed: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion daemon/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (daemon *Daemon) containerStart(container *container.Container, checkpoint
container.ResetRestartManager(true)
}

if err := daemon.containerd.Create(container.ID, checkpoint, container.CheckpointDir(), *spec, createOptions...); err != nil {
if err := daemon.containerd.Create(container.ID, checkpoint, container.CheckpointDir(), *spec, container.InitializeStdio, createOptions...); err != nil {
errDesc := grpc.ErrorDesc(err)
logrus.Errorf("Create container failed with error: %s", errDesc)
// if we receive an internal error from the initial start of a container then lets
Expand Down
20 changes: 8 additions & 12 deletions libcontainerd/client_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type client struct {
// AddProcess is the handler for adding a process to an already running
// container. It's called through docker exec. It returns the system pid of the
// exec'd process.
func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process) (int, error) {
func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio StdioCallback) (int, error) {
clnt.lock(containerID)
defer clnt.unlock(containerID)
container, err := clnt.getContainer(containerID)
Expand Down Expand Up @@ -116,14 +116,10 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly

container.processes[processFriendlyName] = p

clnt.unlock(containerID)

if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil {
clnt.lock(containerID)
if err := attachStdio(*iopipe); err != nil {
p.closeFifos(iopipe)
return -1, err
}
clnt.lock(containerID)

return int(resp.SystemPid), nil
}
Expand Down Expand Up @@ -153,7 +149,7 @@ func (clnt *client) prepareBundleDir(uid, gid int) (string, error) {
return p, nil
}

func (clnt *client) Create(containerID string, checkpoint string, checkpointDir string, spec specs.Spec, options ...CreateOption) (err error) {
func (clnt *client) Create(containerID string, checkpoint string, checkpointDir string, spec specs.Spec, attachStdio StdioCallback, options ...CreateOption) (err error) {
clnt.lock(containerID)
defer clnt.unlock(containerID)

Expand Down Expand Up @@ -195,7 +191,7 @@ func (clnt *client) Create(containerID string, checkpoint string, checkpointDir
return err
}

return container.start(checkpoint, checkpointDir)
return container.start(checkpoint, checkpointDir, attachStdio)
}

func (clnt *client) Signal(containerID string, sig int) error {
Expand Down Expand Up @@ -404,7 +400,7 @@ func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier {
return w
}

func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, options ...CreateOption) (err error) {
func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, attachStdio StdioCallback, options ...CreateOption) (err error) {
clnt.lock(cont.Id)
defer clnt.unlock(cont.Id)

Expand Down Expand Up @@ -445,7 +441,7 @@ func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Ev
return err
})

if err := clnt.backend.AttachStreams(containerID, *iopipe); err != nil {
if err := attachStdio(*iopipe); err != nil {
container.closeFifos(iopipe)
return err
}
Expand Down Expand Up @@ -537,7 +533,7 @@ func (clnt *client) getContainerLastEvent(id string) (*containerd.Event, error)
return ev, err
}

func (clnt *client) Restore(containerID string, options ...CreateOption) error {
func (clnt *client) Restore(containerID string, attachStdio StdioCallback, options ...CreateOption) error {
// Synchronize with live events
clnt.remote.Lock()
defer clnt.remote.Unlock()
Expand Down Expand Up @@ -585,7 +581,7 @@ func (clnt *client) Restore(containerID string, options ...CreateOption) error {

// container is still alive
if clnt.liveRestore {
if err := clnt.restore(cont, ev, options...); err != nil {
if err := clnt.restore(cont, ev, attachStdio, options...); err != nil {
logrus.Errorf("libcontainerd: error restoring %s: %v", containerID, err)
}
return nil
Expand Down
17 changes: 5 additions & 12 deletions libcontainerd/client_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ const defaultOwner = "docker"
// },
// "Servicing": false
//}
func (clnt *client) Create(containerID string, checkpoint string, checkpointDir string, spec specs.Spec, options ...CreateOption) error {
func (clnt *client) Create(containerID string, checkpoint string, checkpointDir string, spec specs.Spec, attachStdio StdioCallback, options ...CreateOption) error {
clnt.lock(containerID)
defer clnt.unlock(containerID)
logrus.Debugln("libcontainerd: client.Create() with spec", spec)
Expand Down Expand Up @@ -253,7 +253,7 @@ func (clnt *client) Create(containerID string, checkpoint string, checkpointDir
// internal structure, start will keep HCS in sync by deleting the
// container there.
logrus.Debugf("libcontainerd: Create() id=%s, Calling start()", containerID)
if err := container.start(); err != nil {
if err := container.start(attachStdio); err != nil {
clnt.deleteContainer(containerID)
return err
}
Expand All @@ -266,7 +266,7 @@ func (clnt *client) Create(containerID string, checkpoint string, checkpointDir
// AddProcess is the handler for adding a process to an already running
// container. It's called through docker exec. It returns the system pid of the
// exec'd process.
func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, procToAdd Process) (int, error) {
func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, procToAdd Process, attachStdio StdioCallback) (int, error) {
clnt.lock(containerID)
defer clnt.unlock(containerID)
container, err := clnt.getContainer(containerID)
Expand Down Expand Up @@ -343,18 +343,11 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly
// Add the process to the container's list of processes
container.processes[processFriendlyName] = proc

// Make sure the lock is not held while calling back into the daemon
clnt.unlock(containerID)

// Tell the engine to attach streams back to the client
if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil {
clnt.lock(containerID)
if err := attachStdio(*iopipe); err != nil {
return -1, err
}

// Lock again so that the defer unlock doesn't fail. (I really don't like this code)
clnt.lock(containerID)

// Spin up a go routine waiting for exit to handle cleanup
go container.waitExit(proc, false)

Expand Down Expand Up @@ -544,7 +537,7 @@ func (clnt *client) Stats(containerID string) (*Stats, error) {
}

// Restore is the handler for restoring a container
func (clnt *client) Restore(containerID string, unusedOnWindows ...CreateOption) error {
func (clnt *client) Restore(containerID string, _ StdioCallback, unusedOnWindows ...CreateOption) error {
// TODO Windows: Implement this. For now, just tell the backend the container exited.
logrus.Debugf("libcontainerd: Restore(%s)", containerID)
return clnt.backend.StateChanged(containerID, StateInfo{
Expand Down
Loading

0 comments on commit 37a3be2

Please sign in to comment.