From 947efff180c1ea2865936703b579b1e7a96dbede Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Wed, 7 Jan 2015 14:43:04 -0800 Subject: [PATCH] Implement container stats collection in daemon Signed-off-by: Michael Crosby Upstream-commit: 65f58e2a742205c9e8470b360bd439642a5c8211 Component: engine --- components/engine/api/server/server.go | 14 ++++ components/engine/daemon/container.go | 7 ++ components/engine/daemon/daemon.go | 16 +++++ components/engine/daemon/execdriver/driver.go | 9 +++ .../engine/daemon/execdriver/lxc/driver.go | 5 ++ .../engine/daemon/execdriver/native/driver.go | 18 +++++ components/engine/daemon/start.go | 15 ++++ components/engine/daemon/stats_collector.go | 71 +++++++++++++++++++ 8 files changed, 155 insertions(+) create mode 100644 components/engine/daemon/stats_collector.go diff --git a/components/engine/api/server/server.go b/components/engine/api/server/server.go index d2715f1bc6..d5cdbd00cc 100644 --- a/components/engine/api/server/server.go +++ b/components/engine/api/server/server.go @@ -411,6 +411,19 @@ func getContainersJSON(eng *engine.Engine, version version.Version, w http.Respo return nil } +func getContainersStats(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + if err := parseForm(r); err != nil { + return err + } + if vars == nil { + return fmt.Errorf("Missing parameter") + } + name := vars["name"] + job := eng.Job("container_stats", name) + streamJSON(job, w, true) + return job.Run() +} + func getContainersLogs(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { if err := parseForm(r); err != nil { return err @@ -1323,6 +1336,7 @@ func createRouter(eng *engine.Engine, logging, enableCors bool, dockerVersion st "/containers/{name:.*}/json": getContainersByName, "/containers/{name:.*}/top": getContainersTop, "/containers/{name:.*}/logs": getContainersLogs, + "/containers/{name:.*}/stats": getContainersStats, "/containers/{name:.*}/attach/ws": wsContainersAttach, "/exec/{id:.*}/json": getExecByID, }, diff --git a/components/engine/daemon/container.go b/components/engine/daemon/container.go index b0eaea03b3..4a02328783 100644 --- a/components/engine/daemon/container.go +++ b/components/engine/daemon/container.go @@ -1414,3 +1414,10 @@ func (container *Container) getNetworkedContainer() (*Container, error) { return nil, fmt.Errorf("network mode not set to container") } } + +func (container *Container) Stats() (*execdriver.ResourceStats, error) { + if !container.IsRunning() { + return nil, fmt.Errorf("cannot collect stats on a non running container") + } + return container.daemon.Stats(container) +} diff --git a/components/engine/daemon/daemon.go b/components/engine/daemon/daemon.go index 1b2d13bb6d..01d8245dee 100644 --- a/components/engine/daemon/daemon.go +++ b/components/engine/daemon/daemon.go @@ -104,6 +104,7 @@ type Daemon struct { driver graphdriver.Driver execDriver execdriver.Driver trustStore *trust.TrustStore + statsCollector *statsCollector } // Install installs daemon capabilities to eng. @@ -116,6 +117,7 @@ func (daemon *Daemon) Install(eng *engine.Engine) error { "container_copy": daemon.ContainerCopy, "container_rename": daemon.ContainerRename, "container_inspect": daemon.ContainerInspect, + "container_stats": daemon.ContainerStats, "containers": daemon.Containers, "create": daemon.ContainerCreate, "rm": daemon.ContainerRm, @@ -982,6 +984,7 @@ func NewDaemonFromDirectory(config *Config, eng *engine.Engine) (*Daemon, error) execDriver: ed, eng: eng, trustStore: t, + statsCollector: newStatsCollector(1 * time.Second), } if err := daemon.restore(); err != nil { return nil, err @@ -1092,6 +1095,19 @@ func (daemon *Daemon) Kill(c *Container, sig int) error { return daemon.execDriver.Kill(c.command, sig) } +func (daemon *Daemon) Stats(c *Container) (*execdriver.ResourceStats, error) { + return daemon.execDriver.Stats(c.ID) +} + +func (daemon *Daemon) SubscribeToContainerStats(name string) (<-chan *execdriver.ResourceStats, error) { + c := daemon.Get(name) + if c == nil { + return nil, fmt.Errorf("no such container") + } + ch := daemon.statsCollector.collect(c) + return ch, nil +} + // Nuke kills all containers then removes all content // from the content root, including images, volumes and // container filesystems. diff --git a/components/engine/daemon/execdriver/driver.go b/components/engine/daemon/execdriver/driver.go index fe99e062d1..044a2ea0a7 100644 --- a/components/engine/daemon/execdriver/driver.go +++ b/components/engine/daemon/execdriver/driver.go @@ -5,7 +5,9 @@ import ( "io" "os" "os/exec" + "time" + "github.com/docker/libcontainer" "github.com/docker/libcontainer/devices" ) @@ -61,6 +63,7 @@ type Driver interface { GetPidsForContainer(id string) ([]int, error) // Returns a list of pids for the given container. Terminate(c *Command) error // kill it with fire Clean(id string) error // clean all traces of container exec + Stats(id string) (*ResourceStats, error) // Get resource stats for a running container } // Network settings of the container @@ -101,6 +104,12 @@ type Resources struct { Cpuset string `json:"cpuset"` } +type ResourceStats struct { + *libcontainer.ContainerStats + Read time.Time `json:"read"` + ClockTicks int `json:"clock_ticks"` +} + type Mount struct { Source string `json:"source"` Destination string `json:"destination"` diff --git a/components/engine/daemon/execdriver/lxc/driver.go b/components/engine/daemon/execdriver/lxc/driver.go index c02ceae97a..7dca19d762 100644 --- a/components/engine/daemon/execdriver/lxc/driver.go +++ b/components/engine/daemon/execdriver/lxc/driver.go @@ -524,3 +524,8 @@ func (t *TtyConsole) Close() error { func (d *driver) Exec(c *execdriver.Command, processConfig *execdriver.ProcessConfig, pipes *execdriver.Pipes, startCallback execdriver.StartCallback) (int, error) { return -1, ErrExec } + +func (d *driver) Stats(id string) (*execdriver.ResourceStats, error) { + return nil, fmt.Errorf("container stats are not support with LXC") + +} diff --git a/components/engine/daemon/execdriver/native/driver.go b/components/engine/daemon/execdriver/native/driver.go index f6099bd049..e82d784aa1 100644 --- a/components/engine/daemon/execdriver/native/driver.go +++ b/components/engine/daemon/execdriver/native/driver.go @@ -13,6 +13,7 @@ import ( "strings" "sync" "syscall" + "time" log "github.com/Sirupsen/logrus" "github.com/docker/docker/daemon/execdriver" @@ -279,6 +280,23 @@ func (d *driver) Clean(id string) error { return os.RemoveAll(filepath.Join(d.root, id)) } +func (d *driver) Stats(id string) (*execdriver.ResourceStats, error) { + state, err := libcontainer.GetState(filepath.Join(d.root, id)) + if err != nil { + return nil, err + } + now := time.Now() + stats, err := libcontainer.GetStats(nil, state) + if err != nil { + return nil, err + } + return &execdriver.ResourceStats{ + ContainerStats: stats, + ClockTicks: system.GetClockTicks(), + Read: now, + }, nil +} + func getEnv(key string, env []string) string { for _, pair := range env { parts := strings.Split(pair, "=") diff --git a/components/engine/daemon/start.go b/components/engine/daemon/start.go index 363461080f..89116a84d8 100644 --- a/components/engine/daemon/start.go +++ b/components/engine/daemon/start.go @@ -1,6 +1,7 @@ package daemon import ( + "encoding/json" "fmt" "os" "strings" @@ -77,3 +78,17 @@ func (daemon *Daemon) setHostConfig(container *Container, hostConfig *runconfig. return nil } + +func (daemon *Daemon) ContainerStats(job *engine.Job) engine.Status { + stats, err := daemon.SubscribeToContainerStats(job.Args[0]) + if err != nil { + return job.Error(err) + } + enc := json.NewEncoder(job.Stdout) + for update := range stats { + if err := enc.Encode(update); err != nil { + return job.Error(err) + } + } + return engine.StatusOK +} diff --git a/components/engine/daemon/stats_collector.go b/components/engine/daemon/stats_collector.go new file mode 100644 index 0000000000..0d1059d8b2 --- /dev/null +++ b/components/engine/daemon/stats_collector.go @@ -0,0 +1,71 @@ +package daemon + +import ( + "sync" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/docker/docker/daemon/execdriver" +) + +func newStatsCollector(interval time.Duration) *statsCollector { + s := &statsCollector{ + interval: interval, + containers: make(map[string]*statsCollectorData), + } + s.start() + return s +} + +type statsCollectorData struct { + c *Container + lastStats *execdriver.ResourceStats + subs []chan *execdriver.ResourceStats +} + +// statsCollector manages and provides container resource stats +type statsCollector struct { + m sync.Mutex + interval time.Duration + containers map[string]*statsCollectorData +} + +func (s *statsCollector) collect(c *Container) <-chan *execdriver.ResourceStats { + s.m.Lock() + ch := make(chan *execdriver.ResourceStats, 1024) + s.containers[c.ID] = &statsCollectorData{ + c: c, + subs: []chan *execdriver.ResourceStats{ + ch, + }, + } + s.m.Unlock() + return ch +} + +func (s *statsCollector) stopCollection(c *Container) { + s.m.Lock() + delete(s.containers, c.ID) + s.m.Unlock() +} + +func (s *statsCollector) start() { + go func() { + for _ = range time.Tick(s.interval) { + log.Debugf("starting collection of container stats") + s.m.Lock() + for id, d := range s.containers { + stats, err := d.c.Stats() + if err != nil { + // TODO: @crosbymichael evict container depending on error + log.Errorf("collecting stats for %s: %v", id, err) + continue + } + for _, sub := range s.containers[id].subs { + sub <- stats + } + } + s.m.Unlock() + } + }() +}