package internal import ( "context" "fmt" "os" "path/filepath" "strings" "time" "exporter/config" "exporter/database/telegram_resource/public/model" "exporter/database/telegram_resource/public/table" . "github.com/go-jet/jet/v2/postgres" "github.com/go-jet/jet/v2/qrm" "github.com/lib/pq" "github.com/pkg/errors" "github.com/samber/lo" ) type DBChannel struct { UUID int64 Username string Title string OffsetID int MinID int ExportMedia bool } func NewDBChannelFromDB(id int64) (*DBChannel, error) { var m model.Channels tbl := table.Channels err := tbl.SELECT(tbl.AllColumns).WHERE(tbl.ID.EQ(Int64(id))).QueryContext(context.Background(), db, &m) if err != nil { return nil, errors.Wrapf(err, "find channel by id(%d) failed", id) } return &DBChannel{ UUID: m.UUID, Username: m.Username, Title: m.Title, OffsetID: int(m.Offset), MinID: int(m.MinID), ExportMedia: m.ExportMedia, }, nil } func NewDBChannel(uuid int64, username, title string) *DBChannel { if uuid == 0 { panic("channel id is required") } return &DBChannel{ UUID: uuid, Username: username, Title: title, } } func (c *DBChannel) Asset(assetID int64, ext string) string { assetFile := fmt.Sprintf("%d/%d.%s", c.UUID, assetID, strings.Trim(ext, ".")) assetFile = filepath.Join(config.C.Output, assetFile) // if file dir not exists then create it if _, err := os.Stat(filepath.Dir(assetFile)); os.IsNotExist(err) { if err := os.MkdirAll(filepath.Dir(assetFile), 0o755); err != nil { panic(err) } } // if file exists then delete it if _, err := os.Stat(assetFile); err == nil { if err := os.Remove(assetFile); err != nil { panic(err) } } return assetFile } func (c *DBChannel) GetOrCreate(ctx context.Context) error { tbl := table.Channels var m model.Channels err := tbl.SELECT(tbl.AllColumns).WHERE(tbl.UUID.EQ(Int64(c.UUID))).QueryContext(ctx, db, &m) if err != nil { if errors.Is(err, qrm.ErrNoRows) { // create new channel with default value m = model.Channels{ UUID: c.UUID, Username: c.Username, Title: c.Title, MinID: 0, Offset: 0, ExportMedia: c.ExportMedia, CreatedAt: lo.ToPtr(time.Now()), UpdatedAt: lo.ToPtr(time.Now()), } if _, err := tbl.INSERT(tbl.AllColumns.Except(tbl.ID)).MODEL(m).ExecContext(ctx, db); err != nil { return errors.Wrap(err, "insert channel") } } else { return errors.Wrap(err, "select channel") } } c.MinID = int(m.MinID) c.OffsetID = int(m.Offset) c.ExportMedia = m.ExportMedia return nil } func (c *DBChannel) SetOffset(ctx context.Context, offset int) error { if c.OffsetID < offset { c.OffsetID = offset } tbl := table.Channels _, err := tbl.UPDATE(). SET( tbl.Offset.SET(Int64(int64(c.OffsetID))), ). WHERE(tbl.UUID.EQ(Int64(c.UUID))). ExecContext(ctx, db) if err != nil { return errors.Wrap(err, "update channel") } return nil } func (c *DBChannel) Update(ctx context.Context, msgID int) error { if c.MinID < msgID { c.MinID = msgID } if c.OffsetID < msgID { c.OffsetID = msgID } tbl := table.Channels _, err := tbl.UPDATE(). SET( tbl.MinID.SET(Int64(int64(c.MinID))), tbl.Offset.SET(Int64(int64(c.OffsetID))), ). WHERE(tbl.UUID.EQ(Int64(c.UUID))). ExecContext(ctx, db) if err != nil { return errors.Wrap(err, "update channel") } return nil } func (c *DBChannel) SaveMessage(ctx context.Context, msg *ChannelMessage) error { message := &model.ChannelMessages{ ChannelID: c.UUID, GroupID: msg.GroupID, UUID: int64(msg.ID), Content: lo.ToPtr(msg.Message), Media: msg.GetMedias(), PublishedAt: msg.PublishAt, CreatedAt: time.Now(), } tbl := table.ChannelMessages createNew := func() error { _, err := tbl.INSERT(tbl.AllColumns.Except(tbl.ID)).MODEL(message).ExecContext(ctx, db) if err != nil { if e, ok := err.(*pq.Error); ok { if e.Code == "23505" { return nil } } return errors.Wrap(err, "insert message") } return nil } if msg.GroupID == 0 { if err := createNew(); err != nil { return err } return nil } cond := tbl.GroupID.EQ(Int(message.GroupID)).AND( tbl.ChannelID.EQ(Int(message.ChannelID)), ) var m model.ChannelMessages err := tbl. SELECT(tbl.ID). WHERE(cond). LIMIT(1). QueryContext(ctx, db, &m) if err != nil { // 如果没有找到记录,那么插入新记录 if errors.Is(err, qrm.ErrNoRows) { return createNew() } return errors.Wrap(err, "select message") } // 如果找到记录,那么更新记录 stmt := tbl.UPDATE().SET( tbl.Content.SET(RawString(`CONCAT(content, #var::varchar)`, RawArgs{ "#var": *message.Content, })), tbl.Media.SET(RawString(`media || #var::jsonb`, RawArgs{ "#var": msg.GetMedias(), })), ).WHERE(cond) _, err = stmt.ExecContext(ctx, db) // _, err = db.ExecContext(ctx, stmt.DebugSql()) return err }