Files
tg_exporter/internal/db_channel.go
2024-09-02 18:47:12 +08:00

136 lines
2.9 KiB
Go

package internal
import (
"context"
"fmt"
"os"
"path/filepath"
"time"
"exporter/database/telegram_resource/public/model"
"exporter/database/telegram_resource/public/table"
. "github.com/go-jet/jet/v2/postgres"
"github.com/go-jet/jet/v2/qrm"
"github.com/lib/pq"
"github.com/pkg/errors"
"github.com/samber/lo"
)
type DBChannel struct {
UUID int64
Username string
Title string
Offset int
MinID int
}
func NewDBChannel(uuid int64, username, title string) *DBChannel {
if uuid == 0 {
panic("channel id is required")
}
return &DBChannel{
UUID: uuid,
Username: username,
Title: title,
}
}
func (c *DBChannel) Asset(assetID int64, ext string) string {
assetFile := fmt.Sprintf("outputs/%d/%d.%s", c.UUID, assetID, ext)
// if file dir not exists then create it
if _, err := os.Stat(filepath.Dir(assetFile)); os.IsNotExist(err) {
if err := os.MkdirAll(filepath.Dir(assetFile), 0o755); err != nil {
panic(err)
}
}
// if file exists then delete it
if _, err := os.Stat(assetFile); err == nil {
if err := os.Remove(assetFile); err != nil {
panic(err)
}
}
return assetFile
}
func (c *DBChannel) Get(ctx context.Context) error {
tbl := table.Channels
var m model.Channels
err := tbl.SELECT(tbl.AllColumns).WHERE(tbl.UUID.EQ(Int64(c.UUID))).QueryContext(ctx, db, &m)
if err != nil {
if errors.Is(err, qrm.ErrNoRows) {
// create new channel with default value
m = model.Channels{
UUID: c.UUID,
Username: c.Username,
Title: c.Title,
MinID: 0,
Offset: 0,
CreatedAt: lo.ToPtr(time.Now()),
UpdatedAt: lo.ToPtr(time.Now()),
}
if _, err := tbl.INSERT(tbl.AllColumns.Except(tbl.ID)).MODEL(m).ExecContext(ctx, db); err != nil {
return errors.Wrap(err, "insert channel")
}
} else {
return errors.Wrap(err, "select channel")
}
}
c.MinID = int(m.MinID)
c.Offset = int(m.Offset)
return nil
}
func (c *DBChannel) Update(ctx context.Context, offsetID int) error {
c.Offset = offsetID
if c.MinID < offsetID {
c.MinID = offsetID
}
tbl := table.Channels
_, err := tbl.UPDATE().
SET(
tbl.Offset.SET(Int64(int64(c.Offset))),
tbl.MinID.SET(Int64(int64(c.MinID))),
).
WHERE(tbl.UUID.EQ(Int64(c.UUID))).
ExecContext(ctx, db)
if err != nil {
return errors.Wrap(err, "update channel")
}
return nil
}
func (c *DBChannel) SaveMessage(ctx context.Context, msg *ChannelMessage) error {
message := &model.ChannelMessages{
ChannelID: c.UUID,
UUID: int64(msg.ID),
Content: lo.ToPtr(msg.Message),
Media: msg.GetMedia(),
PublishedAt: msg.PublishAt,
CreatedAt: time.Now(),
}
tbl := table.ChannelMessages
_, err := tbl.INSERT(tbl.AllColumns.Except(tbl.ID)).MODEL(message).ExecContext(ctx, db)
if err != nil {
if e, ok := err.(*pq.Error); ok {
if e.Code == "23505" {
return nil
}
}
return errors.Wrap(err, "insert message")
}
return nil
}