Files
mp-qvyun/backend/modules/medias/service.go
2024-12-28 16:05:25 +08:00

466 lines
12 KiB
Go

package medias
import (
"bufio"
"context"
"database/sql"
"fmt"
"os"
"path/filepath"
"time"
"backend/database/models/qvyun/public/model"
"backend/database/models/qvyun/public/table"
"backend/pkg/errorx"
"backend/pkg/media_store"
"backend/pkg/path"
"backend/pkg/pg"
"backend/providers/storage"
. "github.com/go-jet/jet/v2/postgres"
"github.com/go-jet/jet/v2/qrm"
"github.com/grafov/m3u8"
"github.com/pkg/errors"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
hashids "github.com/speps/go-hashids/v2"
"github.com/spf13/cast"
)
// @provider:except
type Service struct {
db *sql.DB
hashIds *hashids.HashID
storageConfig *storage.Config
log *logrus.Entry `inject:"false"`
}
func (svc *Service) Prepare() error {
svc.log = logrus.WithField("module", "medias.service")
return nil
}
// GetMediaByHash
func (svc *Service) GetMediaByHash(ctx context.Context, tenantId int64, hash string) (*model.Medias, error) {
log := svc.log.WithField("method", "GetMediaByHash")
tbl := table.Medias
stmt := tbl.
SELECT(tbl.AllColumns).
WHERE(
tbl.Hash.EQ(String(hash)).AND(
tbl.TenantID.EQ(Int(tenantId)),
),
)
log.Debug(stmt.DebugSql())
var m model.Medias
if err := stmt.QueryContext(ctx, svc.db, &m); err != nil {
return nil, errors.Wrapf(err, "get media by hash %s failed", hash)
}
return &m, nil
}
// GetByID
func (svc *Service) GetMediaByID(ctx context.Context, tenantId, userId, id int64) (*model.Medias, error) {
log := svc.log.WithField("method", "GetMediaByID")
tbl := table.Medias
stmt := tbl.SELECT(tbl.AllColumns).WHERE(
tbl.ID.EQ(Int(id)).AND(
tbl.TenantID.EQ(Int(tenantId)),
),
)
log.Debug(stmt.DebugSql())
var m model.Medias
if err := stmt.QueryContext(ctx, svc.db, &m); err != nil {
return nil, errors.Wrap(err, "query media by id")
}
return &m, nil
}
func (svc *Service) ModelToListItem(ctx context.Context, m *model.Medias) *ListItem {
item := &ListItem{
ID: m.ID,
Poster: fmt.Sprintf("/posters/%d/%s.jpg", m.TenantID, m.Hash),
Hash: m.Hash,
Title: m.Title,
Description: m.Description,
Duration: m.Duration,
Price: m.Price,
Discount: m.Discount,
Resources: m.Resources,
CreatedAt: m.CreatedAt,
UpdatedAt: m.UpdatedAt,
Bought: false,
}
return item
}
// List
func (svc *Service) List(ctx context.Context, tenantId, userId int64, filter *ListFilter) ([]*ListItem, error) {
log := svc.log.WithField("method", "List")
boughtIDs, err := svc.GetUserBoughtMedias(ctx, tenantId, userId)
if err != nil {
return nil, errors.Wrap(err, "get user bought medias")
}
tbl := table.Medias
stmt := tbl.
SELECT(tbl.AllColumns).
ORDER_BY(tbl.ID.DESC())
cond := tbl.TenantID.EQ(Int(tenantId))
if filter.Title != nil && *filter.Title != "" {
cond = cond.AND(tbl.Title.LIKE(String("%" + *filter.Title + "%")))
}
if filter.Bought != nil && *filter.Bought {
if len(boughtIDs) == 0 {
return []*ListItem{}, nil
}
cond = cond.
AND(tbl.ID.IN(lo.Map(boughtIDs, func(item int64, _ int) Expression {
return Int(item)
})...))
} else {
cond = cond.AND(tbl.Publish.EQ(Bool(true)))
}
if filter.OffsetID > 0 {
if filter.Action == 0 {
cond = cond.AND(tbl.ID.LT(Int(filter.OffsetID)))
stmt = stmt.LIMIT(10)
}
if filter.Action == 1 {
cond = cond.AND(tbl.ID.GT(Int(filter.OffsetID)))
}
} else {
stmt = stmt.LIMIT(10)
}
stmt = stmt.WHERE(cond)
log.Debug(stmt.DebugSql())
var dest []model.Medias
if err := stmt.QueryContext(ctx, svc.db, &dest); err != nil {
return nil, errors.Wrap(err, "query medias")
}
items := lo.Map(dest, func(m model.Medias, _ int) *ListItem {
item := svc.ModelToListItem(ctx, &m)
if lo.Contains(boughtIDs, m.ID) {
item.Bought = true
}
return item
})
return items, nil
}
// GetUserBoughtMedias
func (svc *Service) GetUserBoughtMedias(ctx context.Context, tenant, userID int64) ([]int64, error) {
log := svc.log.WithField("method", "GetUserBoughtMedias")
tbl := table.UserMedias
stmt := tbl.
SELECT(tbl.MediaID).
WHERE(
tbl.TenantID.EQ(Int(tenant)).AND(
tbl.UserID.EQ(Int(userID)),
),
)
log.Debug(stmt.Sql())
var mediaIDs []int64
if err := stmt.QueryContext(ctx, svc.db, &mediaIDs); err != nil {
return nil, errors.Wrap(err, "query user bought medias")
}
return mediaIDs, nil
}
// HasUserBought
func (svc *Service) HasUserBought(ctx context.Context, tenantId, userId, mediaId int64) (bool, error) {
log := svc.log.WithField("method", "HasUserBought")
tbl := table.UserMedias
stmt := tbl.
SELECT(COUNT(tbl.MediaID).AS("cnt")).
WHERE(
tbl.TenantID.EQ(Int(tenantId)).AND(
tbl.UserID.EQ(Int(userId)).AND(
tbl.MediaID.EQ(Int(mediaId)),
),
),
)
log.Debug(stmt.DebugSql())
var m struct {
Cnt int64
}
if err := stmt.QueryContext(ctx, svc.db, &m); err != nil {
return false, errors.Wrap(err, "query user bought media")
}
return m.Cnt > 0, nil
}
// Upsert
func (svc *Service) Upsert(ctx context.Context, tenantId int64, item media_store.VideoInfo) error {
log := svc.log.WithField("method", "Upsert")
tenantIdStr := fmt.Sprintf("%d", tenantId)
resources := pg.MediaResources{}
if path.DirExists(filepath.Join(svc.storageConfig.Path, tenantIdStr, item.Hash, pg.MediaTypeVideo.String())) {
resources = append(resources, pg.MediaTypeVideo)
}
if path.DirExists(filepath.Join(svc.storageConfig.Path, tenantIdStr, item.Hash, pg.MediaTypeAudio.String())) {
resources = append(resources, pg.MediaTypeAudio)
}
if path.DirExists(filepath.Join(svc.storageConfig.Path, tenantIdStr, item.Hash, pg.MediaTypePdf.String())) {
resources = append(resources, pg.MediaTypePdf)
}
tbl := table.Medias
// if hash exists then update
stmt := tbl.SELECT(tbl.ID.AS("id")).WHERE(tbl.Hash.EQ(String(item.Hash)))
log.Debug(stmt.DebugSql())
var m struct {
ID int64
}
if err := stmt.QueryContext(ctx, svc.db, &m); err != nil && !errors.Is(err, qrm.ErrNoRows) {
return errors.Wrapf(err, "query media by hash %s", item.Hash)
}
if m.ID > 0 {
// update media
stmt2 := tbl.
UPDATE().
SET(
tbl.Title.SET(String(item.Name)),
tbl.Price.SET(Int(item.Price())),
tbl.Resources.SET(Json(resources.MustValue())),
tbl.Duration.SET(Int(item.Duration)),
tbl.Publish.SET(Bool(true)),
tbl.UpdatedAt.SET(TimestampT(time.Now())),
).
WHERE(tbl.ID.EQ(Int(m.ID)))
log.Debug(stmt2.DebugSql())
if _, err := stmt2.ExecContext(ctx, svc.db); err != nil {
return errors.Wrapf(err, "update media: %s %s", item.Hash, item.Name)
}
svc.log.Infof("update media: %d %s %s", m.ID, item.Hash, item.Name)
return nil
}
stmt3 := tbl.
INSERT(tbl.TenantID, tbl.Hash, tbl.Title, tbl.Price, tbl.Duration, tbl.Resources, tbl.Publish).
VALUES(Int(tenantId), String(item.Hash), String(item.Name), Int(item.Price()), Int(item.Duration), Json(resources.MustValue()), Bool(true)).
ON_CONFLICT(tbl.Hash).
DO_NOTHING()
log.Debug(stmt3.DebugSql())
if _, err := stmt3.ExecContext(ctx, svc.db); err != nil {
return errors.Wrapf(err, "insert into media: %s %s", item.Hash, item.Name)
}
svc.log.Infof("insert media: %s %s", item.Hash, item.Name)
return nil
}
// get video m3u8
func (svc *Service) GetM3U8(ctx context.Context, tenantId int64, types pg.MediaType, hash string, bought bool, token string) (m3u8.Playlist, error) {
log := svc.log.WithField("method", "GetM3U8")
indexPath := filepath.Join(svc.storageConfig.Path, fmt.Sprintf("%d", tenantId), hash, types.String(), "index.m3u8")
log.Infof("m3u8 path: %s", indexPath)
f, err := os.Open(indexPath)
if err != nil {
return nil, errors.Wrap(err, "open index file")
}
p, listType, err := m3u8.DecodeFrom(bufio.NewReader(f), true)
if err != nil {
return nil, errors.Wrap(err, "decode index file")
}
if listType != m3u8.MEDIA {
return nil, errors.New("Invalid media file")
}
media, ok := p.(*m3u8.MediaPlaylist)
if !ok {
return nil, errors.New("Invalid media playlist")
}
if !bought {
duration := 0
media.Segments = lo.Map(media.Segments, func(seg *m3u8.MediaSegment, _ int) *m3u8.MediaSegment {
if seg == nil {
return nil
}
duration += int(seg.Duration)
if duration <= 55 {
return seg
}
return nil
})
}
for _, seg := range media.Segments {
if seg == nil {
continue
}
// remove seg.URI ext, only keep the name
name, ext := path.SplitNameExt(seg.URI)
nameId, err := cast.ToInt64E(name)
if err != nil {
return nil, errors.Wrap(err, "cast index to int64")
}
// get video info
hashID, err := svc.hashIds.EncodeInt64([]int64{nameId})
if err != nil {
return nil, errors.Wrap(err, "encode hash id")
}
seg.URI = fmt.Sprintf("%s/%s.%s", types, hashID, ext)
if token != "" {
seg.URI += fmt.Sprintf("?token=%s", token)
}
}
return media, nil
}
// GetSegmentPath
func (svc *Service) GetSegmentPath(ctx context.Context, t pg.MediaType, tenantId int64, hash string, segment int64) string {
return filepath.Join(svc.storageConfig.Path, fmt.Sprintf("%d", tenantId), hash, t.String(), fmt.Sprintf("%d.ts", segment))
}
func (svc *Service) Checkout(ctx context.Context, tenantId, userId, mediaId int64) error {
log := svc.log.WithField("method", "Checkout")
bought, err := svc.HasUserBought(ctx, tenantId, userId, mediaId)
if err != nil {
return errors.Wrap(err, "check user bought")
}
if bought {
return nil
}
media, err := svc.GetMediaByID(ctx, tenantId, userId, mediaId)
if err != nil {
return errors.Wrap(err, "get media")
}
userBalance, err := svc.GetUserBalance(ctx, tenantId, userId)
if err != nil {
return errors.Wrap(err, "get user balance")
}
if userBalance < media.Price {
return errorx.UserBalanceNotEnough
}
tx, err := svc.db.Begin()
if err != nil {
return errors.Wrap(err, "begin transaction")
}
defer tx.Rollback()
tbl := table.UserMedias
stmt := tbl.
INSERT(tbl.TenantID, tbl.UserID, tbl.MediaID, tbl.Price).
VALUES(Int(tenantId), Int(userId), Int(mediaId), Int(media.Price))
log.Debug(stmt.DebugSql())
if _, err := stmt.ExecContext(ctx, tx); err != nil {
return errors.Wrap(err, "insert user media")
}
// update user balance
tblUserTenants := table.UsersTenants
stmtUserTenants := tblUserTenants.
UPDATE().
SET(
tblUserTenants.Balance.SET(
tblUserTenants.Balance.SUB(Int(media.Price)),
),
).
WHERE(
tblUserTenants.TenantID.EQ(Int(tenantId)).AND(
tblUserTenants.UserID.EQ(Int(userId)),
),
)
log.Debug(stmtUserTenants.DebugSql())
if _, err := stmtUserTenants.ExecContext(ctx, tx); err != nil {
return errors.Wrap(err, "update user balance")
}
// add user balance
tblUserBalances := table.UserBalanceHistories
stmtUserBalances := tblUserBalances.
INSERT(
tblUserBalances.TenantID,
tblUserBalances.UserID,
tblUserBalances.Balance,
tblUserBalances.Target,
tblUserBalances.Type,
).
VALUES(
Int(tenantId),
Int(userId),
Int(-media.Price),
Json(pg.BalanceTarget{
ID: media.ID,
Name: table.Medias.TableName(),
}.MustValue()),
String(pg.BalanceTypeConsume.String()),
)
log.Debug(stmtUserBalances.DebugSql())
if _, err := stmtUserBalances.ExecContext(ctx, tx); err != nil {
return errors.Wrap(err, "insert user balance")
}
if err := tx.Commit(); err != nil {
return errors.Wrap(err, "commit transaction")
}
return nil
}
// GetUserBalance
func (svc *Service) GetUserBalance(ctx context.Context, tenantId, userId int64) (int64, error) {
log := svc.log.WithField("method", "GetUserBalance")
tbl := table.UsersTenants
stmt := tbl.SELECT(tbl.Balance.AS("balance")).WHERE(
tbl.TenantID.EQ(Int(tenantId)).AND(
tbl.UserID.EQ(Int(userId)),
),
)
log.Debug(stmt.DebugSql())
var result struct {
Balance int64
}
if err := stmt.QueryContext(ctx, svc.db, &result); err != nil {
return 0, errors.Wrap(err, "query user balance")
}
return result.Balance, nil
}