1
0
mirror of https://github.com/owncloud/ocis.git synced 2025-04-18 23:44:07 +03:00

feat: cli, delete stale uploads

This commit is contained in:
Michal Klos 2025-04-08 09:29:36 +02:00
parent d2ff8f49ff
commit fc381e8045
2 changed files with 210 additions and 1 deletions

View File

@ -0,0 +1,6 @@
Enhancement: CLI, storage-users uploads delete-stale-nodes
An oCIS command that deletes stale nodes: in processing state wihout connected upload info.
https://github.com/owncloud/ocis/pull/11216
https://github.com/owncloud/enterprise/issues/7178

View File

@ -4,12 +4,15 @@ import (
"context"
"encoding/json"
"fmt"
"io/fs"
"os"
"path/filepath"
"strconv"
"strings"
"time"
tw "github.com/olekukonko/tablewriter"
"github.com/shamaton/msgpack/v2"
"github.com/urfave/cli/v2"
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
@ -21,9 +24,20 @@ import (
"github.com/owncloud/reva/v2/pkg/events"
"github.com/owncloud/reva/v2/pkg/storage"
"github.com/owncloud/reva/v2/pkg/storage/fs/registry"
"github.com/owncloud/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/owncloud/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/owncloud/reva/v2/pkg/utils"
)
const (
MSGPACK_KEY_USER_OCIS_NODESTATUS = "user.ocis.nodestatus"
// Log indentation levels
LOG_INDENT_L1 = " " // 2 spaces
LOG_INDENT_L2 = LOG_INDENT_L1 + LOG_INDENT_L1
LOG_INDENT_L3 = LOG_INDENT_L2 + LOG_INDENT_L1
)
// Session contains the information of an upload session
type Session struct {
ID string `json:"id"`
@ -42,11 +56,11 @@ type Session struct {
// Uploads is the entry point for the uploads command
func Uploads(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "uploads",
Usage: "manage unfinished uploads",
Subcommands: []*cli.Command{
ListUploadSessions(cfg),
DeleteStaleProcessingNodes(cfg),
},
}
}
@ -311,3 +325,192 @@ func buildInfo(filter storage.UploadSessionFilter) string {
b.WriteString(":")
return b.String()
}
// DeleteStaleProcessingNodes is the entry point for the delete-stale-nodes command
func DeleteStaleProcessingNodes(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "delete-stale-nodes",
Usage: "Delete all nodes in processing state that are not referenced by any upload session",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "spaceid",
Usage: "Space ID to check for processing nodes (omit to check all spaces)",
Required: false,
},
&cli.BoolFlag{
Name: "dry-run",
Usage: "Only show what would be deleted without actually deleting",
Value: true,
},
&cli.BoolFlag{
Name: "verbose",
Usage: "Enable verbose logging",
Value: false,
},
},
Before: func(c *cli.Context) error {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Action: func(ctx *cli.Context) error {
spaceIDs := []string{}
dryRun := ctx.Bool("dry-run")
verbose := ctx.Bool("verbose")
start := time.Now()
// Check if specific space ID provided
if ctx.IsSet("spaceid") {
spaceIDs = append(spaceIDs, ctx.String("spaceid"))
} else {
fmt.Println("Scanning all spaces for stale processing nodes...")
spaceIDs = globSpaceIDs(cfg)
}
if verbose {
fmt.Printf("Spaces to cleanup: %d\n", len(spaceIDs))
for _, spaceID := range spaceIDs {
fmt.Printf(" - %s\n", spaceID)
}
}
staleCount := 0
for _, spaceID := range spaceIDs {
staleCount += deleteStaleUploads(cfg, spaceID, dryRun, verbose)
}
if verbose {
fmt.Printf("Took %ds\n", int(time.Since(start).Seconds()))
}
fmt.Printf("Total stale nodes: %d\n", staleCount)
return nil
},
}
}
// globSpaceIDs returns a list of all space IDs in the storage root
func globSpaceIDs(cfg *config.Config) []string {
fsys := os.DirFS(cfg.Drivers.OCIS.Root)
dirs, err := fs.Glob(fsys, "spaces/*/*/nodes")
if err != nil {
fmt.Fprintf(os.Stderr, "Error globbing spaces root directory %s: %v\n", cfg.Drivers.OCIS.Root, err)
return []string{}
}
spaceIDs := []string{}
for _, dir := range dirs {
// For dir i.e. spaces/9d/408cec-8f0a-4d33-8715-89df1217a10c/nodes
// spaceID is 9d408cec-8f0a-4d33-8715-89df1217a10c
spaceIDs = append(spaceIDs, strings.ReplaceAll(strings.TrimSuffix(strings.TrimPrefix(dir, "spaces/"), "/nodes"), "/", ""))
}
return spaceIDs
}
// delete stale processing nodes for a given spaceID
func deleteStaleUploads(cfg *config.Config, spaceID string, dryRun bool, verbose bool) int {
if verbose {
fmt.Printf("\nDeleting stale processing nodes for space: %s\n", spaceID)
}
// Find .mpk files in space directory
spaceRoot := filepath.Join(cfg.Drivers.OCIS.Root, "spaces", lookup.Pathify(spaceID, 1, 2))
mpkFiles := []string{}
err := filepath.Walk(spaceRoot, func(path string, info os.FileInfo, err error) error {
if err != nil {
fmt.Fprintf(os.Stderr, "Error accessing path %s: %s\n", path, err)
return filepath.SkipDir
}
if !info.IsDir() && strings.HasSuffix(path, ".mpk") {
mpkFiles = append(mpkFiles, path)
}
return nil
})
if err != nil {
fmt.Fprintf(os.Stderr, "Error walking space directory %s: %s\n", spaceRoot, err)
return 0
}
if verbose {
fmt.Printf("%sFound total %d .mpk files\n", LOG_INDENT_L1, len(mpkFiles))
}
staleCount := 0
for _, path := range mpkFiles {
staleCount += deleteStaleNode(cfg, path, dryRun, verbose)
}
if verbose {
fmt.Printf("%sFound total %d stale nodes\n", LOG_INDENT_L1, staleCount)
}
return staleCount
}
// deleteStaleNode deletes a stale node: if it is not referenced by any upload session
// returns 1 if the node stale node was detected for deletion, 0 otherwise, for counting purposes
func deleteStaleNode(cfg *config.Config, path string, dryRun bool, verbose bool) int {
nodeDir := filepath.Dir(path)
// Read .mpk file to get processing info
b, err := os.ReadFile(path)
if err != nil {
fmt.Fprintf(os.Stderr, "Error reading file %s: %s\n", path, err)
return 0
}
var mpkData map[string]interface{}
if err := msgpack.Unmarshal(b, &mpkData); err != nil {
fmt.Fprintf(os.Stderr, "Error unmarshaling file %s: %s\n", path, err)
return 0
}
processingID := extractProcessingID(mpkData)
if processingID == "" {
return 0
}
// Construct path to upload info file:
// i.e. ~/.ocis/storage/users/uploads/5329c14b-b786-4b27-8f7d-7429f03009d7.info
// And pass only the .info file not exists: err is ErrNotExist
pathUploadInfo := filepath.Join(cfg.Drivers.OCIS.Root, "uploads", processingID) + ".info"
_, infoStatErr := os.Stat(pathUploadInfo)
if infoStatErr == nil {
return 0
}
if !os.IsNotExist(infoStatErr) {
// Tere was an error other than file not existing, log and return
fmt.Fprintf(os.Stderr, "Error checking upload info %s: %s\n", pathUploadInfo, infoStatErr)
return 0
}
if verbose {
fmt.Printf("%sFound stale upload at %s (Processing ID: %s)\n", LOG_INDENT_L1, path, processingID)
fmt.Printf("%sUpload info missing at: %s\n", LOG_INDENT_L2, pathUploadInfo)
}
if dryRun {
return 1
}
if err := os.RemoveAll(nodeDir); err != nil {
fmt.Fprintf(os.Stderr, "%sError deleting stale node %s: %v\n", LOG_INDENT_L2, nodeDir, err)
return 0
}
if verbose {
fmt.Printf("%sDeleted stale node: %s\n", LOG_INDENT_L2, nodeDir)
}
return 1
}
func extractProcessingID(mpkData map[string]interface{}) string {
processingID := ""
for k, v := range mpkData {
vStr := string(v.([]byte))
if k == MSGPACK_KEY_USER_OCIS_NODESTATUS && strings.Contains(vStr, node.ProcessingStatus) {
processingID = strings.Split(vStr, ":")[1]
break
}
}
return processingID
}