mirror of
https://github.com/moby/moby.git
synced 2025-08-08 13:22:22 +03:00
use router.Cancellable instead of direct CloseNotify
Signed-off-by: Alexander Morozov <lk4d4@docker.com>
This commit is contained in:
@@ -24,6 +24,6 @@ func (r *buildRouter) Routes() []router.Route {
|
|||||||
|
|
||||||
func (r *buildRouter) initRoutes() {
|
func (r *buildRouter) initRoutes() {
|
||||||
r.routes = []router.Route{
|
r.routes = []router.Route{
|
||||||
router.NewPostRoute("/build", r.postBuild),
|
router.Cancellable(router.NewPostRoute("/build", r.postBuild)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -184,21 +184,6 @@ func (br *buildRouter) postBuild(ctx context.Context, w http.ResponseWriter, r *
|
|||||||
stdout := &streamformatter.StdoutFormatter{Writer: out, StreamFormatter: sf}
|
stdout := &streamformatter.StdoutFormatter{Writer: out, StreamFormatter: sf}
|
||||||
stderr := &streamformatter.StderrFormatter{Writer: out, StreamFormatter: sf}
|
stderr := &streamformatter.StderrFormatter{Writer: out, StreamFormatter: sf}
|
||||||
|
|
||||||
finished := make(chan struct{})
|
|
||||||
defer close(finished)
|
|
||||||
if notifier, ok := w.(http.CloseNotifier); ok {
|
|
||||||
notifyContext, cancel := context.WithCancel(ctx)
|
|
||||||
closeNotifier := notifier.CloseNotify()
|
|
||||||
go func() {
|
|
||||||
select {
|
|
||||||
case <-closeNotifier:
|
|
||||||
cancel()
|
|
||||||
case <-finished:
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
ctx = notifyContext
|
|
||||||
}
|
|
||||||
|
|
||||||
imgID, err := br.backend.Build(ctx, buildOptions,
|
imgID, err := br.backend.Build(ctx, buildOptions,
|
||||||
builder.DockerIgnoreContext{ModifiableContext: buildContext},
|
builder.DockerIgnoreContext{ModifiableContext: buildContext},
|
||||||
stdout, stderr, out)
|
stdout, stderr, out)
|
||||||
|
@@ -4,6 +4,8 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/docker/docker/api/types/backend"
|
"github.com/docker/docker/api/types/backend"
|
||||||
"github.com/docker/docker/pkg/archive"
|
"github.com/docker/docker/pkg/archive"
|
||||||
"github.com/docker/docker/pkg/version"
|
"github.com/docker/docker/pkg/version"
|
||||||
@@ -49,8 +51,8 @@ type stateBackend interface {
|
|||||||
type monitorBackend interface {
|
type monitorBackend interface {
|
||||||
ContainerChanges(name string) ([]archive.Change, error)
|
ContainerChanges(name string) ([]archive.Change, error)
|
||||||
ContainerInspect(name string, size bool, version version.Version) (interface{}, error)
|
ContainerInspect(name string, size bool, version version.Version) (interface{}, error)
|
||||||
ContainerLogs(name string, config *backend.ContainerLogsConfig, started chan struct{}) error
|
ContainerLogs(ctx context.Context, name string, config *backend.ContainerLogsConfig, started chan struct{}) error
|
||||||
ContainerStats(name string, config *backend.ContainerStatsConfig) error
|
ContainerStats(ctx context.Context, name string, config *backend.ContainerStatsConfig) error
|
||||||
ContainerTop(name string, psArgs string) (*types.ContainerProcessList, error)
|
ContainerTop(name string, psArgs string) (*types.ContainerProcessList, error)
|
||||||
|
|
||||||
Containers(config *types.ContainerListOptions) ([]*types.Container, error)
|
Containers(config *types.ContainerListOptions) ([]*types.Container, error)
|
||||||
|
@@ -33,8 +33,8 @@ func (r *containerRouter) initRoutes() {
|
|||||||
router.NewGetRoute("/containers/{name:.*}/changes", r.getContainersChanges),
|
router.NewGetRoute("/containers/{name:.*}/changes", r.getContainersChanges),
|
||||||
router.NewGetRoute("/containers/{name:.*}/json", r.getContainersByName),
|
router.NewGetRoute("/containers/{name:.*}/json", r.getContainersByName),
|
||||||
router.NewGetRoute("/containers/{name:.*}/top", r.getContainersTop),
|
router.NewGetRoute("/containers/{name:.*}/top", r.getContainersTop),
|
||||||
router.NewGetRoute("/containers/{name:.*}/logs", r.getContainersLogs),
|
router.Cancellable(router.NewGetRoute("/containers/{name:.*}/logs", r.getContainersLogs)),
|
||||||
router.NewGetRoute("/containers/{name:.*}/stats", r.getContainersStats),
|
router.Cancellable(router.NewGetRoute("/containers/{name:.*}/stats", r.getContainersStats)),
|
||||||
router.NewGetRoute("/containers/{name:.*}/attach/ws", r.wsContainersAttach),
|
router.NewGetRoute("/containers/{name:.*}/attach/ws", r.wsContainersAttach),
|
||||||
router.NewGetRoute("/exec/{id:.*}/json", r.getExecByID),
|
router.NewGetRoute("/exec/{id:.*}/json", r.getExecByID),
|
||||||
router.NewGetRoute("/containers/{name:.*}/archive", r.getContainersArchive),
|
router.NewGetRoute("/containers/{name:.*}/archive", r.getContainersArchive),
|
||||||
|
@@ -67,19 +67,13 @@ func (s *containerRouter) getContainersStats(ctx context.Context, w http.Respons
|
|||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
}
|
}
|
||||||
|
|
||||||
var closeNotifier <-chan bool
|
|
||||||
if notifier, ok := w.(http.CloseNotifier); ok {
|
|
||||||
closeNotifier = notifier.CloseNotify()
|
|
||||||
}
|
|
||||||
|
|
||||||
config := &backend.ContainerStatsConfig{
|
config := &backend.ContainerStatsConfig{
|
||||||
Stream: stream,
|
Stream: stream,
|
||||||
OutStream: w,
|
OutStream: w,
|
||||||
Stop: closeNotifier,
|
|
||||||
Version: string(httputils.VersionFromContext(ctx)),
|
Version: string(httputils.VersionFromContext(ctx)),
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.backend.ContainerStats(vars["name"], config)
|
return s.backend.ContainerStats(ctx, vars["name"], config)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *containerRouter) getContainersLogs(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
|
func (s *containerRouter) getContainersLogs(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
|
||||||
@@ -97,11 +91,6 @@ func (s *containerRouter) getContainersLogs(ctx context.Context, w http.Response
|
|||||||
return fmt.Errorf("Bad parameters: you must choose at least one stream")
|
return fmt.Errorf("Bad parameters: you must choose at least one stream")
|
||||||
}
|
}
|
||||||
|
|
||||||
var closeNotifier <-chan bool
|
|
||||||
if notifier, ok := w.(http.CloseNotifier); ok {
|
|
||||||
closeNotifier = notifier.CloseNotify()
|
|
||||||
}
|
|
||||||
|
|
||||||
containerName := vars["name"]
|
containerName := vars["name"]
|
||||||
logsConfig := &backend.ContainerLogsConfig{
|
logsConfig := &backend.ContainerLogsConfig{
|
||||||
ContainerLogsOptions: types.ContainerLogsOptions{
|
ContainerLogsOptions: types.ContainerLogsOptions{
|
||||||
@@ -113,11 +102,10 @@ func (s *containerRouter) getContainersLogs(ctx context.Context, w http.Response
|
|||||||
ShowStderr: stderr,
|
ShowStderr: stderr,
|
||||||
},
|
},
|
||||||
OutStream: w,
|
OutStream: w,
|
||||||
Stop: closeNotifier,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
chStarted := make(chan struct{})
|
chStarted := make(chan struct{})
|
||||||
if err := s.backend.ContainerLogs(containerName, logsConfig, chStarted); err != nil {
|
if err := s.backend.ContainerLogs(ctx, containerName, logsConfig, chStarted); err != nil {
|
||||||
select {
|
select {
|
||||||
case <-chStarted:
|
case <-chStarted:
|
||||||
// The client may be expecting all of the data we're sending to
|
// The client may be expecting all of the data we're sending to
|
||||||
|
@@ -18,7 +18,7 @@ func NewRouter(b Backend) router.Router {
|
|||||||
r.routes = []router.Route{
|
r.routes = []router.Route{
|
||||||
router.NewOptionsRoute("/{anyroute:.*}", optionsHandler),
|
router.NewOptionsRoute("/{anyroute:.*}", optionsHandler),
|
||||||
router.NewGetRoute("/_ping", pingHandler),
|
router.NewGetRoute("/_ping", pingHandler),
|
||||||
router.NewGetRoute("/events", r.getEvents),
|
router.Cancellable(router.NewGetRoute("/events", r.getEvents)),
|
||||||
router.NewGetRoute("/info", r.getInfo),
|
router.NewGetRoute("/info", r.getInfo),
|
||||||
router.NewGetRoute("/version", r.getVersion),
|
router.NewGetRoute("/version", r.getVersion),
|
||||||
router.NewPostRoute("/auth", r.postAuth),
|
router.NewPostRoute("/auth", r.postAuth),
|
||||||
|
@@ -83,11 +83,6 @@ func (s *systemRouter) getEvents(ctx context.Context, w http.ResponseWriter, r *
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var closeNotify <-chan bool
|
|
||||||
if closeNotifier, ok := w.(http.CloseNotifier); ok {
|
|
||||||
closeNotify = closeNotifier.CloseNotify()
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case ev := <-l:
|
case ev := <-l:
|
||||||
@@ -101,8 +96,8 @@ func (s *systemRouter) getEvents(ctx context.Context, w http.ResponseWriter, r *
|
|||||||
}
|
}
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
return nil
|
return nil
|
||||||
case <-closeNotify:
|
case <-ctx.Done():
|
||||||
logrus.Debug("Client disconnected, stop sending events")
|
logrus.Debug("Client context cancelled, stop sending events")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -31,7 +31,6 @@ type ContainerAttachConfig struct {
|
|||||||
type ContainerLogsConfig struct {
|
type ContainerLogsConfig struct {
|
||||||
types.ContainerLogsOptions
|
types.ContainerLogsOptions
|
||||||
OutStream io.Writer
|
OutStream io.Writer
|
||||||
Stop <-chan bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ContainerStatsConfig holds information for configuring the runtime
|
// ContainerStatsConfig holds information for configuring the runtime
|
||||||
@@ -39,7 +38,6 @@ type ContainerLogsConfig struct {
|
|||||||
type ContainerStatsConfig struct {
|
type ContainerStatsConfig struct {
|
||||||
Stream bool
|
Stream bool
|
||||||
OutStream io.Writer
|
OutStream io.Writer
|
||||||
Stop <-chan bool
|
|
||||||
Version string
|
Version string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -6,6 +6,8 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
"github.com/docker/docker/api/types/backend"
|
"github.com/docker/docker/api/types/backend"
|
||||||
"github.com/docker/docker/container"
|
"github.com/docker/docker/container"
|
||||||
@@ -19,7 +21,7 @@ import (
|
|||||||
|
|
||||||
// ContainerLogs hooks up a container's stdout and stderr streams
|
// ContainerLogs hooks up a container's stdout and stderr streams
|
||||||
// configured with the given struct.
|
// configured with the given struct.
|
||||||
func (daemon *Daemon) ContainerLogs(containerName string, config *backend.ContainerLogsConfig, started chan struct{}) error {
|
func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, config *backend.ContainerLogsConfig, started chan struct{}) error {
|
||||||
container, err := daemon.GetContainer(containerName)
|
container, err := daemon.GetContainer(containerName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -78,7 +80,7 @@ func (daemon *Daemon) ContainerLogs(containerName string, config *backend.Contai
|
|||||||
case err := <-logs.Err:
|
case err := <-logs.Err:
|
||||||
logrus.Errorf("Error streaming logs: %v", err)
|
logrus.Errorf("Error streaming logs: %v", err)
|
||||||
return nil
|
return nil
|
||||||
case <-config.Stop:
|
case <-ctx.Done():
|
||||||
logs.Close()
|
logs.Close()
|
||||||
return nil
|
return nil
|
||||||
case msg, ok := <-logs.Msg:
|
case msg, ok := <-logs.Msg:
|
||||||
|
@@ -5,6 +5,8 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/docker/docker/api/types/backend"
|
"github.com/docker/docker/api/types/backend"
|
||||||
"github.com/docker/docker/pkg/ioutils"
|
"github.com/docker/docker/pkg/ioutils"
|
||||||
"github.com/docker/docker/pkg/version"
|
"github.com/docker/docker/pkg/version"
|
||||||
@@ -14,7 +16,7 @@ import (
|
|||||||
|
|
||||||
// ContainerStats writes information about the container to the stream
|
// ContainerStats writes information about the container to the stream
|
||||||
// given in the config object.
|
// given in the config object.
|
||||||
func (daemon *Daemon) ContainerStats(prefixOrName string, config *backend.ContainerStatsConfig) error {
|
func (daemon *Daemon) ContainerStats(ctx context.Context, prefixOrName string, config *backend.ContainerStatsConfig) error {
|
||||||
if runtime.GOOS == "windows" {
|
if runtime.GOOS == "windows" {
|
||||||
return errors.New("Windows does not support stats")
|
return errors.New("Windows does not support stats")
|
||||||
}
|
}
|
||||||
@@ -114,7 +116,7 @@ func (daemon *Daemon) ContainerStats(prefixOrName string, config *backend.Contai
|
|||||||
if !config.Stream {
|
if !config.Stream {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
case <-config.Stop:
|
case <-ctx.Done():
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user