From 8666db6318fb798674b078ee1be17daa6f715049 Mon Sep 17 00:00:00 2001 From: Rogee Date: Thu, 5 Sep 2024 15:12:53 +0800 Subject: [PATCH] feat: add cmds --- go.mod | 3 + go.sum | 6 ++ internal/client_channel.go | 17 +++-- internal/cmd_channel.go | 20 ++++++ .../{cmd_export.go => cmd_channel_add.go} | 26 +++---- internal/cmd_channel_export.go | 69 +++++++++++++++++++ internal/cmd_channel_list.go | 50 ++++++++++++++ .../{cmd_watch.go => cmd_channel_watch.go} | 15 ++-- internal/db_channel.go | 49 ++++++++++++- main.go | 3 +- 10 files changed, 230 insertions(+), 28 deletions(-) create mode 100644 internal/cmd_channel.go rename internal/{cmd_export.go => cmd_channel_add.go} (68%) create mode 100644 internal/cmd_channel_export.go create mode 100644 internal/cmd_channel_list.go rename internal/{cmd_watch.go => cmd_channel_watch.go} (84%) diff --git a/go.mod b/go.mod index 89b141c..bd90d1e 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/go-jet/jet/v2 v2.11.1 github.com/gotd/td v0.107.0 github.com/imroc/req/v3 v3.43.7 + github.com/jedib0t/go-pretty/v6 v6.5.9 github.com/lib/pq v1.10.9 github.com/pkg/errors v0.9.1 github.com/samber/lo v1.47.0 @@ -37,6 +38,7 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/magiconair/properties v1.8.7 // indirect + github.com/mattn/go-runewidth v0.0.15 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/onsi/ginkgo/v2 v2.16.0 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect @@ -44,6 +46,7 @@ require ( github.com/quic-go/qpack v0.4.0 // indirect github.com/quic-go/quic-go v0.41.0 // indirect github.com/refraction-networking/utls v1.6.3 // indirect + github.com/rivo/uniseg v0.2.0 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/segmentio/asm v1.2.0 // indirect diff --git a/go.sum b/go.sum index ab66100..6a4810b 100644 --- a/go.sum +++ b/go.sum @@ -53,6 +53,8 @@ github.com/imroc/req/v3 v3.43.7 h1:dOcNb9n0X83N5/5/AOkiU+cLhzx8QFXjv5MhikazzQA= github.com/imroc/req/v3 v3.43.7/go.mod h1:SQIz5iYop16MJxbo8ib+4LnostGCok8NQf8ToyQc2xA= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/jedib0t/go-pretty/v6 v6.5.9 h1:ACteMBRrrmm1gMsXe9PSTOClQ63IXDUt03H5U+UV8OU= +github.com/jedib0t/go-pretty/v6 v6.5.9/go.mod h1:zbn98qrYlh95FIhwwsbIip0LYpwSG8SUOScs+v9/t0E= github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -63,6 +65,8 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= +github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= +github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/onsi/ginkgo/v2 v2.16.0 h1:7q1w9frJDzninhXxjZd+Y/x54XNjG/UlRLIYPZafsPM= @@ -82,6 +86,8 @@ github.com/quic-go/quic-go v0.41.0 h1:aD8MmHfgqTURWNJy48IYFg2OnxwHT3JL7ahGs73lb4 github.com/quic-go/quic-go v0.41.0/go.mod h1:qCkNjqczPEvgsOnxZ0eCD14lv+B2LHlFAB++CNOh9hA= github.com/refraction-networking/utls v1.6.3 h1:MFOfRN35sSx6K5AZNIoESsBuBxS2LCgRilRIdHb6fDc= github.com/refraction-networking/utls v1.6.3/go.mod h1:yil9+7qSl+gBwJqztoQseO6Pr3h62pQoY1lXiNR/FPs= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= diff --git a/internal/client_channel.go b/internal/client_channel.go index 8713f60..b355865 100644 --- a/internal/client_channel.go +++ b/internal/client_channel.go @@ -16,10 +16,10 @@ import ( "go.uber.org/zap" ) -func (t *TClient) Channel(ctx context.Context, channel *tg.Channel, cfg *DBChannel) error { +func (t *TClient) Channel(ctx context.Context, channel *tg.Channel, cfg *DBChannel, onlyAddChannel bool) error { inputPeer := &tg.InputPeerChannel{ChannelID: channel.ID, AccessHash: channel.AccessHash} - limit := 50 + limit := 20 request := &tg.MessagesGetHistoryRequest{ Peer: inputPeer, Limit: limit, @@ -32,12 +32,21 @@ func (t *TClient) Channel(ctx context.Context, channel *tg.Channel, cfg *DBChann if err != nil { return errors.Wrap(err, "messages.getHistory") } - if len(history.(*tg.MessagesChannelMessages).GetMessages()) == 0 { + + messages := history.(*tg.MessagesChannelMessages).GetMessages() + if len(messages) == 0 { + if cfg.OffsetID > 0 && request.OffsetID < cfg.OffsetID { + cfg.Update(ctx, request.OffsetID) + return nil + } logger.Info("no new message", zap.Int64("channel", channel.ID)) return errorx.ErrNoNewMessage } - messages := history.(*tg.MessagesChannelMessages).GetMessages() + if onlyAddChannel { + return cfg.SetOffset(ctx, messages[0].GetID()) + } + messages = lo.Reverse(messages) for _, item := range messages { diff --git a/internal/cmd_channel.go b/internal/cmd_channel.go new file mode 100644 index 0000000..9ac0ab3 --- /dev/null +++ b/internal/cmd_channel.go @@ -0,0 +1,20 @@ +package internal + +import "github.com/spf13/cobra" + +func ChannelCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "channel", + Aliases: []string{"ch"}, + Short: "channel commands", + } + + cmd.AddCommand( + ChannelAddCmd(), + ChannelListCmd(), + ChannelExportCmd(), + ChannelWatchCmd(), + ) + + return cmd +} diff --git a/internal/cmd_export.go b/internal/cmd_channel_add.go similarity index 68% rename from internal/cmd_export.go rename to internal/cmd_channel_add.go index dd253c7..da127df 100644 --- a/internal/cmd_export.go +++ b/internal/cmd_channel_add.go @@ -14,27 +14,31 @@ import ( var ( channelID int64 channelAlias string + exportMedia bool ) -func ExportCmd() *cobra.Command { +func ChannelAddCmd() *cobra.Command { cmd := &cobra.Command{ - Use: "export", - Short: "export channels", + Use: "add", + Aliases: []string{"a"}, + Short: "add a new channel", PreRunE: func(cmd *cobra.Command, args []string) error { log.Println("init client") defer log.Println("init client done") return InitClient(config.C) }, - RunE: wrapE(exportCmd), + RunE: func(cmd *cobra.Command, args []string) error { + return channelAddCmd(context.Background()) + }, } - cmd.Flags().Int64Var(&channelID, "channel", 0, "channel id") cmd.Flags().StringVar(&channelAlias, "alias", "", "channel alias") + cmd.Flags().BoolVar(&exportMedia, "media", false, "export media") return cmd } -func exportCmd(ctx context.Context) error { +func channelAddCmd(ctx context.Context) error { if channelID == 0 && channelAlias == "" { return errors.New("channel id or alias is required") } @@ -58,14 +62,10 @@ func exportCmd(ctx context.Context) error { } cfg := NewDBChannel(channel.GetID(), channel.Username, channel.Title) - if err := cfg.Get(ctx); err != nil { + cfg.ExportMedia = exportMedia + if err := cfg.GetOrCreate(ctx); err != nil { return err } - // https://t.me/yunpanshare/37426 - for { - if err := client.Channel(ctx, channel, cfg); err != nil { - return err - } - } + return client.Channel(ctx, channel, cfg, true) } diff --git a/internal/cmd_channel_export.go b/internal/cmd_channel_export.go new file mode 100644 index 0000000..82390c0 --- /dev/null +++ b/internal/cmd_channel_export.go @@ -0,0 +1,69 @@ +package internal + +import ( + "context" + "log" + "math" + "time" + + "exporter/config" + + "github.com/pkg/errors" + "github.com/spf13/cobra" + "go.uber.org/zap" +) + +var dbChannelID int64 + +func ChannelExportCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "export", + Aliases: []string{"e"}, + Short: "export channels", + PreRunE: func(cmd *cobra.Command, args []string) error { + log.Println("init client") + defer log.Println("init client done") + return InitClient(config.C) + }, + RunE: wrapE(channelExportCmd), + } + + cmd.Flags().Int64Var(&dbChannelID, "channel", 0, "channel id") + + return cmd +} + +func channelExportCmd(ctx context.Context) error { + if dbChannelID == 0 { + return errors.New("db channel id required") + } + + channel, err := client.ChannelInfoByID(ctx, channelID) + if err != nil { + return err + } + + if channel.GetID() == 0 { + return errors.New("channel not found") + } + + cfg := NewDBChannel(channel.GetID(), channel.Username, channel.Title) + if err := cfg.GetOrCreate(ctx); err != nil { + return err + } + + var continueRetryTimes int + for { + if err := client.Channel(ctx, channel, cfg, false); err != nil { + logger.Error("failed to export channel", zap.Error(err), zap.Int64("uuid", channel.GetID())) + continueRetryTimes++ + sleepSeconds := math.Pow(2, float64(continueRetryTimes)) + if sleepSeconds > 128 { + return err + } + + logger.Info("retry after", zap.Int("seconds", int(sleepSeconds))) + time.Sleep(time.Second * time.Duration(sleepSeconds)) + } + } +} diff --git a/internal/cmd_channel_list.go b/internal/cmd_channel_list.go new file mode 100644 index 0000000..435787e --- /dev/null +++ b/internal/cmd_channel_list.go @@ -0,0 +1,50 @@ +package internal + +import ( + "context" + "os" + + "exporter/database/telegram_resource/public/model" + "exporter/database/telegram_resource/public/table" + + tablePrinter "github.com/jedib0t/go-pretty/v6/table" + "github.com/spf13/cobra" + "go.uber.org/zap" +) + +func ChannelListCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "list", + Aliases: []string{"ls"}, + Short: "list all channels", + RunE: func(cmd *cobra.Command, args []string) error { + return channelListCmd(context.Background()) + }, + } + return cmd +} + +func channelListCmd(ctx context.Context) error { + var channels []model.Channels + tbl := table.Channels + if err := tbl.SELECT(tbl.AllColumns).ORDER_BY(tbl.ID.ASC()).QueryContext(ctx, db, &channels); err != nil { + logger.Fatal("failed to get channels", zap.Error(err)) + } + + t := tablePrinter.NewWriter() + t.SetOutputMirror(os.Stdout) + t.AppendHeader(tablePrinter.Row{"Channel ID", "Min ID", "Offset", "Export Media", "PK", "Title"}) + + for _, ch := range channels { + t.AppendRow([]interface{}{ + ch.UUID, + ch.MinID, + ch.Offset, + ch.ExportMedia, + ch.ID, + ch.Title, + }) + } + t.Render() + return nil +} diff --git a/internal/cmd_watch.go b/internal/cmd_channel_watch.go similarity index 84% rename from internal/cmd_watch.go rename to internal/cmd_channel_watch.go index 40d0554..dcbe194 100644 --- a/internal/cmd_watch.go +++ b/internal/cmd_channel_watch.go @@ -15,22 +15,23 @@ import ( "go.uber.org/zap" ) -func WatchCmd() *cobra.Command { +func ChannelWatchCmd() *cobra.Command { cmd := &cobra.Command{ - Use: "watch", - Short: "watch channels", + Use: "watch", + Aliases: []string{"w"}, + Short: "watch channels", PreRunE: func(cmd *cobra.Command, args []string) error { log.Println("init client") defer log.Println("init client done") return InitClient(config.C) }, - RunE: wrapE(watchCmd), + RunE: wrapE(channelWatchCmd), } return cmd } -func watchCmd(ctx context.Context) error { +func channelWatchCmd(ctx context.Context) error { tbl := table.Channels var lastTravelAt time.Time @@ -60,13 +61,13 @@ func watchCmd(ctx context.Context) error { } cfg := NewDBChannel(channel.GetID(), channel.Username, channel.Title) - if err := cfg.Get(ctx); err != nil { + if err := cfg.GetOrCreate(ctx); err != nil { logger.Error("failed to get channel config", zap.Error(err), zap.Int64("uuid", ch.UUID)) continue } for { - if err := client.Channel(ctx, channel, cfg); err != nil { + if err := client.Channel(ctx, channel, cfg, false); err != nil { if errors.Is(err, errorx.ErrNoNewMessage) { logger.Info("no new message", zap.Int64("uuid", ch.UUID)) } else { diff --git a/internal/db_channel.go b/internal/db_channel.go index 3b8fea2..f9ef612 100644 --- a/internal/db_channel.go +++ b/internal/db_channel.go @@ -23,10 +23,30 @@ type DBChannel struct { UUID int64 Username string Title string + OffsetID int MinID int ExportMedia bool } +func NewDBChannelFromDB(id int64) (*DBChannel, error) { + var m model.Channels + + tbl := table.Channels + err := tbl.SELECT(tbl.AllColumns).WHERE(tbl.ID.EQ(Int64(id))).QueryContext(context.Background(), db, &m) + if err != nil { + return nil, errors.Wrapf(err, "find channel by id(%d) failed", id) + } + + return &DBChannel{ + UUID: m.UUID, + Username: m.Username, + Title: m.Title, + OffsetID: int(m.Offset), + MinID: int(m.MinID), + ExportMedia: m.ExportMedia, + }, nil +} + func NewDBChannel(uuid int64, username, title string) *DBChannel { if uuid == 0 { panic("channel id is required") @@ -59,7 +79,7 @@ func (c *DBChannel) Asset(assetID int64, ext string) string { return assetFile } -func (c *DBChannel) Get(ctx context.Context) error { +func (c *DBChannel) GetOrCreate(ctx context.Context) error { tbl := table.Channels var m model.Channels @@ -74,7 +94,7 @@ func (c *DBChannel) Get(ctx context.Context) error { Title: c.Title, MinID: 0, Offset: 0, - ExportMedia: false, + ExportMedia: c.ExportMedia, CreatedAt: lo.ToPtr(time.Now()), UpdatedAt: lo.ToPtr(time.Now()), } @@ -87,20 +107,45 @@ func (c *DBChannel) Get(ctx context.Context) error { } } c.MinID = int(m.MinID) + c.OffsetID = int(m.Offset) c.ExportMedia = m.ExportMedia return nil } +func (c *DBChannel) SetOffset(ctx context.Context, offset int) error { + if c.OffsetID < offset { + c.OffsetID = offset + } + + tbl := table.Channels + _, err := tbl.UPDATE(). + SET( + tbl.Offset.SET(Int64(int64(c.OffsetID))), + ). + WHERE(tbl.UUID.EQ(Int64(c.UUID))). + ExecContext(ctx, db) + if err != nil { + return errors.Wrap(err, "update channel") + } + + return nil +} + func (c *DBChannel) Update(ctx context.Context, msgID int) error { if c.MinID < msgID { c.MinID = msgID } + if c.OffsetID < msgID { + c.OffsetID = msgID + } + tbl := table.Channels _, err := tbl.UPDATE(). SET( tbl.MinID.SET(Int64(int64(c.MinID))), + tbl.Offset.SET(Int64(int64(c.OffsetID))), ). WHERE(tbl.UUID.EQ(Int64(c.UUID))). ExecContext(ctx, db) diff --git a/main.go b/main.go index 8d5ae0a..5e25a3e 100644 --- a/main.go +++ b/main.go @@ -43,9 +43,8 @@ func main() { rootCmd.AddCommand( internal.LoginCmd(), - internal.ExportCmd(), internal.PublishCmd(), - internal.WatchCmd(), + internal.ChannelCmd(), ) // rootCmd.SilenceErrors = true