feat: update jobs

This commit is contained in:
yanghao05
2025-04-22 20:01:50 +08:00
parent 163a7c11fe
commit 707cbbb639
21 changed files with 359 additions and 87 deletions

View File

@@ -6,7 +6,7 @@ tmp_dir = "tmp"
args_bin = []
bin = "./tmp/main serve"
cmd = "go build -o ./tmp/main ."
delay = 1000
delay = 2000
exclude_dir = ["assets", "tmp", "vendor", "testdata"]
exclude_file = []
exclude_regex = ["_test.go"]

View File

@@ -62,10 +62,6 @@ func (w *DownloadFromAliOSSWorker) Work(ctx context.Context, job *Job[DownloadFr
}
dst := filepath.Join(w.app.StoragePath, media.Path)
if w.job.Add(&RemoveDownloadedVideo{FilePath: dst}); err != nil {
log.Errorf("Error removing original file: %v", err)
}
// check is path exist
st, err := os.Stat(dst)
if os.IsNotExist(err) {
@@ -104,10 +100,5 @@ func (w *DownloadFromAliOSSWorker) NextJob(hash string) error {
return err
}
if err := w.job.Add(&VideoExtractHeadImage{MediaHash: hash}); err != nil {
log.Errorf("Error adding job: %v", err)
return err
}
return nil
}

View File

@@ -58,6 +58,37 @@ func Provide(opts ...opt.Option) error {
}, atom.GroupInitial); err != nil {
return err
}
if err := container.Container.Provide(func(
__job *job.Job,
app *app.Config,
job *job.Job,
oss *ali.OSSClient,
) (contracts.Initial, error) {
obj := &PublishDraftPostsWorker{
app: app,
job: job,
oss: oss,
}
if err := river.AddWorkerSafely(__job.Workers, obj); err != nil {
return nil, err
}
return obj, nil
}, atom.GroupInitial); err != nil {
return err
}
if err := container.Container.Provide(func(
__job *job.Job,
) (contracts.Initial, error) {
obj := &RemoveFileWorker{}
if err := river.AddWorkerSafely(__job.Workers, obj); err != nil {
return nil, err
}
return obj, nil
}, atom.GroupInitial); err != nil {
return err
}
if err := container.Container.Provide(func(
__job *job.Job,
app *app.Config,

View File

@@ -0,0 +1,107 @@
package jobs
import (
"context"
"time"
"quyun/app/models"
"quyun/database/fields"
"quyun/database/schemas/public/model"
"quyun/pkg/utils"
"quyun/providers/ali"
"quyun/providers/app"
"quyun/providers/job"
. "github.com/riverqueue/river"
"github.com/samber/lo"
log "github.com/sirupsen/logrus"
_ "go.ipao.vip/atom"
"go.ipao.vip/atom/contracts"
)
var _ contracts.JobArgs = (*PublishDraftPosts)(nil)
type PublishDraftPosts struct {
MediaHash string `json:"media_hash"`
}
func (s PublishDraftPosts) InsertOpts() InsertOpts {
return InsertOpts{
Queue: QueueDefault,
Priority: PriorityDefault,
}
}
func (s PublishDraftPosts) Kind() string { return "publish_draft_posts" }
func (a PublishDraftPosts) UniqueID() string { return a.Kind() }
var _ Worker[PublishDraftPosts] = (*PublishDraftPostsWorker)(nil)
// @provider(job)
type PublishDraftPostsWorker struct {
WorkerDefaults[PublishDraftPosts]
oss *ali.OSSClient
job *job.Job
app *app.Config
}
func (w *PublishDraftPostsWorker) NextRetry(job *Job[PublishDraftPosts]) time.Time {
return time.Now().Add(30 * time.Second)
}
func (w *PublishDraftPostsWorker) Work(ctx context.Context, job *Job[PublishDraftPosts]) error {
log := log.WithField("job", job.Args.Kind())
log.Infof("[Start] Working on job with strings: %+v", job.Args)
defer log.Infof("[End] Finished %s", job.Args.Kind())
media, err := models.Medias.GetByHash(ctx, job.Args.MediaHash)
if err != nil {
log.Errorf("Error getting media by ID: %v", err)
return JobCancel(err)
}
relationMedias, err := models.Medias.GetRelations(ctx, media.Hash)
if err != nil {
log.Errorf("Error getting relation medias: %v", err)
return JobCancel(err)
}
assets := lo.FilterMap(relationMedias, func(media *model.Medias, _ int) (fields.MediaAsset, bool) {
return fields.MediaAsset{
Type: media.MimeType,
Media: media.ID,
Metas: &media.Metas.Data,
}, media.MimeType != "image/jpeg"
})
assets = append(assets, fields.MediaAsset{
Type: media.MimeType,
Media: media.ID,
Metas: &media.Metas.Data,
})
// publish a draft posts
post := &model.Posts{
Status: fields.PostStatusDraft,
Title: utils.FormatTitle(media.Name),
Description: "",
Content: "",
Price: 0,
Discount: 100,
Views: 0,
Likes: 0,
Tags: fields.Json[[]string]{},
Assets: fields.ToJson(assets),
HeadImages: fields.ToJson(lo.FilterMap(relationMedias, func(media *model.Medias, _ int) (int64, bool) {
return media.ID, media.MimeType == "image/jpeg"
})),
}
if err := models.Posts.Create(ctx, post); err != nil {
log.Errorf("Error creating post: %v", err)
return JobCancel(err)
}
log.Infof("Post created successfully with ID: %d", post.ID)
return nil
}

View File

@@ -1,54 +0,0 @@
package jobs
import (
"context"
"os"
"time"
. "github.com/riverqueue/river"
log "github.com/sirupsen/logrus"
_ "go.ipao.vip/atom"
"go.ipao.vip/atom/contracts"
)
var _ contracts.JobArgs = (*RemoveDownloadedVideo)(nil)
type RemoveDownloadedVideo struct {
FilePath string `json:"file_path"`
}
func (s RemoveDownloadedVideo) InsertOpts() InsertOpts {
return InsertOpts{
Queue: QueueDefault,
Priority: PriorityDefault,
ScheduledAt: time.Now().Add(time.Minute * 10),
}
}
func (s RemoveDownloadedVideo) Kind() string { return "remove_downloaded_video" }
func (a RemoveDownloadedVideo) UniqueID() string { return a.Kind() }
var _ Worker[RemoveDownloadedVideo] = (*RemoveDownloadedVideoWorker)(nil)
// @provider(job)
type RemoveDownloadedVideoWorker struct {
WorkerDefaults[RemoveDownloadedVideo]
}
func (w *RemoveDownloadedVideoWorker) NextRetry(job *Job[RemoveDownloadedVideo]) time.Time {
return time.Now().Add(30 * time.Second)
}
func (w *RemoveDownloadedVideoWorker) Work(ctx context.Context, job *Job[RemoveDownloadedVideo]) error {
log := log.WithField("job", job.Args.Kind())
log.Infof("[Start] Working on job with strings: %+v", job.Args)
defer log.Infof("[End] Finished %s", job.Args.Kind())
if err := os.Remove(job.Args.FilePath); err != nil {
log.Errorf("Error removing file: %v", err)
return err
}
return nil
}

View File

@@ -0,0 +1,61 @@
package jobs
import (
"context"
"os"
"time"
. "github.com/riverqueue/river"
log "github.com/sirupsen/logrus"
_ "go.ipao.vip/atom"
"go.ipao.vip/atom/contracts"
)
var _ contracts.JobArgs = (*RemoveFile)(nil)
type RemoveFile struct {
FilePath string `json:"file_path"`
}
func (s RemoveFile) InsertOpts() InsertOpts {
return InsertOpts{
Queue: QueueDefault,
Priority: PriorityDefault,
// ScheduledAt: time.Now().Add(time.Minute * 10),
}
}
func (s RemoveFile) Kind() string { return "remove_file" }
func (a RemoveFile) UniqueID() string { return a.Kind() }
var _ Worker[RemoveFile] = (*RemoveFileWorker)(nil)
// @provider(job)
type RemoveFileWorker struct {
WorkerDefaults[RemoveFile]
}
func (w *RemoveFileWorker) NextRetry(job *Job[RemoveFile]) time.Time {
return time.Now().Add(30 * time.Second)
}
func (w *RemoveFileWorker) Work(ctx context.Context, job *Job[RemoveFile]) error {
log := log.WithField("job", job.Args.Kind())
log.Infof("[Start] Working on job with strings: %+v", job.Args)
defer log.Infof("[End] Finished %s", job.Args.Kind())
// Check if the file exists
if _, err := os.Stat(job.Args.FilePath); os.IsNotExist(err) {
log.Warn("File does not exist: %v", job.Args.FilePath)
return nil
}
// Remove the file
if err := os.Remove(job.Args.FilePath); err != nil {
log.Errorf("Error removing file: %v", err)
return err
}
log.Infof("File removed successfully: %v", job.Args.FilePath)
return nil
}

View File

@@ -62,7 +62,6 @@ func (w *VideoExtractHeadImageWorker) Work(ctx context.Context, job *Job[VideoEx
log.Errorf("Error getting media by ID: %v", err)
return JobCancel(err)
}
_ = media
input := filepath.Join(w.app.StoragePath, media.Path)
output := input[:len(input)-len(filepath.Ext(input))] + ".jpg"
@@ -73,12 +72,6 @@ func (w *VideoExtractHeadImageWorker) Work(ctx context.Context, job *Job[VideoEx
}
defer os.RemoveAll(output)
// Upload the image to OSS
if err := w.oss.Upload(ctx, output, filepath.Base(output)); err != nil {
log.Errorf("Error uploading image to OSS: %v", err)
return JobCancel(err)
}
fileSize, err := utils.GetFileSize(output)
if err != nil {
log.Errorf("Error getting file size: %v", err)
@@ -92,15 +85,29 @@ func (w *VideoExtractHeadImageWorker) Work(ctx context.Context, job *Job[VideoEx
}
filename := fileMd5 + filepath.Ext(output)
name := "[展示图]" + media.Name + ".jpg"
// create a new media record for the image
imageMedia := &model.Medias{
CreatedAt: time.Now(),
Name: "[展示图]" + media.Name,
Name: name,
MimeType: "image/jpeg",
Size: fileSize,
Path: w.oss.GetSavePath(filename),
Hash: fileMd5,
Metas: fields.Json[fields.MediaMetas]{},
Metas: fields.ToJson(fields.MediaMetas{
ParentHash: media.Hash,
}),
}
// upload to oss
if err := w.oss.Upload(ctx, output, imageMedia.Path); err != nil {
log.Errorf("Error uploading image to OSS: %v", err)
return JobCancel(err)
}
if w.job.Add(&RemoveFile{FilePath: output}); err != nil {
log.Errorf("Error removing original file: %v", err)
}
if err := models.Medias.Create(ctx, imageMedia); err != nil {
@@ -108,5 +115,15 @@ func (w *VideoExtractHeadImageWorker) Work(ctx context.Context, job *Job[VideoEx
return JobCancel(err)
}
dst := filepath.Join(w.app.StoragePath, media.Path)
if w.job.Add(&RemoveFile{FilePath: dst}); err != nil {
log.Errorf("Error removing original file: %v", err)
}
if w.job.Add(&PublishDraftPosts{MediaHash: media.Hash}); err != nil {
log.Errorf("Error adding job: %v", err)
return JobCancel(err)
}
return nil
}

View File

@@ -100,6 +100,12 @@ func (w *VideoStoreShortWorker) Work(ctx context.Context, job *Job[VideoStoreSho
}),
}
// upload to oss
if err := w.oss.Upload(ctx, job.Args.FilePath, filePath); err != nil {
log.Errorf("Error uploading file to OSS: %v", err)
return JobCancel(err)
}
if err := models.Medias.Create(ctx, mediaModel); err != nil {
log.Errorf("Error saving media record: %v data: %+v", err, mediaModel)
return err
@@ -107,5 +113,18 @@ func (w *VideoStoreShortWorker) Work(ctx context.Context, job *Job[VideoStoreSho
log.Infof("Media record created with path: %s and hash: %s", filePath, fileMd5)
if w.job.Add(&RemoveFile{FilePath: job.Args.FilePath}); err != nil {
log.Errorf("Error removing original file: %v", err)
}
return w.NextJob(media.Hash)
}
func (w *VideoStoreShortWorker) NextJob(hash string) error {
if err := w.job.Add(&VideoExtractHeadImage{MediaHash: hash}); err != nil {
log.Errorf("Error adding job: %v", err)
return err
}
return nil
}

View File

@@ -106,10 +106,10 @@ func (m *mediasModel) BatchCreate(ctx context.Context, models []*model.Medias) e
func (m *mediasModel) Create(ctx context.Context, model *model.Medias) error {
model.CreatedAt = time.Now()
stmt := table.Medias.INSERT(table.Medias.MutableColumns).MODEL(model)
stmt := table.Medias.INSERT(table.Medias.MutableColumns).MODEL(model).RETURNING(table.Medias.AllColumns)
m.log.Infof("sql: %s", stmt.DebugSql())
if _, err := stmt.ExecContext(ctx, db); err != nil {
if err := stmt.QueryContext(ctx, db, model); err != nil {
m.log.Errorf("error creating media item: %v", err)
return err
}
@@ -236,3 +236,24 @@ func (m *mediasModel) UpdateMetas(ctx context.Context, id int64, metas fields.Me
m.log.Infof("media (%d) metas updated successfully", id)
return nil
}
// GetRelationMedias
func (m *mediasModel) GetRelations(ctx context.Context, hash string) ([]*model.Medias, error) {
tbl := table.Medias
stmt := tbl.
SELECT(tbl.AllColumns).
WHERE(
RawBool("metas->>'parent_hash' = ?", RawArgs{"?": hash}),
)
m.log.Infof("sql: %s", stmt.DebugSql())
var medias []model.Medias
if err := stmt.QueryContext(ctx, db, &medias); err != nil {
m.log.Errorf("error querying media items: %v", err)
return nil, err
}
return lo.Map(medias, func(media model.Medias, _ int) *model.Medias {
return &media
}), nil
}

View File

@@ -188,3 +188,32 @@ func (s *MediasTestSuite) Test_Page() {
})
})
}
func (s *MediasTestSuite) Test_CreateGetID() {
Convey("Create", s.T(), func() {
model := &model.Medias{
Name: fmt.Sprintf("test-%d", 1),
CreatedAt: time.Now(),
MimeType: "application/pdf",
Size: 100,
Path: "path/to/media.pdf",
}
err := Medias.Create(context.Background(), model)
So(err, ShouldBeNil)
So(model.ID, ShouldNotBeEmpty)
s.T().Logf("model id :%d", model.ID)
})
}
func (s *MediasTestSuite) Test_GetRelations() {
Convey("GetByHash", s.T(), func() {
hash := "ce4cd071128cef282cf315dda75bdab4"
media, err := Medias.GetRelations(context.Background(), hash)
So(err, ShouldBeNil)
So(media, ShouldNotBeNil)
s.T().Logf("media: %+v", media)
})
}

View File

@@ -1,9 +1,10 @@
package fields
type MediaAsset struct {
Type string `json:"type"`
Media int64 `json:"media"`
Mark *string `json:"mark,omitempty"`
Type string `json:"type"`
Media int64 `json:"media"`
Metas *MediaMetas `json:"metas,omitempty"`
Mark *string `json:"mark,omitempty"`
}
// swagger:enum PostStatus

View File

@@ -12,6 +12,7 @@ import (
// ExecCommand executes a command and streams its output in real-time
func ExecCommand(name string, args ...string) error {
log.Infof("Executing command: %s %v", name, args)
cmd := exec.Command(name, args...)
stdout, err := cmd.StdoutPipe()
@@ -58,6 +59,7 @@ func ExecCommand(name string, args ...string) error {
// ExecCommandOutput executes a command and returns its output
func ExecCommandOutput(name string, args ...string) ([]byte, error) {
log.Infof("Executing command: %s %v", name, args)
cmd := exec.Command(name, args...)
output, err := cmd.Output()
if err != nil {

View File

@@ -0,0 +1,45 @@
package utils
import "strings"
// FormatTitle
// Format the title of a media file by replacing spaces with underscores and removing special characters
func FormatTitle(title string) string {
// remove file ext from title
if strings.Contains(title, ".") {
title = strings.Split(title, ".")[0]
}
// replace all spaces with underscores
replacements := []string{
" ", "",
"!", "",
"@", "",
"#", "",
"$", "",
"%", "",
"^", "",
"&", "",
"*", "",
"(", "",
")", "",
"[", "【",
"]", "】",
"{", "《",
"}", "》",
":", "",
";", "",
"'", "",
"\"", "",
"<", "",
">", "",
",", "",
".", "",
"?", "",
}
replacer := strings.NewReplacer(replacements...)
title = replacer.Replace(title)
return title
}

View File

@@ -77,7 +77,7 @@ func (c *OSSClient) Delete(ctx context.Context, path string) error {
func (c *OSSClient) Upload(ctx context.Context, input, dst string) error {
request := &oss.PutObjectRequest{
Bucket: oss.Ptr(c.config.Bucket),
Key: oss.Ptr(c.GetSavePath(dst)),
Key: oss.Ptr(dst),
}
if _, err := c.internalClient.PutObjectFromFile(ctx, request, input); err != nil {