From d72f38417796ca9a003a15be0da8e86896f9cc63 Mon Sep 17 00:00:00 2001 From: Rogee Date: Thu, 16 Jan 2025 17:02:30 +0800 Subject: [PATCH] add storages --- .../app/events/subscribers/post_deleted.go | 15 ++- backend/app/http/medias/service.go | 16 +++ backend/app/http/storages/service.go | 75 ++++++++++++ backend/app/jobs/post_delete_assets.go | 107 ++++++++++++++++++ backend/pkg/storage/storage.go | 23 ---- 5 files changed, 211 insertions(+), 25 deletions(-) create mode 100644 backend/app/jobs/post_delete_assets.go diff --git a/backend/app/events/subscribers/post_deleted.go b/backend/app/events/subscribers/post_deleted.go index b1c8ce6..d844bc8 100644 --- a/backend/app/events/subscribers/post_deleted.go +++ b/backend/app/events/subscribers/post_deleted.go @@ -7,6 +7,7 @@ import ( "backend/app/events" "backend/app/events/publishers" "backend/app/http/posts" + "backend/app/jobs" "backend/providers/job" "git.ipao.vip/rogeecn/atom/contracts" @@ -53,8 +54,18 @@ func (e *PostDeleted) Handler(msg *message.Message) ([]*message.Message, error) if err != nil { return nil, errors.Wrapf(err, "failed to get item by id: %d", payload.ID) } - _ = post - // TODO: handle post deletion + + job, err := e.job.Client() + if err != nil { + return nil, err + } + + _, err = job.Insert(context.Background(), jobs.PostDeleteAssetsJob{ + PostID: post.ID, + }, nil) + if err != nil { + return nil, err + } return nil, nil } diff --git a/backend/app/http/medias/service.go b/backend/app/http/medias/service.go index 5faf9f1..9ba949b 100644 --- a/backend/app/http/medias/service.go +++ b/backend/app/http/medias/service.go @@ -108,3 +108,19 @@ func (svc *Service) GetMediaByHash(ctx context.Context, tenantID, userID int64, } return &ret, nil } + +func (svc *Service) DeleteByID(ctx context.Context, id ...int64) error { + if len(id) == 0 { + return nil + } + + _, span := otel.Start(ctx, "medias.service.DeleteByID") + defer span.End() + + tbl := table.Medias + stmt := 
tbl.DELETE().WHERE(tbl.ID.IN(lo.Map(id, func(item int64, _ int) Expression { return Int64(item) })...))
+	span.SetAttributes(semconv.DBStatementKey.String(stmt.DebugSql()))
+
+	_, err := stmt.ExecContext(ctx, svc.db)
+	return err
+}
diff --git a/backend/app/http/storages/service.go b/backend/app/http/storages/service.go
index 37df550..ee7524f 100644
--- a/backend/app/http/storages/service.go
+++ b/backend/app/http/storages/service.go
@@ -3,13 +3,17 @@ package storages
 import (
 	"context"
 	"database/sql"
+	"errors"
 
+	"backend/database/fields"
 	"backend/database/models/qvyun_v2/public/model"
 	"backend/database/models/qvyun_v2/public/table"
 	"backend/providers/otel"
 
 	. "github.com/go-jet/jet/v2/postgres"
+	"github.com/samber/lo"
 	log "github.com/sirupsen/logrus"
+	"github.com/spf13/afero"
 	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
 )
 
@@ -138,3 +142,74 @@ func (svc *Service) GetDefault(ctx context.Context) (*model.Storages, error) {
 	}
 	return &storage, nil
 }
+
+// GetStoragesByID returns the storage rows matching the given IDs (duplicates removed).
+func (svc *Service) GetStoragesByID(ctx context.Context, ids ...int64) ([]model.Storages, error) {
+	if len(ids) == 0 {
+		return nil, nil
+	}
+	ids = lo.Uniq(ids)
+
+	_, span := otel.Start(ctx, "storages.service.GetStoragesByID")
+	defer span.End()
+
+	tbl := table.Storages
+	stmt := tbl.SELECT(tbl.AllColumns).WHERE(tbl.ID.IN(lo.Map(ids, func(i int64, _ int) Expression { return Int64(i) })...))
+	span.SetAttributes(semconv.DBStatementKey.String(stmt.DebugSql()))
+
+	var storages []model.Storages
+	if err := stmt.QueryContext(ctx, svc.db, &storages); err != nil {
+		return nil, err
+	}
+	return storages, nil
+}
+
+func (svc *Service) GetDefaultFS(ctx context.Context) (afero.Fs, error) {
+	st, err := svc.GetDefault(ctx)
+	if err != nil {
+		return nil, err
+	}
+	return svc.BuildFS(st)
+}
+
+func (svc *Service) GetFSMapByIDs(ctx context.Context, ids ...int64) (map[int64]afero.Fs, error) {
+	_, span := otel.Start(ctx, "storages.service.GetFSMapByIDs")
+	defer span.End()
+
+	storages, err :=
svc.GetStoragesByID(ctx, ids...) + if err != nil { + return nil, err + } + + maps := make(map[int64]afero.Fs) + for _, s := range storages { + fs, err := svc.BuildFS(&s) + if err != nil { + return nil, err + } + maps[s.ID] = fs + } + + return maps, nil +} + +// GetFSByID +func (svc *Service) GetFSByID(ctx context.Context, id int64) (afero.Fs, error) { + _, span := otel.Start(ctx, "storages.service.GetFSByID") + defer span.End() + + storage, err := svc.GetStorageByID(ctx, id) + if err != nil { + return nil, err + } + + return svc.BuildFS(storage) +} + +func (svc *Service) BuildFS(storage *model.Storages) (afero.Fs, error) { + switch storage.Type { + case fields.StorageTypeLocal: + return afero.NewBasePathFs(afero.NewOsFs(), storage.Config.Local.Path), nil + } + return nil, errors.New("invalid storage type") +} diff --git a/backend/app/jobs/post_delete_assets.go b/backend/app/jobs/post_delete_assets.go new file mode 100644 index 0000000..fd2a3b5 --- /dev/null +++ b/backend/app/jobs/post_delete_assets.go @@ -0,0 +1,107 @@ +package jobs + +import ( + "context" + "time" + + "backend/app/http/medias" + "backend/app/http/posts" + "backend/app/http/storages" + "backend/database/fields" + "backend/database/models/qvyun_v2/public/model" + + _ "git.ipao.vip/rogeecn/atom" + _ "git.ipao.vip/rogeecn/atom/contracts" + "github.com/pkg/errors" + . "github.com/riverqueue/river" + "github.com/samber/lo" + "github.com/sirupsen/logrus" +) + +var ( + _ JobArgs = (*PostDeleteAssetsJob)(nil) + _ JobArgsWithInsertOpts = (*PostDeleteAssetsJob)(nil) +) + +type PostDeleteAssetsJob struct { + PostID int64 +} + +// InsertOpts implements JobArgsWithInsertOpts. 
+func (s PostDeleteAssetsJob) InsertOpts() InsertOpts { + return InsertOpts{ + Queue: QueueDefault, + Priority: PriorityDefault, + UniqueOpts: UniqueOpts{ + ByArgs: true, + }, + } +} + +func (PostDeleteAssetsJob) Kind() string { + return "PostDeleteAssetsJob" +} + +var _ Worker[PostDeleteAssetsJob] = (*PostDeleteAssetsJobWorker)(nil) + +// @provider(job) +type PostDeleteAssetsJobWorker struct { + WorkerDefaults[PostDeleteAssetsJob] + + log *logrus.Entry `inject:"false"` + + postSvc *posts.Service + mediaSvc *medias.Service + storageSvc *storages.Service +} + +func (w *PostDeleteAssetsJobWorker) Prepare() error { + w.log = logrus.WithField("worker", "PostDeleteAssetsJobWorker") + return nil +} + +func (w *PostDeleteAssetsJobWorker) NextRetry(job *Job[PostDeleteAssetsJob]) time.Time { + return time.Now().Add(5 * time.Second) +} + +func (w *PostDeleteAssetsJobWorker) Work(ctx context.Context, job *Job[PostDeleteAssetsJob]) error { + post, err := w.postSvc.ForceGetPostByID(ctx, job.Args.PostID) + if err != nil { + return errors.Wrapf(err, "failed to get post(%d) by id", job.Args.PostID) + } + + hashes := lo.Map(post.Assets.Data, func(asset fields.MediaAsset, _ int) string { + return asset.Hash + }) + + medias, err := w.mediaSvc.GetMediasByHash(ctx, post.TenantID, post.UserID, hashes) + if err != nil { + return errors.Wrapf(err, "failed to get medias by hashes(%v)", hashes) + } + + storageIds := lo.Map(medias, func(media *model.Medias, _ int) int64 { return media.StorageID }) + storageMap, err := w.storageSvc.GetFSMapByIDs(ctx, storageIds...) 
+	if err != nil {
+		return err
+	}
+
+	// remove assets
+	for _, media := range medias {
+		st, ok := storageMap[media.StorageID]
+		if !ok {
+			continue
+		}
+
+		if err := st.Remove(media.Path); err != nil {
+			return errors.Wrapf(err, "failed to remove media(%d)", media.ID)
+		}
+	}
+
+	// delete all assets
+	ids := lo.Map(medias, func(media *model.Medias, _ int) int64 { return media.ID })
+	if err := w.mediaSvc.DeleteByID(ctx, ids...); err != nil {
+		return errors.Wrapf(err, "failed to delete media(%v)", ids)
+	}
+
+	return nil
+}
diff --git a/backend/pkg/storage/storage.go b/backend/pkg/storage/storage.go
index 4d51abb..66da1b2 100644
--- a/backend/pkg/storage/storage.go
+++ b/backend/pkg/storage/storage.go
@@ -5,31 +5,8 @@ import (
 	"os"
 	"path/filepath"
 	"time"
-
-	"backend/database/fields"
-	"backend/database/models/qvyun_v2/public/model"
 )
 
-func Build(m *model.Storages) Storage {
-	switch m.Type {
-	case fields.StorageTypeLocal:
-		return &Local{
-			Host: m.Config.Local.Host,
-			Path: m.Config.Local.Path,
-		}
-	case fields.StorageTypeS3:
-		return &S3{
-			Endpoint:        m.Config.S3.Endpoint,
-			AccessKeyID:     m.Config.S3.AccessKeyID,
-			AccessKeySecret: m.Config.S3.AccessKeySecret,
-			BucketName:      m.Config.S3.BucketName,
-			Path:            m.Config.S3.Path,
-		}
-	default:
-		panic("invalid storage type")
-	}
-}
-
 type Storage interface {
 	Save(ctx context.Context, file *UploadedFile) (*UploadedFile, error)
 }