diff --git a/backend/.air.toml b/backend/.air.toml index f0dcfc1..fa268b3 100644 --- a/backend/.air.toml +++ b/backend/.air.toml @@ -6,7 +6,7 @@ tmp_dir = "tmp" args_bin = [] bin = "./tmp/main serve" cmd = "go build -o ./tmp/main ." -delay = 1000 +delay = 2000 exclude_dir = ["assets", "tmp", "vendor", "testdata"] exclude_file = [] exclude_regex = ["_test.go"] diff --git a/backend/app/jobs/download_from_alioss.go b/backend/app/jobs/download_from_alioss.go index b6413d5..464e6c1 100644 --- a/backend/app/jobs/download_from_alioss.go +++ b/backend/app/jobs/download_from_alioss.go @@ -62,10 +62,6 @@ func (w *DownloadFromAliOSSWorker) Work(ctx context.Context, job *Job[DownloadFr } dst := filepath.Join(w.app.StoragePath, media.Path) - if w.job.Add(&RemoveDownloadedVideo{FilePath: dst}); err != nil { - log.Errorf("Error removing original file: %v", err) - } - // check is path exist st, err := os.Stat(dst) if os.IsNotExist(err) { @@ -104,10 +100,5 @@ func (w *DownloadFromAliOSSWorker) NextJob(hash string) error { return err } - if err := w.job.Add(&VideoExtractHeadImage{MediaHash: hash}); err != nil { - log.Errorf("Error adding job: %v", err) - return err - } - return nil } diff --git a/backend/app/jobs/provider.gen.go b/backend/app/jobs/provider.gen.go index cac3332..3a017a6 100755 --- a/backend/app/jobs/provider.gen.go +++ b/backend/app/jobs/provider.gen.go @@ -58,6 +58,37 @@ func Provide(opts ...opt.Option) error { }, atom.GroupInitial); err != nil { return err } + if err := container.Container.Provide(func( + __job *job.Job, + app *app.Config, + job *job.Job, + oss *ali.OSSClient, + ) (contracts.Initial, error) { + obj := &PublishDraftPostsWorker{ + app: app, + job: job, + oss: oss, + } + if err := river.AddWorkerSafely(__job.Workers, obj); err != nil { + return nil, err + } + + return obj, nil + }, atom.GroupInitial); err != nil { + return err + } + if err := container.Container.Provide(func( + __job *job.Job, + ) (contracts.Initial, error) { + obj := &RemoveFileWorker{} + if err := river.AddWorkerSafely(__job.Workers, obj); err != nil { + return nil, err + } + + return obj, nil + }, atom.GroupInitial); err != nil { + return err + } if err := container.Container.Provide(func( __job *job.Job, app *app.Config, diff --git a/backend/app/jobs/publish_draft_posts.go b/backend/app/jobs/publish_draft_posts.go new file mode 100644 index 0000000..ca36a17 --- /dev/null +++ b/backend/app/jobs/publish_draft_posts.go @@ -0,0 +1,107 @@ +package jobs + +import ( + "context" + "time" + + "quyun/app/models" + "quyun/database/fields" + "quyun/database/schemas/public/model" + "quyun/pkg/utils" + "quyun/providers/ali" + "quyun/providers/app" + "quyun/providers/job" + + . "github.com/riverqueue/river" + "github.com/samber/lo" + log "github.com/sirupsen/logrus" + _ "go.ipao.vip/atom" + "go.ipao.vip/atom/contracts" +) + +var _ contracts.JobArgs = (*PublishDraftPosts)(nil) + +type PublishDraftPosts struct { + MediaHash string `json:"media_hash"` +} + +func (s PublishDraftPosts) InsertOpts() InsertOpts { + return InsertOpts{ + Queue: QueueDefault, + Priority: PriorityDefault, + } +} + +func (s PublishDraftPosts) Kind() string { return "publish_draft_posts" } +func (a PublishDraftPosts) UniqueID() string { return a.Kind() } + +var _ Worker[PublishDraftPosts] = (*PublishDraftPostsWorker)(nil) + +// @provider(job) +type PublishDraftPostsWorker struct { + WorkerDefaults[PublishDraftPosts] + + oss *ali.OSSClient + job *job.Job + app *app.Config +} + +func (w *PublishDraftPostsWorker) NextRetry(job *Job[PublishDraftPosts]) time.Time { + return time.Now().Add(30 * time.Second) +} + +func (w *PublishDraftPostsWorker) Work(ctx context.Context, job *Job[PublishDraftPosts]) error { + log := log.WithField("job", job.Args.Kind()) + + log.Infof("[Start] Working on job with strings: %+v", job.Args) + defer log.Infof("[End] Finished %s", job.Args.Kind()) + + media, err := models.Medias.GetByHash(ctx, job.Args.MediaHash) + if err != nil { + log.Errorf("Error getting media by ID: %v", err) + return JobCancel(err) + } + + relationMedias, err := models.Medias.GetRelations(ctx, media.Hash) + if err != nil { + log.Errorf("Error getting relation medias: %v", err) + return JobCancel(err) + } + + assets := lo.FilterMap(relationMedias, func(media *model.Medias, _ int) (fields.MediaAsset, bool) { + return fields.MediaAsset{ + Type: media.MimeType, + Media: media.ID, + Metas: &media.Metas.Data, + }, media.MimeType != "image/jpeg" + }) + assets = append(assets, fields.MediaAsset{ + Type: media.MimeType, + Media: media.ID, + Metas: &media.Metas.Data, + }) + + // publish a draft posts + post := &model.Posts{ + Status: fields.PostStatusDraft, + Title: utils.FormatTitle(media.Name), + Description: "", + Content: "", + Price: 0, + Discount: 100, + Views: 0, + Likes: 0, + Tags: fields.Json[[]string]{}, + Assets: fields.ToJson(assets), + HeadImages: fields.ToJson(lo.FilterMap(relationMedias, func(media *model.Medias, _ int) (int64, bool) { + return media.ID, media.MimeType == "image/jpeg" + })), + } + if err := models.Posts.Create(ctx, post); err != nil { + log.Errorf("Error creating post: %v", err) + return JobCancel(err) + } + log.Infof("Post created successfully with ID: %d", post.ID) + + return nil +} diff --git a/backend/app/jobs/remove_downloaded_video.go b/backend/app/jobs/remove_downloaded_video.go deleted file mode 100644 index 4d51b11..0000000 --- a/backend/app/jobs/remove_downloaded_video.go +++ /dev/null @@ -1,54 +0,0 @@ -package jobs - -import ( - "context" - "os" - "time" - - . "github.com/riverqueue/river" - log "github.com/sirupsen/logrus" - _ "go.ipao.vip/atom" - "go.ipao.vip/atom/contracts" -) - -var _ contracts.JobArgs = (*RemoveDownloadedVideo)(nil) - -type RemoveDownloadedVideo struct { - FilePath string `json:"file_path"` -} - -func (s RemoveDownloadedVideo) InsertOpts() InsertOpts { - return InsertOpts{ - Queue: QueueDefault, - Priority: PriorityDefault, - ScheduledAt: time.Now().Add(time.Minute * 10), - } -} - -func (s RemoveDownloadedVideo) Kind() string { return "remove_downloaded_video" } -func (a RemoveDownloadedVideo) UniqueID() string { return a.Kind() } - -var _ Worker[RemoveDownloadedVideo] = (*RemoveDownloadedVideoWorker)(nil) - -// @provider(job) -type RemoveDownloadedVideoWorker struct { - WorkerDefaults[RemoveDownloadedVideo] -} - -func (w *RemoveDownloadedVideoWorker) NextRetry(job *Job[RemoveDownloadedVideo]) time.Time { - return time.Now().Add(30 * time.Second) -} - -func (w *RemoveDownloadedVideoWorker) Work(ctx context.Context, job *Job[RemoveDownloadedVideo]) error { - log := log.WithField("job", job.Args.Kind()) - - log.Infof("[Start] Working on job with strings: %+v", job.Args) - defer log.Infof("[End] Finished %s", job.Args.Kind()) - - if err := os.Remove(job.Args.FilePath); err != nil { - log.Errorf("Error removing file: %v", err) - return err - } - - return nil -} diff --git a/backend/app/jobs/remove_file.go b/backend/app/jobs/remove_file.go new file mode 100644 index 0000000..ab4c658 --- /dev/null +++ b/backend/app/jobs/remove_file.go @@ -0,0 +1,61 @@ +package jobs + +import ( + "context" + "os" + "time" + + . "github.com/riverqueue/river" + log "github.com/sirupsen/logrus" + _ "go.ipao.vip/atom" + "go.ipao.vip/atom/contracts" +) + +var _ contracts.JobArgs = (*RemoveFile)(nil) + +type RemoveFile struct { + FilePath string `json:"file_path"` +} + +func (s RemoveFile) InsertOpts() InsertOpts { + return InsertOpts{ + Queue: QueueDefault, + Priority: PriorityDefault, + // ScheduledAt: time.Now().Add(time.Minute * 10), + } +} + +func (s RemoveFile) Kind() string { return "remove_file" } +func (a RemoveFile) UniqueID() string { return a.Kind() } + +var _ Worker[RemoveFile] = (*RemoveFileWorker)(nil) + +// @provider(job) +type RemoveFileWorker struct { + WorkerDefaults[RemoveFile] +} + +func (w *RemoveFileWorker) NextRetry(job *Job[RemoveFile]) time.Time { + return time.Now().Add(30 * time.Second) +} + +func (w *RemoveFileWorker) Work(ctx context.Context, job *Job[RemoveFile]) error { + log := log.WithField("job", job.Args.Kind()) + + log.Infof("[Start] Working on job with strings: %+v", job.Args) + defer log.Infof("[End] Finished %s", job.Args.Kind()) + + // Check if the file exists + if _, err := os.Stat(job.Args.FilePath); os.IsNotExist(err) { + log.Warn("File does not exist: %v", job.Args.FilePath) + return nil + } + // Remove the file + if err := os.Remove(job.Args.FilePath); err != nil { + log.Errorf("Error removing file: %v", err) + return err + } + log.Infof("File removed successfully: %v", job.Args.FilePath) + + return nil +} diff --git a/backend/app/jobs/video_extract_head_image.go b/backend/app/jobs/video_extract_head_image.go index 6100adf..54d5a20 100644 --- a/backend/app/jobs/video_extract_head_image.go +++ b/backend/app/jobs/video_extract_head_image.go @@ -62,7 +62,6 @@ func (w *VideoExtractHeadImageWorker) Work(ctx context.Context, job *Job[VideoEx log.Errorf("Error getting media by ID: %v", err) return JobCancel(err) } - _ = media input := filepath.Join(w.app.StoragePath, media.Path) output := input[:len(input)-len(filepath.Ext(input))] + ".jpg" @@ -73,12 +72,6 @@ func (w *VideoExtractHeadImageWorker) Work(ctx context.Context, job *Job[VideoEx } defer os.RemoveAll(output) - // Upload the image to OSS - if err := w.oss.Upload(ctx, output, filepath.Base(output)); err != nil { - log.Errorf("Error uploading image to OSS: %v", err) - return JobCancel(err) - } - fileSize, err := utils.GetFileSize(output) if err != nil { log.Errorf("Error getting file size: %v", err) @@ -92,15 +85,29 @@ func (w *VideoExtractHeadImageWorker) Work(ctx context.Context, job *Job[VideoEx } filename := fileMd5 + filepath.Ext(output) + name := "[展示图]" + media.Name + ".jpg" + // create a new media record for the image imageMedia := &model.Medias{ CreatedAt: time.Now(), - Name: "[展示图]" + media.Name, + Name: name, MimeType: "image/jpeg", Size: fileSize, Path: w.oss.GetSavePath(filename), Hash: fileMd5, - Metas: fields.Json[fields.MediaMetas]{}, + Metas: fields.ToJson(fields.MediaMetas{ + ParentHash: media.Hash, + }), + } + + // upload to oss + if err := w.oss.Upload(ctx, output, imageMedia.Path); err != nil { + log.Errorf("Error uploading image to OSS: %v", err) + return JobCancel(err) + } + + if w.job.Add(&RemoveFile{FilePath: output}); err != nil { + log.Errorf("Error removing original file: %v", err) } if err := models.Medias.Create(ctx, imageMedia); err != nil { @@ -108,5 +115,15 @@ func (w *VideoExtractHeadImageWorker) Work(ctx context.Context, job *Job[VideoEx return JobCancel(err) } + dst := filepath.Join(w.app.StoragePath, media.Path) + if w.job.Add(&RemoveFile{FilePath: dst}); err != nil { + log.Errorf("Error removing original file: %v", err) + } + + if w.job.Add(&PublishDraftPosts{MediaHash: media.Hash}); err != nil { + log.Errorf("Error adding job: %v", err) + return JobCancel(err) + } + return nil } diff --git a/backend/app/jobs/video_store_short.go b/backend/app/jobs/video_store_short.go index df60a50..a5a0585 100644 --- a/backend/app/jobs/video_store_short.go +++ b/backend/app/jobs/video_store_short.go @@ -100,6 +100,12 @@ func (w *VideoStoreShortWorker) Work(ctx context.Context, job *Job[VideoStoreSho }), } + // upload to oss + if err := w.oss.Upload(ctx, job.Args.FilePath, filePath); err != nil { + log.Errorf("Error uploading file to OSS: %v", err) + return JobCancel(err) + } + if err := models.Medias.Create(ctx, mediaModel); err != nil { log.Errorf("Error saving media record: %v data: %+v", err, mediaModel) return err @@ -107,5 +113,18 @@ func (w *VideoStoreShortWorker) Work(ctx context.Context, job *Job[VideoStoreSho log.Infof("Media record created with path: %s and hash: %s", filePath, fileMd5) + if w.job.Add(&RemoveFile{FilePath: job.Args.FilePath}); err != nil { + log.Errorf("Error removing original file: %v", err) + } + + return w.NextJob(media.Hash) +} + +func (w *VideoStoreShortWorker) NextJob(hash string) error { + if err := w.job.Add(&VideoExtractHeadImage{MediaHash: hash}); err != nil { + log.Errorf("Error adding job: %v", err) + return err + } + return nil } diff --git a/backend/app/models/medias.go b/backend/app/models/medias.go index 2d426a3..5d768f9 100644 --- a/backend/app/models/medias.go +++ b/backend/app/models/medias.go @@ -106,10 +106,10 @@ func (m *mediasModel) BatchCreate(ctx context.Context, models []*model.Medias) e func (m *mediasModel) Create(ctx context.Context, model *model.Medias) error { model.CreatedAt = time.Now() - stmt := table.Medias.INSERT(table.Medias.MutableColumns).MODEL(model) + stmt := table.Medias.INSERT(table.Medias.MutableColumns).MODEL(model).RETURNING(table.Medias.AllColumns) m.log.Infof("sql: %s", stmt.DebugSql()) - if _, err := stmt.ExecContext(ctx, db); err != nil { + if err := stmt.QueryContext(ctx, db, model); err != nil { m.log.Errorf("error creating media item: %v", err) return err } @@ -236,3 +236,24 @@ func (m *mediasModel) UpdateMetas(ctx context.Context, id int64, metas fields.Me m.log.Infof("media (%d) metas updated successfully", id) return nil } + +// GetRelationMedias +func (m *mediasModel) GetRelations(ctx context.Context, hash string) ([]*model.Medias, error) { + tbl := table.Medias + stmt := tbl. + SELECT(tbl.AllColumns). + WHERE( + RawBool("metas->>'parent_hash' = ?", RawArgs{"?": hash}), + ) + m.log.Infof("sql: %s", stmt.DebugSql()) + + var medias []model.Medias + if err := stmt.QueryContext(ctx, db, &medias); err != nil { + m.log.Errorf("error querying media items: %v", err) + return nil, err + } + + return lo.Map(medias, func(media model.Medias, _ int) *model.Medias { + return &media + }), nil +} diff --git a/backend/app/models/medias_test.go b/backend/app/models/medias_test.go index b576ca2..38432e9 100644 --- a/backend/app/models/medias_test.go +++ b/backend/app/models/medias_test.go @@ -188,3 +188,32 @@ func (s *MediasTestSuite) Test_Page() { }) }) } + +func (s *MediasTestSuite) Test_CreateGetID() { + Convey("Create", s.T(), func() { + model := &model.Medias{ + Name: fmt.Sprintf("test-%d", 1), + CreatedAt: time.Now(), + MimeType: "application/pdf", + Size: 100, + Path: "path/to/media.pdf", + } + + err := Medias.Create(context.Background(), model) + So(err, ShouldBeNil) + So(model.ID, ShouldNotBeEmpty) + + s.T().Logf("model id :%d", model.ID) + }) +} + +func (s *MediasTestSuite) Test_GetRelations() { + Convey("GetByHash", s.T(), func() { + hash := "ce4cd071128cef282cf315dda75bdab4" + media, err := Medias.GetRelations(context.Background(), hash) + So(err, ShouldBeNil) + So(media, ShouldNotBeNil) + + s.T().Logf("media: %+v", media) + }) +} diff --git a/backend/database/fields/posts.go b/backend/database/fields/posts.go index b22e42c..74f4f31 100644 --- a/backend/database/fields/posts.go +++ b/backend/database/fields/posts.go @@ -1,9 +1,10 @@ package fields type MediaAsset struct { - Type string `json:"type"` - Media int64 `json:"media"` - Mark *string `json:"mark,omitempty"` + Type string `json:"type"` + Media int64 `json:"media"` + Metas *MediaMetas `json:"metas,omitempty"` + Mark *string `json:"mark,omitempty"` } // swagger:enum PostStatus diff --git a/backend/pkg/utils/exec.go b/backend/pkg/utils/exec.go index 5c9207f..b8dd570 100644 --- a/backend/pkg/utils/exec.go +++ b/backend/pkg/utils/exec.go @@ -12,6 +12,7 @@ import ( // ExecCommand executes a command and streams its output in real-time func ExecCommand(name string, args ...string) error { + log.Infof("Executing command: %s %v", name, args) cmd := exec.Command(name, args...) stdout, err := cmd.StdoutPipe() @@ -58,6 +59,7 @@ func ExecCommand(name string, args ...string) error { // ExecCommandOutput executes a command and returns its output func ExecCommandOutput(name string, args ...string) ([]byte, error) { + log.Infof("Executing command: %s %v", name, args) cmd := exec.Command(name, args...) output, err := cmd.Output() if err != nil { diff --git a/backend/pkg/utils/posts.go b/backend/pkg/utils/posts.go new file mode 100644 index 0000000..62be5a6 --- /dev/null +++ b/backend/pkg/utils/posts.go @@ -0,0 +1,45 @@ +package utils + +import "strings" + +// FormatTitle +// Format the title of a media file by replacing spaces with underscores and removing special characters +func FormatTitle(title string) string { + // remove file ext from title + if strings.Contains(title, ".") { + title = strings.Split(title, ".")[0] + } + + // replace all spaces with underscores + replacements := []string{ + " ", "", + "!", "", + "@", "", + "#", "", + "$", "", + "%", "", + "^", "", + "&", "", + "*", "", + "(", "(", + ")", ")", + "[", "【", + "]", "】", + "{", "《", + "}", "》", + ":", ":", + ";", "", + "'", "", + "\"", "", + "<", "", + ">", "", + ",", "", + ".", "", + "?", "", + } + + replacer := strings.NewReplacer(replacements...) + title = replacer.Replace(title) + + return title +} diff --git a/backend/providers/ali/oss_client.go b/backend/providers/ali/oss_client.go index 34f774b..8a761ff 100644 --- a/backend/providers/ali/oss_client.go +++ b/backend/providers/ali/oss_client.go @@ -77,7 +77,7 @@ func (c *OSSClient) Delete(ctx context.Context, path string) error { func (c *OSSClient) Upload(ctx context.Context, input, dst string) error { request := &oss.PutObjectRequest{ Bucket: oss.Ptr(c.config.Bucket), - Key: oss.Ptr(c.GetSavePath(dst)), + Key: oss.Ptr(dst), } if _, err := c.internalClient.PutObjectFromFile(ctx, request, input); err != nil { diff --git a/fixtures/quyun/959e5310105c96e653f10b74e5bdc36b-short.mp4 b/fixtures/quyun/959e5310105c96e653f10b74e5bdc36b-short.mp4 deleted file mode 100644 index 70a91c1..0000000 Binary files a/fixtures/quyun/959e5310105c96e653f10b74e5bdc36b-short.mp4 and /dev/null differ diff --git a/fixtures/quyun/959e5310105c96e653f10b74e5bdc36b.mp4 b/fixtures/quyun/959e5310105c96e653f10b74e5bdc36b.mp4 deleted file mode 100644 index adc14ed..0000000 Binary files a/fixtures/quyun/959e5310105c96e653f10b74e5bdc36b.mp4 and /dev/null differ diff --git a/fixtures/quyun/ce4cd071128cef282cf315dda75bdab4-short.mp4 b/fixtures/quyun/ce4cd071128cef282cf315dda75bdab4-short.mp4 deleted file mode 100644 index 277026e..0000000 Binary files a/fixtures/quyun/ce4cd071128cef282cf315dda75bdab4-short.mp4 and /dev/null differ diff --git a/fixtures/quyun/ce4cd071128cef282cf315dda75bdab4.mp4 b/fixtures/quyun/ce4cd071128cef282cf315dda75bdab4.mp4 deleted file mode 100644 index 52db5eb..0000000 Binary files a/fixtures/quyun/ce4cd071128cef282cf315dda75bdab4.mp4 and /dev/null differ diff --git a/frontend/admin/src/pages/PostCreatePage.vue b/frontend/admin/src/pages/PostCreatePage.vue index e633260..481badd 100644 --- a/frontend/admin/src/pages/PostCreatePage.vue +++ b/frontend/admin/src/pages/PostCreatePage.vue @@ -1,6 +1,7 @@