package medias import ( "bufio" "context" "database/sql" "fmt" "os" "path/filepath" "time" "backend/database/models/qvyun/public/model" "backend/database/models/qvyun/public/table" "backend/pkg/errorx" "backend/pkg/media_store" "backend/pkg/path" "backend/pkg/pg" "backend/providers/storage" . "github.com/go-jet/jet/v2/postgres" "github.com/grafov/m3u8" "github.com/pkg/errors" "github.com/samber/lo" "github.com/sirupsen/logrus" hashids "github.com/speps/go-hashids/v2" "github.com/spf13/cast" ) // @provider:except type Service struct { db *sql.DB hashIds *hashids.HashID storageConfig *storage.Config log *logrus.Entry `inject:"false"` } func (svc *Service) Prepare() error { svc.log = logrus.WithField("module", "medias.service") return nil } // GetMediaByHash func (svc *Service) GetMediaByHash(ctx context.Context, tenantId int64, hash string) (*model.Medias, error) { log := svc.log.WithField("method", "GetMediaByHash") tbl := table.Medias stmt := tbl. SELECT(tbl.AllColumns). WHERE( tbl.Hash.EQ(String(hash)).AND( tbl.TenantID.EQ(Int(tenantId)), ), ) log.Debug(stmt.DebugSql()) var m model.Medias if err := stmt.QueryContext(ctx, svc.db, &m); err != nil { return nil, errors.Wrapf(err, "get media by hash %s failed", hash) } return &m, nil } // GetByID func (svc *Service) GetMediaByID(ctx context.Context, tenantId, userId, id int64) (*model.Medias, error) { log := svc.log.WithField("method", "GetMediaByID") tbl := table.Medias stmt := tbl.SELECT(tbl.AllColumns).WHERE( tbl.ID.EQ(Int(id)).AND( tbl.TenantID.EQ(Int(tenantId)), ), ) log.Debug(stmt.DebugSql()) var m model.Medias if err := stmt.QueryContext(ctx, svc.db, &m); err != nil { return nil, errors.Wrap(err, "query media by id") } return &m, nil } func (svc *Service) ModelToListItem(ctx context.Context, m *model.Medias) *ListItem { item := &ListItem{ ID: m.ID, Poster: fmt.Sprintf("/posters/%d/%s.jpg", m.TenantID, m.Hash), Hash: m.Hash, Title: m.Title, Description: m.Description, Duration: m.Duration, Price: m.Price, Discount: m.Discount, Resources: m.Resources, CreatedAt: m.CreatedAt, UpdatedAt: m.UpdatedAt, Bought: false, } return item } // List func (svc *Service) List(ctx context.Context, tenantId, userId int64, filter *ListFilter) ([]*ListItem, error) { log := svc.log.WithField("method", "List") boughtIDs, err := svc.GetUserBoughtMedias(ctx, tenantId, userId) if err != nil { return nil, errors.Wrap(err, "get user bought medias") } tbl := table.Medias stmt := tbl. SELECT(tbl.AllColumns). ORDER_BY(tbl.ID.DESC()) cond := tbl.TenantID.EQ(Int(tenantId)) if filter.Title != nil && *filter.Title != "" { cond = cond.AND(tbl.Title.LIKE(String("%" + *filter.Title + "%"))) } if filter.Bought != nil && *filter.Bought { if len(boughtIDs) > 0 { cond = cond. AND(tbl.ID.IN(lo.Map(boughtIDs, func(item int64, _ int) Expression { return Int(item) })...)) } } else { cond = cond.AND(tbl.Publish.EQ(Bool(true))) } if filter.OffsetID > 0 { if filter.Action == 0 { cond = cond.AND(tbl.ID.LT(Int(filter.OffsetID))) stmt = stmt.LIMIT(10) } if filter.Action == 1 { cond = cond.AND(tbl.ID.GT(Int(filter.OffsetID))) } } else { stmt = stmt.LIMIT(10) } stmt = stmt.WHERE(cond) log.Debug(stmt.DebugSql()) var dest []model.Medias if err := stmt.QueryContext(ctx, svc.db, &dest); err != nil { return nil, errors.Wrap(err, "query medias") } items := lo.Map(dest, func(m model.Medias, _ int) *ListItem { item := svc.ModelToListItem(ctx, &m) if lo.Contains(boughtIDs, m.ID) { item.Bought = true } return item }) return items, nil } // GetUserBoughtMedias func (svc *Service) GetUserBoughtMedias(ctx context.Context, tenant int64, userID int64) ([]int64, error) { log := svc.log.WithField("method", "GetUserBoughtMedias") tbl := table.UserMedias stmt := tbl. SELECT(tbl.MediaID). WHERE( tbl.TenantID.EQ(Int(tenant)).AND( tbl.UserID.EQ(Int(userID)), ), ) log.Debug(stmt.Sql()) var mediaIDs []int64 if err := stmt.QueryContext(ctx, svc.db, &mediaIDs); err != nil { return nil, errors.Wrap(err, "query user bought medias") } return mediaIDs, nil } // HasUserBought func (svc *Service) HasUserBought(ctx context.Context, tenantId, userId, mediaId int64) (bool, error) { log := svc.log.WithField("method", "HasUserBought") tbl := table.UserMedias stmt := tbl. SELECT(COUNT(tbl.MediaID).AS("cnt")). WHERE( tbl.TenantID.EQ(Int(tenantId)).AND( tbl.UserID.EQ(Int(userId)).AND( tbl.MediaID.EQ(Int(mediaId)), ), ), ) log.Debug(stmt.DebugSql()) var m struct { Cnt int64 } if err := stmt.QueryContext(ctx, svc.db, &m); err != nil { return false, errors.Wrap(err, "query user bought media") } return m.Cnt > 0, nil } // Upsert func (svc *Service) Upsert(ctx context.Context, tenantId int64, item media_store.VideoInfo) error { log := svc.log.WithField("method", "Upsert") resources := pg.MediaResources{} if path.DirExists(filepath.Join(svc.storageConfig.Path, item.Hash, pg.MediaTypeVideo.String())) { resources = append(resources, pg.MediaTypeVideo) } if path.DirExists(filepath.Join(svc.storageConfig.Path, item.Hash, pg.MediaTypeAudio.String())) { resources = append(resources, pg.MediaTypeAudio) } if path.DirExists(filepath.Join(svc.storageConfig.Path, item.Hash, pg.MediaTypePdf.String())) { resources = append(resources, pg.MediaTypePdf) } tbl := table.Medias stmt := tbl. INSERT(tbl.TenantID, tbl.Hash, tbl.Title, tbl.Price, tbl.Duration, tbl.Resources, tbl.Publish). VALUES(Int(tenantId), String(item.Hash), String(item.Name), Int(item.Price()), Int(item.Duration), Json(resources.MustValue()), Bool(true)). ON_CONFLICT(tbl.Hash). DO_UPDATE( SET( tbl.Title.SET(String(item.Name)), tbl.Price.SET(Int(item.Price())), tbl.Resources.SET(Json(resources.MustValue())), tbl.Duration.SET(Int(item.Duration)), tbl.Publish.SET(Bool(true)), tbl.UpdatedAt.SET(TimestampT(time.Now())), ), ) log.Debug(stmt.DebugSql()) if _, err := stmt.ExecContext(ctx, svc.db); err != nil { return errors.Wrapf(err, "upsert media: %s %s", item.Hash, item.Name) } return nil } // get video m3u8 func (svc *Service) GetM3U8(ctx context.Context, tenantId int64, types pg.MediaType, hash string, bought bool) (m3u8.Playlist, error) { log := svc.log.WithField("method", "GetM3U8") indexPath := filepath.Join(svc.storageConfig.Path, fmt.Sprintf("%d", tenantId), hash, types.String(), "index.m3u8") log.Infof("m3u8 path: %s", indexPath) f, err := os.Open(indexPath) if err != nil { return nil, errors.Wrap(err, "open index file") } p, listType, err := m3u8.DecodeFrom(bufio.NewReader(f), true) if err != nil { return nil, errors.Wrap(err, "decode index file") } if listType != m3u8.MEDIA { return nil, errors.New("Invalid media file") } media, ok := p.(*m3u8.MediaPlaylist) if !ok { return nil, errors.New("Invalid media playlist") } if !bought { duration := 0 media.Segments = lo.Map(media.Segments, func(seg *m3u8.MediaSegment, _ int) *m3u8.MediaSegment { if seg == nil { return nil } duration += int(seg.Duration) if duration <= 55 { return seg } return nil }) } for _, seg := range media.Segments { if seg == nil { continue } // remove seg.URI ext, only keep the name name, ext := path.SplitNameExt(seg.URI) nameId, err := cast.ToInt64E(name) if err != nil { return nil, errors.Wrap(err, "cast index to int64") } // get video info hashID, err := svc.hashIds.EncodeInt64([]int64{nameId}) if err != nil { return nil, errors.Wrap(err, "encode hash id") } seg.URI = fmt.Sprintf("%s/%s.%s", types, hashID, ext) } return media, nil } // GetSegmentPath func (svc *Service) GetSegmentPath(ctx context.Context, t pg.MediaType, tenantId int64, hash string, segment int64) string { return filepath.Join(svc.storageConfig.Path, fmt.Sprintf("%d", tenantId), hash, t.String(), fmt.Sprintf("%d.ts", segment)) } func (svc *Service) Checkout(ctx context.Context, tenantId, userId, mediaId int64) error { log := svc.log.WithField("method", "Checkout") bought, err := svc.HasUserBought(ctx, tenantId, userId, mediaId) if err != nil { return errors.Wrap(err, "check user bought") } if bought { return nil } media, err := svc.GetMediaByID(ctx, tenantId, userId, mediaId) if err != nil { return errors.Wrap(err, "get media") } userBalance, err := svc.GetUserBalance(ctx, tenantId, userId) if err != nil { return errors.Wrap(err, "get user balance") } if userBalance < media.Price { return errorx.UserBalanceNotEnough } tx, err := svc.db.Begin() if err != nil { return errors.Wrap(err, "begin transaction") } defer tx.Rollback() tbl := table.UserMedias stmt := tbl. INSERT(tbl.TenantID, tbl.UserID, tbl.MediaID, tbl.Price). VALUES(Int(tenantId), Int(userId), Int(mediaId), Int(media.Price)) log.Debug(stmt.DebugSql()) if _, err := stmt.ExecContext(ctx, tx); err != nil { return errors.Wrap(err, "insert user media") } // update user balance tblUserTenants := table.UsersTenants stmtUserTenants := tblUserTenants. UPDATE(). SET( tblUserTenants.Balance.SET( tblUserTenants.Balance.SUB(Int(media.Price)), ), ). WHERE( tblUserTenants.TenantID.EQ(Int(tenantId)).AND( tblUserTenants.UserID.EQ(Int(userId)), ), ) log.Debug(stmtUserTenants.DebugSql()) if _, err := stmtUserTenants.ExecContext(ctx, tx); err != nil { return errors.Wrap(err, "update user balance") } // add user balance tblUserBalances := table.UserBalanceHistories stmtUserBalances := tblUserBalances. INSERT( tblUserBalances.TenantID, tblUserBalances.UserID, tblUserBalances.Balance, tblUserBalances.Target, tblUserBalances.Type, ). VALUES( Int(tenantId), Int(userId), Int(-media.Price), Json(pg.BalanceTarget{ ID: media.ID, Name: table.Medias.TableName(), }.MustValue()), String(pg.BalanceTypeConsume.String()), ) log.Debug(stmtUserBalances.DebugSql()) if _, err := stmtUserBalances.ExecContext(ctx, tx); err != nil { return errors.Wrap(err, "insert user balance") } if err := tx.Commit(); err != nil { return errors.Wrap(err, "commit transaction") } return nil } // GetUserBalance func (svc *Service) GetUserBalance(ctx context.Context, tenantId, userId int64) (int64, error) { log := svc.log.WithField("method", "GetUserBalance") tbl := table.UsersTenants stmt := tbl.SELECT(tbl.Balance.AS("balance")).WHERE( tbl.TenantID.EQ(Int(tenantId)).AND( tbl.UserID.EQ(Int(userId)), ), ) log.Debug(stmt.DebugSql()) var result struct { Balance int64 } if err := stmt.QueryContext(ctx, svc.db, &result); err != nil { return 0, errors.Wrap(err, "query user balance") } return result.Balance, nil }