package storage_migrate import ( "context" "crypto/md5" "encoding/hex" "fmt" "io" "os" "path" "path/filepath" "strings" "time" "quyun/v2/app/commands" "quyun/v2/database" "quyun/v2/database/models" "quyun/v2/providers/postgres" "quyun/v2/providers/storage" "github.com/spf13/cobra" "go.ipao.vip/atom" "go.ipao.vip/atom/container" "go.uber.org/dig" "gorm.io/gorm" ) func defaultProviders() container.Providers { return commands.Default(container.Providers{ postgres.DefaultProvider(), storage.DefaultProvider(), database.DefaultProvider(), }...) } func Command() atom.Option { return atom.Command( atom.Name("storage-migrate"), atom.Short("migrate media assets to md5 object keys"), atom.Arguments(func(cmd *cobra.Command) { cmd.Flags().Bool("dry-run", false, "preview changes without writing") cmd.Flags().Int("batch", 200, "batch size per scan") }), atom.RunE(Serve), atom.Providers(defaultProviders()), atom.Example("storage-migrate --dry-run"), ) } type Service struct { dig.In DB *gorm.DB Storage *storage.Storage } func Serve(cmd *cobra.Command, args []string) error { return container.Container.Invoke(func(ctx context.Context, svc Service) error { models.SetDefault(svc.DB) dryRun, _ := cmd.Flags().GetBool("dry-run") batchSize, _ := cmd.Flags().GetInt("batch") if batchSize <= 0 { batchSize = 200 } localPath := svc.Storage.Config.LocalPath if localPath == "" { localPath = "./storage" } fmt.Printf("storage migrate: dry-run=%v batch=%d local-path=%s\n", dryRun, batchSize, localPath) tenantCache := make(map[int64]*models.Tenant) offset := 0 for { tbl, q := models.MediaAssetQuery.QueryContext(ctx) list, err := q.Order(tbl.ID.Asc()).Offset(offset).Limit(batchSize).Find() if err != nil { return err } if len(list) == 0 { break } for _, asset := range list { // 仅处理本地存储且有实际文件路径的资源。 if strings.ToLower(asset.Provider) != "local" { continue } if asset.ObjectKey == "" { continue } if strings.HasPrefix(asset.ObjectKey, "http://") || strings.HasPrefix(asset.ObjectKey, "https://") { continue } srcPath := asset.ObjectKey if !filepath.IsAbs(srcPath) { srcPath = filepath.Join(localPath, filepath.FromSlash(srcPath)) } hash, size, err := fileMD5(srcPath) if err != nil { fmt.Printf("skip asset=%d err=%v\n", asset.ID, err) continue } filename := asset.Meta.Data().Filename if filename == "" { filename = path.Base(asset.ObjectKey) } var tenant *models.Tenant if asset.TenantID > 0 { if cached, ok := tenantCache[asset.TenantID]; ok { tenant = cached } else if t, err := models.TenantQuery.WithContext(ctx).Where(models.TenantQuery.ID.Eq(asset.TenantID)).First(); err == nil { tenantCache[asset.TenantID] = t tenant = t } } newKey := buildObjectKey(tenant, hash, filename) dstPath := filepath.Join(localPath, filepath.FromSlash(newKey)) if asset.ObjectKey == newKey && asset.Hash == hash { continue } if !dryRun { if asset.ObjectKey != newKey { if err := os.MkdirAll(filepath.Dir(dstPath), 0o755); err != nil { return err } if _, err := os.Stat(dstPath); err == nil { if srcPath != dstPath { _ = os.Remove(srcPath) } } else if err := os.Rename(srcPath, dstPath); err != nil { return err } } _, err := models.MediaAssetQuery.WithContext(ctx). Where(models.MediaAssetQuery.ID.Eq(asset.ID)). UpdateSimple( models.MediaAssetQuery.ObjectKey.Value(newKey), models.MediaAssetQuery.Hash.Value(hash), models.MediaAssetQuery.UpdatedAt.Value(time.Now()), ) if err != nil { return err } } fmt.Printf("migrated asset=%d key=%s hash=%s size=%d\n", asset.ID, newKey, hash, size) } offset += len(list) } return nil }) } func buildObjectKey(tenant *models.Tenant, hash, filename string) string { // 按租户维度组织对象路径:quyun//. tenantUUID := "public" if tenant != nil && tenant.UUID.String() != "" { tenantUUID = tenant.UUID.String() } ext := strings.ToLower(filepath.Ext(filename)) return path.Join("quyun", tenantUUID, hash+ext) } func fileMD5(filename string) (string, int64, error) { f, err := os.Open(filename) if err != nil { return "", 0, err } defer f.Close() h := md5.New() size, err := io.Copy(h, f) if err != nil { return "", size, err } return hex.EncodeToString(h.Sum(nil)), size, nil }