174 lines
3.8 KiB
Go
174 lines
3.8 KiB
Go
package internal
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"exporter/config"
|
|
"exporter/database/telegram_resource/public/model"
|
|
"exporter/database/telegram_resource/public/table"
|
|
|
|
. "github.com/go-jet/jet/v2/postgres"
|
|
"github.com/go-jet/jet/v2/qrm"
|
|
"github.com/lib/pq"
|
|
"github.com/pkg/errors"
|
|
"github.com/samber/lo"
|
|
)
|
|
|
|
// DBChannel holds a channel's identity together with the progress counters
// that are persisted in the channels table.
type DBChannel struct {
	// UUID identifies the channel; it is the key used to look up and update
	// the channel row and is stored as the channel id of saved messages.
	UUID int64

	// Username and Title are written to the channel row when Get has to
	// create it.
	Username, Title string

	// Offset is the value loaded by Get or most recently passed to Update.
	// MinID is loaded by Get and raised by Update whenever a larger offset
	// is supplied.
	Offset, MinID int
}
|
|
|
|
func NewDBChannel(uuid int64, username, title string) *DBChannel {
|
|
if uuid == 0 {
|
|
panic("channel id is required")
|
|
}
|
|
return &DBChannel{
|
|
UUID: uuid,
|
|
Username: username,
|
|
Title: title,
|
|
}
|
|
}
|
|
|
|
func (c *DBChannel) Asset(assetID int64, ext string) string {
|
|
assetFile := fmt.Sprintf("%d/%d.%s", c.UUID, assetID, strings.Trim(ext, "."))
|
|
assetFile = filepath.Join(config.C.Output, assetFile)
|
|
|
|
// if file dir not exists then create it
|
|
if _, err := os.Stat(filepath.Dir(assetFile)); os.IsNotExist(err) {
|
|
if err := os.MkdirAll(filepath.Dir(assetFile), 0o755); err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
// if file exists then delete it
|
|
if _, err := os.Stat(assetFile); err == nil {
|
|
if err := os.Remove(assetFile); err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
return assetFile
|
|
}
|
|
|
|
func (c *DBChannel) Get(ctx context.Context) error {
|
|
tbl := table.Channels
|
|
|
|
var m model.Channels
|
|
|
|
err := tbl.SELECT(tbl.AllColumns).WHERE(tbl.UUID.EQ(Int64(c.UUID))).QueryContext(ctx, db, &m)
|
|
if err != nil {
|
|
if errors.Is(err, qrm.ErrNoRows) {
|
|
// create new channel with default value
|
|
m = model.Channels{
|
|
UUID: c.UUID,
|
|
Username: c.Username,
|
|
Title: c.Title,
|
|
MinID: 0,
|
|
Offset: 0,
|
|
CreatedAt: lo.ToPtr(time.Now()),
|
|
UpdatedAt: lo.ToPtr(time.Now()),
|
|
}
|
|
|
|
if _, err := tbl.INSERT(tbl.AllColumns.Except(tbl.ID)).MODEL(m).ExecContext(ctx, db); err != nil {
|
|
return errors.Wrap(err, "insert channel")
|
|
}
|
|
} else {
|
|
return errors.Wrap(err, "select channel")
|
|
}
|
|
}
|
|
c.MinID = int(m.MinID)
|
|
c.Offset = int(m.Offset)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *DBChannel) Update(ctx context.Context, offsetID int) error {
|
|
c.Offset = offsetID
|
|
|
|
if c.MinID < offsetID {
|
|
c.MinID = offsetID
|
|
}
|
|
|
|
tbl := table.Channels
|
|
_, err := tbl.UPDATE().
|
|
SET(
|
|
tbl.Offset.SET(Int64(int64(c.Offset))),
|
|
tbl.MinID.SET(Int64(int64(c.MinID))),
|
|
).
|
|
WHERE(tbl.UUID.EQ(Int64(c.UUID))).
|
|
ExecContext(ctx, db)
|
|
if err != nil {
|
|
return errors.Wrap(err, "update channel")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *DBChannel) SaveMessage(ctx context.Context, msg *ChannelMessage) error {
|
|
message := &model.ChannelMessages{
|
|
ChannelID: c.UUID,
|
|
GroupID: msg.GroupID,
|
|
UUID: int64(msg.ID),
|
|
Content: lo.ToPtr(msg.Message),
|
|
Media: msg.GetMedias(),
|
|
PublishedAt: msg.PublishAt,
|
|
CreatedAt: time.Now(),
|
|
}
|
|
|
|
tbl := table.ChannelMessages
|
|
|
|
var m model.ChannelMessages
|
|
err := tbl.
|
|
SELECT(tbl.ID).
|
|
WHERE(
|
|
tbl.GroupID.EQ(Int(message.GroupID)).AND(
|
|
tbl.UUID.EQ(Int64(message.UUID)),
|
|
),
|
|
).
|
|
LIMIT(1).
|
|
QueryContext(ctx, db, &m)
|
|
if err != nil {
|
|
// 如果没有找到记录,那么插入新记录
|
|
if errors.Is(err, qrm.ErrNoRows) {
|
|
_, err = tbl.INSERT(tbl.AllColumns.Except(tbl.ID)).MODEL(message).ExecContext(ctx, db)
|
|
if err != nil {
|
|
if e, ok := err.(*pq.Error); ok {
|
|
if e.Code == "23505" {
|
|
return nil
|
|
}
|
|
}
|
|
return errors.Wrap(err, "insert message")
|
|
}
|
|
return nil
|
|
}
|
|
return errors.Wrap(err, "select message")
|
|
}
|
|
|
|
// 如果找到记录,那么更新记录
|
|
stmt := tbl.UPDATE().SET(
|
|
tbl.Content.SET(RawString(`CONCAT(content, #var)`, RawArgs{
|
|
"#var": *message.Content,
|
|
})),
|
|
tbl.Media.SET(RawString(`media || #var::jsonb`, RawArgs{
|
|
"#var": msg.GetMedia(),
|
|
})),
|
|
).WHERE(
|
|
tbl.GroupID.EQ(Int(message.GroupID)).AND(
|
|
tbl.UUID.EQ(Int(message.UUID)),
|
|
),
|
|
)
|
|
|
|
_, err = db.ExecContext(ctx, stmt.DebugSql())
|
|
|
|
return err
|
|
}
|