1
0
mirror of https://github.com/moby/buildkit.git synced 2025-08-02 13:26:49 +03:00
Files
buildkit/source/local/source.go
Tonis Tiigi 4c9d94f93c add cache key debuginfo lookup
This allows opting in to a cache key debug database on
daemon startup.

If enabled, all cache keys generated by builds are
saved into this database together with the plaintext
of the original data, so that a reverse lookup can be performed
later to compare two checksums and find out what differed in
their original inputs. If a checksum contains other checksums
internally, these are saved as well. Due to storage constraints, the
plaintext of file content is not saved, but the metadata
portion can still be looked up.

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
2025-06-30 23:29:36 -07:00

399 lines
10 KiB
Go

package local
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/contenthash"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/filesync"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/source"
srctypes "github.com/moby/buildkit/source/types"
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/cachedigest"
"github.com/moby/buildkit/util/progress"
"github.com/moby/patternmatcher"
"github.com/moby/sys/user"
"github.com/pkg/errors"
"github.com/tonistiigi/fsutil"
fstypes "github.com/tonistiigi/fsutil/types"
"golang.org/x/time/rate"
)
// Opt carries the dependencies NewSource needs to build the local source.
type Opt struct {
// CacheAccessor is the cache accessor that NewSource stores on the source.
CacheAccessor cache.Accessor
}
// NewSource builds the source implementation for the local scheme, backed by
// the cache accessor supplied in opt. The returned error is always nil.
func NewSource(opt Opt) (source.Source, error) {
	return &localSource{cm: opt.CacheAccessor}, nil
}
// localSource is the source implementation returned by NewSource.
type localSource struct {
// cm is the cache accessor used to search for, open and create refs.
cm cache.Accessor
}
// Schemes lists the schemes handled by this source: only the local scheme.
func (ls *localSource) Schemes() []string {
	schemes := []string{srctypes.LocalScheme}
	return schemes
}
// Identifier parses ref and attrs into a *LocalIdentifier.
//
// scheme and platform are not consulted. Recognized attributes populate the
// corresponding identifier fields; attributes with other keys are ignored.
// An error is returned when NewLocalIdentifier rejects ref, when a
// JSON-encoded list attribute cannot be decoded, or when the
// metadata-transfer flag is not a valid boolean. Decode errors name the
// offending attribute and its raw value.
func (ls *localSource) Identifier(scheme, ref string, attrs map[string]string, platform *pb.Platform) (source.Identifier, error) {
	id, err := NewLocalIdentifier(ref)
	if err != nil {
		return nil, err
	}

	for k, v := range attrs {
		switch k {
		case pb.AttrLocalSessionID:
			// A value of the form "<prefix>:<session>" carries a prefix that
			// is prepended to the identifier name; the remainder is the
			// session ID. Without a colon the whole value is the session ID.
			id.SessionID = v
			if prefix, sid, ok := strings.Cut(v, ":"); ok {
				id.Name = prefix + "-" + id.Name
				id.SessionID = sid
			}
		case pb.AttrIncludePatterns:
			patterns, err := parseStringListAttr(k, v)
			if err != nil {
				return nil, err
			}
			id.IncludePatterns = patterns
		case pb.AttrExcludePatterns:
			patterns, err := parseStringListAttr(k, v)
			if err != nil {
				return nil, err
			}
			id.ExcludePatterns = patterns
		case pb.AttrFollowPaths:
			paths, err := parseStringListAttr(k, v)
			if err != nil {
				return nil, err
			}
			id.FollowPaths = paths
		case pb.AttrSharedKeyHint:
			id.SharedKeyHint = v
		case pb.AttrLocalDiffer:
			// Values other than the ones listed leave id.Differ untouched.
			switch v {
			case pb.AttrLocalDifferMetadata, "":
				id.Differ = fsutil.DiffMetadata
			case pb.AttrLocalDifferNone:
				id.Differ = fsutil.DiffNone
			}
		case pb.AttrMetadataTransfer:
			b, err := strconv.ParseBool(v)
			if err != nil {
				return nil, errors.Wrapf(err, "invalid value for local.metadatatransfer %q", v)
			}
			id.MetadataOnly = b
		case pb.AttrMetadataTransferExclude:
			exceptions, err := parseStringListAttr(k, v)
			if err != nil {
				return nil, err
			}
			id.MetadataExceptions = exceptions
		}
	}
	return id, nil
}

// parseStringListAttr decodes the JSON array of strings held in attribute
// value v; key is used only to label the error when decoding fails.
func parseStringListAttr(key, v string) ([]string, error) {
	var list []string
	if err := json.Unmarshal([]byte(v), &list); err != nil {
		return nil, errors.Wrapf(err, "invalid value for %s %q", key, v)
	}
	return list, nil
}
// Resolve returns a source instance for id, which must be a *LocalIdentifier;
// any other identifier type yields an error. The handler receives a copy of
// the identifier together with the session manager sm.
func (ls *localSource) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager, _ solver.Vertex) (source.SourceInstance, error) {
	switch lid := id.(type) {
	case *LocalIdentifier:
		handler := &localSourceHandler{
			localSource: ls,
			sm:          sm,
			src:         *lid,
		}
		return handler, nil
	default:
		return nil, errors.Errorf("invalid local identifier %v", id)
	}
}
// localSourceHandler is the source instance that Resolve creates for a single
// local identifier.
type localSourceHandler struct {
// src is a copy of the identifier passed to Resolve.
src LocalIdentifier
// sm is the session manager used to reach client sessions.
sm *session.Manager
// localSource is embedded so the handler can use the source's cache accessor.
*localSource
}
// CacheKey derives the cache key for this local source.
//
// The key is a digest over the JSON encoding of the session ID, the
// include/exclude patterns, the follow paths and the metadata-transfer
// settings. When the identifier carries no session ID, the next session from
// g is used; if g yields none, an error is returned. The first result is
// "session:<name>:<digest>", the second is the digest alone, and the boolean
// result is always true on success.
func (ls *localSourceHandler) CacheKey(ctx context.Context, g session.Group, index int) (string, string, solver.CacheOpts, bool, error) {
	// keyInput is the payload the digest is computed from; field names and
	// order define the serialized form and must stay stable.
	type keyInput struct {
		SessionID          string
		IncludePatterns    []string
		ExcludePatterns    []string
		FollowPaths        []string
		MetadataTransfer   bool     `json:",omitempty"`
		MetadataExceptions []string `json:",omitempty"`
	}

	in := keyInput{
		SessionID:          ls.src.SessionID,
		IncludePatterns:    ls.src.IncludePatterns,
		ExcludePatterns:    ls.src.ExcludePatterns,
		FollowPaths:        ls.src.FollowPaths,
		MetadataTransfer:   ls.src.MetadataOnly,
		MetadataExceptions: ls.src.MetadataExceptions,
	}
	if in.SessionID == "" {
		in.SessionID = g.SessionIterator().NextSession()
		if in.SessionID == "" {
			return "", "", nil, false, errors.New("could not access local files without session")
		}
	}

	payload, err := json.Marshal(in)
	if err != nil {
		return "", "", nil, false, err
	}

	dgst, err := cachedigest.FromBytes(payload, cachedigest.TypeJSON)
	if err != nil {
		return "", "", nil, false, err
	}

	key := dgst.String()
	return "session:" + ls.src.Name + ":" + key, key, nil, true, nil
}
// Snapshot transfers the local files and returns them as an immutable ref.
//
// When the identifier names a session, that session is looked up with a
// five-second timeout and used for the transfer. If there is no session ID,
// the lookup fails, or the transfer fails with a filesync.InvalidSessionError,
// the transfer is attempted through the sessions of g instead. Any other
// transfer error is returned as is.
func (ls *localSourceHandler) Snapshot(ctx context.Context, g session.Group) (cache.ImmutableRef, error) {
	if ls.src.SessionID == "" {
		return ls.snapshotWithAnySession(ctx, g)
	}

	// The timeout's own cancel func is discarded on purpose: cancelling the
	// parent on return also cancels the derived timeout context.
	lookupCtx, cancel := context.WithCancelCause(ctx)
	defer func() { cancel(errors.WithStack(context.Canceled)) }()
	lookupCtx, _ = context.WithTimeoutCause(lookupCtx, 5*time.Second, errors.WithStack(context.DeadlineExceeded)) //nolint:govet

	caller, err := ls.sm.Get(lookupCtx, ls.src.SessionID, false)
	if err != nil {
		return ls.snapshotWithAnySession(ctx, g)
	}

	ref, err := ls.snapshot(ctx, caller)
	if err == nil {
		return ref, nil
	}

	var invalid filesync.InvalidSessionError
	if errors.As(err, &invalid) {
		return ls.snapshotWithAnySession(ctx, g)
	}
	return nil, err
}
// snapshotWithAnySession runs snapshot through the session manager's Any
// helper for group g and returns the ref produced by a successful attempt
// together with the error reported by Any.
func (ls *localSourceHandler) snapshotWithAnySession(ctx context.Context, g session.Group) (cache.ImmutableRef, error) {
	var result cache.ImmutableRef

	attempt := func(ctx context.Context, _ string, c session.Caller) error {
		snap, err := ls.snapshot(ctx, c)
		if err == nil {
			result = snap
		}
		return err
	}

	err := ls.sm.Any(ctx, g, attempt)
	return result, err
}
// snapshot syncs the client's files through caller into a mutable ref and
// commits it as an immutable ref.
//
// A mutable ref previously saved under the same shared key is reused when one
// can be opened; otherwise a new ref is created. If the function fails after
// the ref has been obtained, the ref is reset to the default cache policy,
// its cache context is cleared and it is released in a background goroutine.
func (ls *localSourceHandler) snapshot(ctx context.Context, caller session.Caller) (out cache.ImmutableRef, retErr error) {
	var metaSfx string
	if ls.src.MetadataOnly {
		metaSfx = ":metadata"
	}
	// TODO: replace caller.SharedKey() with source based hint from client(absolute-path+nodeid)
	sharedKey := ls.src.Name + ":" + ls.src.SharedKeyHint + ":" + caller.SharedKey() + metaSfx

	candidates, err := searchSharedKey(ctx, ls.cm, sharedKey)
	if err != nil {
		return nil, err
	}

	// Take the first candidate that can be opened as a mutable ref.
	var mref cache.MutableRef
	for _, cand := range candidates {
		m, err := ls.cm.GetMutable(ctx, cand.ID())
		if err != nil {
			bklog.G(ctx).Debugf("not reusing ref %s for local: %v", cand.ID(), err)
			continue
		}
		bklog.G(ctx).Debugf("reusing ref for local: %s", m.ID())
		mref = m
		break
	}

	if mref == nil {
		created, err := ls.cm.New(ctx, nil, nil, cache.CachePolicyRetain, cache.WithRecordType(client.UsageRecordTypeLocalSource), cache.WithDescription(fmt.Sprintf("local source for %s", ls.src.Name)))
		if err != nil {
			return nil, err
		}
		mref = created
		bklog.G(ctx).Debugf("new ref for local: %s", mref.ID())
	}

	// mref is set to nil after a successful commit, which disables this cleanup.
	defer func() {
		if retErr == nil || mref == nil {
			return
		}
		// on error remove the record as checksum update is in undefined state
		if err := mref.SetCachePolicyDefault(); err != nil {
			bklog.G(ctx).Errorf("failed to reset mutable cachepolicy: %v", err)
		}
		contenthash.ClearCacheContext(mref)
		go mref.Release(context.WithoutCancel(ctx))
	}()

	mnt, err := mref.Mount(ctx, false, nil)
	if err != nil {
		return nil, err
	}

	mounter := snapshot.LocalMounter(mnt)
	destDir, err := mounter.Mount()
	if err != nil {
		return nil, err
	}

	// mounter is set to nil after the explicit unmount below, so this only
	// fires on error paths while the mount is still active.
	defer func() {
		if retErr == nil || mounter == nil {
			return
		}
		mounter.Unmount()
	}()

	cacheCtx, err := contenthash.GetCacheContext(ctx, mref)
	if err != nil {
		return nil, err
	}

	syncOpt := filesync.FSSendRequestOpt{
		Name:            ls.src.Name,
		IncludePatterns: ls.src.IncludePatterns,
		ExcludePatterns: ls.src.ExcludePatterns,
		FollowPaths:     ls.src.FollowPaths,
		DestDir:         destDir,
		CacheUpdater:    &cacheUpdater{CacheContext: cacheCtx, idmap: mnt.IdentityMapping()},
		ProgressCb:      newProgressHandler(ctx, "transferring "+ls.src.Name+":"),
		Differ:          ls.src.Differ,
		MetadataOnly:    ls.src.MetadataOnly,
	}

	if syncOpt.MetadataOnly && len(ls.src.MetadataExceptions) > 0 {
		exceptions, err := patternmatcher.New(ls.src.MetadataExceptions)
		if err != nil {
			return nil, errors.WithStack(err)
		}
		// A match error is treated as "no match".
		syncOpt.MetadataOnlyFilter = func(p string, _ *fstypes.Stat) bool {
			matched, err := exceptions.MatchesOrParentMatches(p)
			if err != nil {
				return false
			}
			return matched
		}
	}

	if idmap := mnt.IdentityMapping(); idmap != nil {
		// Rewrite each entry's owner via idmap.ToHost; entries whose IDs
		// cannot be mapped are filtered out.
		syncOpt.Filter = func(p string, stat *fstypes.Stat) bool {
			hostUID, hostGID, err := idmap.ToHost(int(stat.Uid), int(stat.Gid))
			if err != nil {
				return false
			}
			stat.Uid, stat.Gid = uint32(hostUID), uint32(hostGID)
			return true
		}
	}

	if err := filesync.FSSync(ctx, caller, syncOpt); err != nil {
		return nil, err
	}

	if err := mounter.Unmount(); err != nil {
		return nil, err
	}
	mounter = nil

	if err := contenthash.SetCacheContext(ctx, mref, cacheCtx); err != nil {
		return nil, err
	}

	// skip storing snapshot by the shared key if it already exists
	meta := cacheRefMetadata{RefMetadata: mref}
	if meta.getSharedKey() != sharedKey {
		if err := meta.setSharedKey(sharedKey); err != nil {
			return nil, err
		}
		bklog.G(ctx).Debugf("saved %s as %s", mref.ID(), sharedKey)
	}

	committed, err := mref.Commit(ctx)
	if err != nil {
		return nil, err
	}

	mref = nil // avoid deferred cleanup
	return committed, nil
}
// newProgressHandler writes an initial "transferring" status for id to the
// progress writer taken from ctx and returns a callback that reports later
// updates.
//
// The callback records s as the current value. Intermediate updates are
// limited to one per 100ms; the final update (last == true) is always
// written, stamps the completion time and closes the writer.
func newProgressHandler(ctx context.Context, id string) func(int, bool) {
	limiter := rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
	pw, _, _ := progress.NewFromContext(ctx)

	started := time.Now()
	st := progress.Status{
		Action:  "transferring",
		Started: &started,
	}
	pw.Write(id, st)

	return func(s int, last bool) {
		// The limiter is only consulted for intermediate updates.
		if !last && !limiter.Allow() {
			return
		}

		st.Current = s
		if !last {
			pw.Write(id, st)
			return
		}

		finished := time.Now()
		st.Completed = &finished
		pw.Write(id, st)
		pw.Close()
	}
}
// cacheUpdater is the value passed as CacheUpdater to the file sync request.
// It embeds a content-hash cache context and adds the MarkSupported and
// ContentHasher methods.
type cacheUpdater struct {
contenthash.CacheContext
// idmap is set from the mount's identity mapping when the updater is built.
// NOTE(review): not read anywhere in the visible code — confirm whether it is still needed.
idmap *user.IdentityMapping
}
// MarkSupported is intentionally a no-op; the argument is ignored.
func (cu *cacheUpdater) MarkSupported(bool) {}
// ContentHasher returns contenthash.NewFromStat as the hasher to use for
// synced content.
func (cu *cacheUpdater) ContentHasher() fsutil.ContentHasher {
	var hasher fsutil.ContentHasher = contenthash.NewFromStat
	return hasher
}
const (
// keySharedKey is the metadata key under which a ref's shared key is stored.
keySharedKey = "local.sharedKey"
// sharedKeyIndex is the prefix put in front of a shared key to form the
// index value used when storing and searching refs.
sharedKeyIndex = keySharedKey + ":"
)
// searchSharedKey looks up the metadata records indexed under shared key k in
// store and returns each wrapped as cacheRefMetadata. The result is nil when
// nothing matches.
func searchSharedKey(ctx context.Context, store cache.MetadataStore, k string) ([]cacheRefMetadata, error) {
	found, err := store.Search(ctx, sharedKeyIndex+k, false)
	if err != nil {
		return nil, err
	}

	var results []cacheRefMetadata
	for i := range found {
		results = append(results, cacheRefMetadata{RefMetadata: found[i]})
	}
	return results, nil
}
// cacheRefMetadata wraps cache.RefMetadata to add accessors for the shared
// key stored under keySharedKey.
type cacheRefMetadata struct {
cache.RefMetadata
}
// getSharedKey reads the string stored under keySharedKey in the metadata.
func (md cacheRefMetadata) getSharedKey() string {
	key := md.GetString(keySharedKey)
	return key
}
// setSharedKey stores key under keySharedKey and passes sharedKeyIndex+key as
// the index value, which is the form searchSharedKey looks up.
func (md cacheRefMetadata) setSharedKey(key string) error {
	index := sharedKeyIndex + key
	return md.SetString(keySharedKey, key, index)
}