diff --git a/.gitignore b/.gitignore
index 34e1251..bac9286 100644
--- a/.gitignore
+++ b/.gitignore
@@ -26,4 +26,4 @@ log.json
 log-*.json
 session.json
 tdl-export.json
-./exporter
\ No newline at end of file
+exporter
\ No newline at end of file
diff --git a/config/config.go b/config/config.go
index 0a0c9bd..0fae0a1 100644
--- a/config/config.go
+++ b/config/config.go
@@ -8,15 +8,17 @@ import (
 var C *Config
 
 type Config struct {
-	Phone       string `mapstructure:"phone"`
-	AppID       int    `mapstructure:"app_id"`
-	AppHash     string `mapstructure:"app_hash"`
-	BotToken    string `mapstructure:"bot_token"`
-	SessionFile string `mapstructure:"session_file"`
-	LogFile     string `mapstructure:"log_file"`
-	MaxSize     string `mapstructure:"max_size"`
-	DSN         string `mapstructure:"dsn"`
-	Output      string `mapstructure:"output"`
+	Phone        string `mapstructure:"phone"`
+	AppID        int    `mapstructure:"app_id"`
+	AppHash      string `mapstructure:"app_hash"`
+	BotToken     string `mapstructure:"bot_token"`
+	SessionFile  string `mapstructure:"session_file"`
+	LogFile      string `mapstructure:"log_file"`
+	MaxSize      string `mapstructure:"max_size"`
+	DSN          string `mapstructure:"dsn"`
+	Output       string `mapstructure:"output"`
+	PublishHost  string `mapstructure:"publish_host"`  // host handed to memos.Post by the publish command
+	PublishToken string `mapstructure:"publish_token"` // token handed to memos.Post by the publish command
 }
 
 // GetMaxSize
diff --git a/internal/client_channel.go b/internal/client_channel.go
index d5902cb..defa5e5 100644
--- a/internal/client_channel.go
+++ b/internal/client_channel.go
@@ -5,6 +5,8 @@ import (
 	"mime"
 	"os"
 
+	"exporter/pkg/errorx"
+
 	"github.com/dustin/go-humanize"
 	"github.com/gotd/td/telegram/downloader"
 	"github.com/gotd/td/tg"
@@ -35,7 +37,7 @@ func (t *TClient) Channel(ctx context.Context, channel *tg.Channel, cfg *DBChann
 	}
 	if len(messages.(*tg.MessagesChannelMessages).GetMessages()) == 0 {
 		logger.Info("no new message", zap.Int64("channel", channel.ID))
-		return errors.New("no new message")
+		return errorx.ErrNoNewMessage
 	}
 
 	for _, item := range messages.(*tg.MessagesChannelMessages).GetMessages() {
diff --git a/internal/cmd_publish.go b/internal/cmd_publish.go
index 5e59c79..348f5a1 100644
--- a/internal/cmd_publish.go
+++ b/internal/cmd_publish.go
@@ -3,6 +3,13 @@ package internal
 import (
 	"context"
 
+	"exporter/config"
+	"exporter/database/telegram_resource/public/model"
+	"exporter/database/telegram_resource/public/table"
+	"exporter/pkg/memos"
+
+	. "github.com/go-jet/jet/v2/postgres"
+	"github.com/pkg/errors"
 	"github.com/spf13/cobra"
 )
 
@@ -17,5 +24,48 @@ func PublishCmd() *cobra.Command {
 }
 
+// publishCmd publishes one pending channel message and then marks it as
+// published. It returns an error if the query, the publish call or the
+// update fails.
 func publishCmd(ctx context.Context) error {
+	var msg model.ChannelMessages
+
+	tbl := table.ChannelMessages
+	// Select a row that is not published yet (false or NULL). Filtering on
+	// IS_TRUE would only ever pick rows that were already sent.
+	err := tbl.SELECT(tbl.AllColumns).WHERE(tbl.Published.IS_NOT_TRUE()).LIMIT(1).QueryContext(ctx, db, &msg)
+	if err != nil {
+		return errors.Wrap(err, "failed to query unpublished message")
+	}
+
+	// publish item
+	if err := publish(ctx, msg); err != nil {
+		return errors.Wrap(err, "failed to publish")
+	}
+
+	_, err = tbl.UPDATE().SET(tbl.Published.SET(Bool(true))).WHERE(tbl.ID.EQ(Int(msg.ID))).ExecContext(ctx, db)
+	if err != nil {
+		return errors.Wrap(err, "failed to mark message as published")
+	}
+
 	return nil
 }
+
+// publish hands msg to memos.Post, using the host and token taken from the
+// loaded configuration.
+func publish(ctx context.Context, msg model.ChannelMessages) error {
+	// Content is dereferenced below; refuse rows without content instead of
+	// panicking on a nil pointer.
+	if msg.Content == nil {
+		return errors.Errorf("message %d has no content", msg.ID)
+	}
+
+	data := memos.PostData{
+		Host:         config.C.PublishHost,
+		Token:        config.C.PublishToken,
+		Content:      *msg.Content,
+		ChannelID:    msg.ChannelID,
+		ChannelTitle: "",
+		Resources:    []memos.Resource{},
+	}
+	return memos.Post(data)
+}
diff --git a/internal/cmd_watch.go b/internal/cmd_watch.go
new file mode 100644
index 0000000..2e34209
--- /dev/null
+++ b/internal/cmd_watch.go
@@ -0,0 +1,87 @@
+package internal
+
+import (
+	"context"
+	"time"
+
+	"exporter/database/telegram_resource/public/model"
+	"exporter/database/telegram_resource/public/table"
+	"exporter/pkg/errorx"
+
+	"github.com/pkg/errors"
+	"github.com/spf13/cobra"
+	"go.uber.org/zap"
+)
+
+// WatchCmd returns the cobra command that starts the channel watcher.
+func WatchCmd() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "watch",
+		Short: "watch channels",
+		RunE:  wrapE(watchCmd),
+	}
+
+	return cmd
+}
+
+// watchCmd crawls every channel stored in the database, at most once every
+// five minutes, until ctx is cancelled. A channel that cannot be resolved or
+// configured is logged and skipped so it does not stop the other channels.
+func watchCmd(ctx context.Context) error {
+	tbl := table.Channels
+
+	var lastTravelAt time.Time
+	for {
+		if time.Since(lastTravelAt) < 5*time.Minute {
+			// Wait in a select so cancellation is noticed while idle.
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case <-time.After(time.Minute):
+			}
+			continue
+		}
+
+		lastTravelAt = time.Now()
+		var channels []model.Channels
+		if err := tbl.SELECT(tbl.AllColumns).QueryContext(ctx, db, &channels); err != nil {
+			// Return instead of logger.Fatal so the caller decides how to exit.
+			return errors.Wrap(err, "failed to get channels")
+		}
+
+		logger.Info("pending channels", zap.Int("count", len(channels)))
+		for _, ch := range channels {
+			logger.Info("crawl channel", zap.String("channel", ch.Title), zap.Int64("uuid", ch.UUID))
+
+			channel, err := client.ChannelInfoByID(ctx, ch.UUID)
+			if err != nil {
+				logger.Error("failed to get channel info", zap.Error(err), zap.Int64("uuid", ch.UUID))
+				continue
+			}
+
+			if channel.GetID() == 0 {
+				logger.Error("channel not found", zap.Int64("uuid", ch.UUID))
+				continue
+			}
+
+			cfg := NewDBChannel(channel.GetID(), channel.Username, channel.Title)
+			if err := cfg.Get(ctx); err != nil {
+				logger.Error("failed to get channel config", zap.Error(err), zap.Int64("uuid", ch.UUID))
+				continue
+			}
+
+			// Keep fetching until client.Channel reports an error;
+			// ErrNoNewMessage is logged at info level, anything else as an error.
+			for {
+				if err := client.Channel(ctx, channel, cfg, modeHistory); err != nil {
+					if errors.Is(err, errorx.ErrNoNewMessage) {
+						logger.Info("no new message", zap.Int64("uuid", ch.UUID))
+					} else {
+						logger.Error("failed to get channel messages", zap.Error(err), zap.Int64("uuid", ch.UUID))
+					}
+					break
+				}
+			}
+		}
+	}
+}
diff --git a/internal/db_channel.go b/internal/db_channel.go
index 0c1e31a..8a83b4f 100644
--- a/internal/db_channel.go
+++ b/internal/db_channel.go
@@ -92,7 +92,13 @@ func (c *DBChannel) Get(ctx context.Context) error {
 }
 
 func (c *DBChannel) Update(ctx context.Context, offsetID int) error {
-	c.Offset = offsetID
+	if c.Offset == 0 {
+		c.Offset = offsetID
+	}
+
+	if c.Offset > offsetID {
+		c.Offset = offsetID
+	}
 
 	if c.MinID < offsetID {
 		c.MinID = offsetID
diff --git a/internal/db_channel_test.go b/internal/db_channel_test.go
index 7741d30..b93bed1 100644
--- a/internal/db_channel_test.go
+++ b/internal/db_channel_test.go
@@ -5,6 +5,8 @@ import (
 	"testing"
 	"time"
 
+	"exporter/database/telegram_resource/public/table"
+
 	"github.com/samber/lo"
 )
 
@@ -48,3 +50,17 @@ func TestDBChannel_SaveMessage(t *testing.T) {
 		t.Logf("%+v", err)
 	}
 }
+
+// Test_Join logs the SQL generated for selecting one unpublished message
+// joined with its channel title. It only builds the statement and prints
+// DebugSql, so it needs neither a database connection nor credentials.
+func Test_Join(t *testing.T) {
+	tblC := table.Channels
+	tbl := table.ChannelMessages
+	stmt := tbl.SELECT(tbl.AllColumns, tblC.Title).
+		FROM(tbl.LEFT_JOIN(tblC, tbl.ChannelID.EQ(tblC.UUID))).
+		WHERE(tbl.Published.IS_NOT_TRUE()).
+		LIMIT(1)
+	t.Log(stmt.DebugSql())
+}
diff --git a/pkg/errorx/err.go b/pkg/errorx/err.go
new file mode 100644
index 0000000..2ead098
--- /dev/null
+++ b/pkg/errorx/err.go
@@ -0,0 +1,7 @@
+// Package errorx defines sentinel errors used by the exporter.
+package errorx
+
+import "errors"
+
+// ErrNoNewMessage is returned when a channel has no new messages to fetch.
+var ErrNoNewMessage = errors.New("no new message")