feat: use db

This commit is contained in:
Rogee
2024-09-02 18:01:38 +08:00
parent 472fe2ffaf
commit 98cd1a0365
18 changed files with 477 additions and 136 deletions

View File

@@ -1,11 +1,16 @@
package internal
import "fmt"
import (
"encoding/json"
"fmt"
"time"
)
type ChannelMessage struct {
ID int
Message string
Medias []ChannelMessageMedia
ID int
Message string
Medias []ChannelMessageMedia
PublishAt time.Time
}
type ChannelMessageMedia struct {
@@ -13,8 +18,8 @@ type ChannelMessageMedia struct {
Video string
}
func NewChannelMessage(id int) *ChannelMessage {
return &ChannelMessage{ID: id}
func NewChannelMessage(id, ts int) *ChannelMessage {
return &ChannelMessage{ID: id, PublishAt: time.Unix(int64(ts), 0)}
}
func (c *ChannelMessage) WithMessage(message string) *ChannelMessage {
@@ -31,3 +36,8 @@ func (c *ChannelMessage) WithVideo(video string) *ChannelMessage {
c.Medias = append(c.Medias, ChannelMessageMedia{Video: video})
return c
}
func (c *ChannelMessage) GetMedia() string {
b, _ := json.Marshal(c.Medias)
return string(b)
}

View File

@@ -10,7 +10,7 @@ import (
"go.uber.org/zap"
)
func (t *TClient) Channel(ctx context.Context, channel *tg.Channel, cfg *ChannelConfig, modeHistory bool) error {
func (t *TClient) Channel(ctx context.Context, channel *tg.Channel, cfg *DBChannel, modeHistory bool) error {
inputPeer := &tg.InputPeerChannel{ChannelID: channel.ID, AccessHash: channel.AccessHash}
request := &tg.MessagesGetHistoryRequest{
@@ -55,8 +55,8 @@ func (t *TClient) Channel(ctx context.Context, channel *tg.Channel, cfg *Channel
return
}
channelMessage := NewChannelMessage(msg.ID)
defer cfg.SaveMessage(channelMessage)
channelMessage := NewChannelMessage(msg.ID, msg.GetDate())
defer cfg.SaveMessage(ctx, channelMessage)
channelMessage.WithMessage(msg.GetMessage())

View File

@@ -1,119 +0,0 @@
package internal
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"github.com/pkg/errors"
)
type ChannelConfig struct {
ID int64 `json:"id"`
Offset int `json:"offset"`
MinID int `json:"min_id"`
}
func NewChannelConfig(channelID int64) *ChannelConfig {
if channelID == 0 {
panic("channel id is required")
}
return &ChannelConfig{ID: channelID}
}
func (c *ChannelConfig) Asset(assetID int64, ext string) string {
assetFile := fmt.Sprintf("outputs/%d/assets/%d.%s", c.ID, assetID, ext)
// if file dir not exists then create it
if _, err := os.Stat(filepath.Dir(assetFile)); os.IsNotExist(err) {
if err := os.MkdirAll(filepath.Dir(assetFile), 0o755); err != nil {
panic(err)
}
}
// if file exists then delete it
if _, err := os.Stat(assetFile); err == nil {
if err := os.Remove(assetFile); err != nil {
panic(err)
}
}
return assetFile
}
func (c *ChannelConfig) file(_ context.Context) (string, error) {
channelConfigFile := fmt.Sprintf("outputs/%d/config.json", c.ID)
// if file dir not exists then create it
if _, err := os.Stat(filepath.Dir(channelConfigFile)); os.IsNotExist(err) {
if err := os.MkdirAll(filepath.Dir(channelConfigFile), 0o755); err != nil {
return "", errors.Wrap(err, "create channel config dir")
}
}
// if file not exists then create it
if _, err := os.Stat(channelConfigFile); os.IsNotExist(err) {
// create config file
data, _ := json.Marshal(c)
if err := os.WriteFile(channelConfigFile, data, 0o644); err != nil {
return "", errors.Wrap(err, "write channel config")
}
}
return channelConfigFile, nil
}
func (c *ChannelConfig) Read(ctx context.Context) error {
channelConfigFile, err := c.file(ctx)
if err != nil {
return err
}
// read config file
data, err := os.ReadFile(channelConfigFile)
if err != nil {
return errors.Wrap(err, "read channel config")
}
if err := json.Unmarshal(data, &c); err != nil {
return errors.Wrap(err, "unmarshal channel config")
}
return nil
}
func (c *ChannelConfig) Update(ctx context.Context, offsetID int) error {
channelConfigFile, err := c.file(ctx)
if err != nil {
return err
}
c.Offset = offsetID
if c.MinID < offsetID {
c.MinID = offsetID
}
b, err := json.Marshal(c)
if err != nil {
return errors.Wrap(err, "marshal channel config")
}
if err := os.WriteFile(channelConfigFile, b, 0o644); err != nil {
return errors.Wrap(err, "write channel config")
}
return nil
}
func (c *ChannelConfig) SaveMessage(msg *ChannelMessage) {
// save message
saveTo := fmt.Sprintf("outputs/%d/%d.json", c.ID, msg.ID)
b, err := json.Marshal(msg)
if err != nil {
panic(err)
}
if err := os.WriteFile(saveTo, b, 0o644); err != nil {
panic(err)
}
}

View File

@@ -51,8 +51,8 @@ func exportCmd(ctx context.Context) error {
return errors.New("channel not found")
}
cfg := NewChannelConfig(channel.GetID())
if err := cfg.Read(ctx); err != nil {
cfg := NewDBChannel(channel.GetID(), channel.Username, channel.Title)
if err := cfg.Get(ctx); err != nil {
return err
}

View File

@@ -37,6 +37,9 @@ func InitClient(cfg *config.Config) error {
func Close() error {
_ = logger.Sync()
if db != nil {
db.Close()
}
return nil
}

18
internal/db.go Normal file
View File

@@ -0,0 +1,18 @@
package internal
import (
"database/sql"
_ "github.com/lib/pq"
)
var db *sql.DB
func InitDB(dsn string) error {
var err error
db, err = sql.Open("postgres", dsn)
if err != nil {
return err
}
return db.Ping()
}

126
internal/db_channel.go Normal file
View File

@@ -0,0 +1,126 @@
package internal
import (
"context"
"database/sql"
"fmt"
"os"
"path/filepath"
"time"
"exporter/database/telegram_resource/public/model"
"exporter/database/telegram_resource/public/table"
. "github.com/go-jet/jet/v2/postgres"
"github.com/pkg/errors"
"github.com/samber/lo"
)
type DBChannel struct {
UUID int64
Username string
Title string
Offset int
MinID int
}
func NewDBChannel(uuid int64, username, title string) *DBChannel {
if uuid == 0 {
panic("channel id is required")
}
return &DBChannel{
UUID: uuid,
Username: username,
Title: title,
}
}
func (c *DBChannel) Asset(assetID int64, ext string) string {
assetFile := fmt.Sprintf("outputs/%d/%d.%s", c.UUID, assetID, ext)
// if file dir not exists then create it
if _, err := os.Stat(filepath.Dir(assetFile)); os.IsNotExist(err) {
if err := os.MkdirAll(filepath.Dir(assetFile), 0o755); err != nil {
panic(err)
}
}
// if file exists then delete it
if _, err := os.Stat(assetFile); err == nil {
if err := os.Remove(assetFile); err != nil {
panic(err)
}
}
return assetFile
}
func (c *DBChannel) Get(ctx context.Context) error {
tbl := table.Channels
var m *model.Channels
err := tbl.SELECT(tbl.AllColumns).QueryContext(ctx, db, &m)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
// create new channel with default value
m = &model.Channels{
UUID: c.UUID,
Username: c.Username,
Title: c.Title,
MinID: 0,
Offset: 0,
CreatedAt: lo.ToPtr(time.Now()),
UpdatedAt: lo.ToPtr(time.Now()),
}
if _, err := tbl.INSERT(tbl.AllColumns).MODEL(m).ExecContext(ctx, db); err != nil {
return errors.Wrap(err, "insert channel")
}
} else {
return errors.Wrap(err, "select channel")
}
}
c.MinID = int(m.MinID)
c.Offset = int(m.Offset)
return nil
}
func (c *DBChannel) Update(ctx context.Context, offsetID int) error {
c.Offset = offsetID
if c.MinID < offsetID {
c.MinID = offsetID
}
tbl := table.Channels
_, err := tbl.UPDATE().SET(
tbl.Offset.SET(Int(int64(c.Offset))),
tbl.MinID.SET(Int(int64(c.MinID))),
).ExecContext(ctx, db)
if err != nil {
return errors.Wrap(err, "update channel")
}
return nil
}
func (c *DBChannel) SaveMessage(ctx context.Context, msg *ChannelMessage) error {
message := &model.ChannelMessages{
ChannelID: c.UUID,
UUID: int64(msg.ID),
Content: lo.ToPtr(msg.Message),
Media: msg.GetMedia(),
PublishedAt: msg.PublishAt,
CreatedAt: time.Now(),
}
tbl := table.ChannelMessages
_, err := tbl.INSERT(tbl.AllColumns).MODEL(message).ExecContext(ctx, db)
if err != nil {
return errors.Wrap(err, "insert message")
}
return nil
}

36
internal/db_test.go Normal file
View File

@@ -0,0 +1,36 @@
package internal
import (
"context"
"testing"
"time"
"exporter/database/telegram_resource/public/model"
"exporter/database/telegram_resource/public/table"
"github.com/samber/lo"
)
func TestInitDB(t *testing.T) {
dsn := "postgresql://postgres:xixi0202@10.1.1.3:5432/telegram_resource?sslmode=disable"
// dns1 := "host=%s user=%s password=%s dbname=%s port=%s sslmode=%s TimeZone=%s"
if err := InitDB(dsn); err != nil {
t.Fatalf("InitDB() error = %v", err)
}
t.Logf("InitDB() success")
stmt := table.Channels.INSERT(table.Channels.AllColumns).MODEL(model.Channels{
ID: 1,
Username: "test",
Title: "helo",
CreatedAt: lo.ToPtr(time.Now()),
UpdatedAt: lo.ToPtr(time.Now()),
Offset: 10,
MinID: 11,
})
t.Logf("sql: %v", stmt.DebugSql())
if _, err := stmt.ExecContext(context.Background(), db); err != nil {
t.Fatalf("stmt.ExecContext() error = %v", err)
}
}