feat: add cmds
This commit is contained in:
@@ -16,10 +16,10 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (t *TClient) Channel(ctx context.Context, channel *tg.Channel, cfg *DBChannel) error {
|
||||
func (t *TClient) Channel(ctx context.Context, channel *tg.Channel, cfg *DBChannel, onlyAddChannel bool) error {
|
||||
inputPeer := &tg.InputPeerChannel{ChannelID: channel.ID, AccessHash: channel.AccessHash}
|
||||
|
||||
limit := 50
|
||||
limit := 20
|
||||
request := &tg.MessagesGetHistoryRequest{
|
||||
Peer: inputPeer,
|
||||
Limit: limit,
|
||||
@@ -32,12 +32,21 @@ func (t *TClient) Channel(ctx context.Context, channel *tg.Channel, cfg *DBChann
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "messages.getHistory")
|
||||
}
|
||||
if len(history.(*tg.MessagesChannelMessages).GetMessages()) == 0 {
|
||||
|
||||
messages := history.(*tg.MessagesChannelMessages).GetMessages()
|
||||
if len(messages) == 0 {
|
||||
if cfg.OffsetID > 0 && request.OffsetID < cfg.OffsetID {
|
||||
cfg.Update(ctx, request.OffsetID)
|
||||
return nil
|
||||
}
|
||||
logger.Info("no new message", zap.Int64("channel", channel.ID))
|
||||
return errorx.ErrNoNewMessage
|
||||
}
|
||||
|
||||
messages := history.(*tg.MessagesChannelMessages).GetMessages()
|
||||
if onlyAddChannel {
|
||||
return cfg.SetOffset(ctx, messages[0].GetID())
|
||||
}
|
||||
|
||||
messages = lo.Reverse(messages)
|
||||
|
||||
for _, item := range messages {
|
||||
|
||||
20
internal/cmd_channel.go
Normal file
20
internal/cmd_channel.go
Normal file
@@ -0,0 +1,20 @@
|
||||
package internal
|
||||
|
||||
import "github.com/spf13/cobra"
|
||||
|
||||
func ChannelCmd() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "channel",
|
||||
Aliases: []string{"ch"},
|
||||
Short: "channel commands",
|
||||
}
|
||||
|
||||
cmd.AddCommand(
|
||||
ChannelAddCmd(),
|
||||
ChannelListCmd(),
|
||||
ChannelExportCmd(),
|
||||
ChannelWatchCmd(),
|
||||
)
|
||||
|
||||
return cmd
|
||||
}
|
||||
@@ -14,27 +14,31 @@ import (
|
||||
var (
|
||||
channelID int64
|
||||
channelAlias string
|
||||
exportMedia bool
|
||||
)
|
||||
|
||||
func ExportCmd() *cobra.Command {
|
||||
func ChannelAddCmd() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "export",
|
||||
Short: "export channels",
|
||||
Use: "add",
|
||||
Aliases: []string{"a"},
|
||||
Short: "add a new channel",
|
||||
PreRunE: func(cmd *cobra.Command, args []string) error {
|
||||
log.Println("init client")
|
||||
defer log.Println("init client done")
|
||||
return InitClient(config.C)
|
||||
},
|
||||
RunE: wrapE(exportCmd),
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
return channelAddCmd(context.Background())
|
||||
},
|
||||
}
|
||||
|
||||
cmd.Flags().Int64Var(&channelID, "channel", 0, "channel id")
|
||||
cmd.Flags().StringVar(&channelAlias, "alias", "", "channel alias")
|
||||
cmd.Flags().BoolVar(&exportMedia, "media", false, "export media")
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
func exportCmd(ctx context.Context) error {
|
||||
func channelAddCmd(ctx context.Context) error {
|
||||
if channelID == 0 && channelAlias == "" {
|
||||
return errors.New("channel id or alias is required")
|
||||
}
|
||||
@@ -58,14 +62,10 @@ func exportCmd(ctx context.Context) error {
|
||||
}
|
||||
|
||||
cfg := NewDBChannel(channel.GetID(), channel.Username, channel.Title)
|
||||
if err := cfg.Get(ctx); err != nil {
|
||||
cfg.ExportMedia = exportMedia
|
||||
if err := cfg.GetOrCreate(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// https://t.me/yunpanshare/37426
|
||||
for {
|
||||
if err := client.Channel(ctx, channel, cfg); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return client.Channel(ctx, channel, cfg, true)
|
||||
}
|
||||
69
internal/cmd_channel_export.go
Normal file
69
internal/cmd_channel_export.go
Normal file
@@ -0,0 +1,69 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"exporter/config"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/spf13/cobra"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var dbChannelID int64
|
||||
|
||||
func ChannelExportCmd() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "export",
|
||||
Aliases: []string{"e"},
|
||||
Short: "export channels",
|
||||
PreRunE: func(cmd *cobra.Command, args []string) error {
|
||||
log.Println("init client")
|
||||
defer log.Println("init client done")
|
||||
return InitClient(config.C)
|
||||
},
|
||||
RunE: wrapE(channelExportCmd),
|
||||
}
|
||||
|
||||
cmd.Flags().Int64Var(&dbChannelID, "channel", 0, "channel id")
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
func channelExportCmd(ctx context.Context) error {
|
||||
if dbChannelID == 0 {
|
||||
return errors.New("db channel id required")
|
||||
}
|
||||
|
||||
channel, err := client.ChannelInfoByID(ctx, channelID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if channel.GetID() == 0 {
|
||||
return errors.New("channel not found")
|
||||
}
|
||||
|
||||
cfg := NewDBChannel(channel.GetID(), channel.Username, channel.Title)
|
||||
if err := cfg.GetOrCreate(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var continueRetryTimes int
|
||||
for {
|
||||
if err := client.Channel(ctx, channel, cfg, false); err != nil {
|
||||
logger.Error("failed to export channel", zap.Error(err), zap.Int64("uuid", channel.GetID()))
|
||||
continueRetryTimes++
|
||||
sleepSeconds := math.Pow(2, float64(continueRetryTimes))
|
||||
if sleepSeconds > 128 {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Info("retry after", zap.Int("seconds", int(sleepSeconds)))
|
||||
time.Sleep(time.Second * time.Duration(sleepSeconds))
|
||||
}
|
||||
}
|
||||
}
|
||||
50
internal/cmd_channel_list.go
Normal file
50
internal/cmd_channel_list.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
|
||||
"exporter/database/telegram_resource/public/model"
|
||||
"exporter/database/telegram_resource/public/table"
|
||||
|
||||
tablePrinter "github.com/jedib0t/go-pretty/v6/table"
|
||||
"github.com/spf13/cobra"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func ChannelListCmd() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "list",
|
||||
Aliases: []string{"ls"},
|
||||
Short: "list all channels",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
return channelListCmd(context.Background())
|
||||
},
|
||||
}
|
||||
return cmd
|
||||
}
|
||||
|
||||
func channelListCmd(ctx context.Context) error {
|
||||
var channels []model.Channels
|
||||
tbl := table.Channels
|
||||
if err := tbl.SELECT(tbl.AllColumns).ORDER_BY(tbl.ID.ASC()).QueryContext(ctx, db, &channels); err != nil {
|
||||
logger.Fatal("failed to get channels", zap.Error(err))
|
||||
}
|
||||
|
||||
t := tablePrinter.NewWriter()
|
||||
t.SetOutputMirror(os.Stdout)
|
||||
t.AppendHeader(tablePrinter.Row{"Channel ID", "Min ID", "Offset", "Export Media", "PK", "Title"})
|
||||
|
||||
for _, ch := range channels {
|
||||
t.AppendRow([]interface{}{
|
||||
ch.UUID,
|
||||
ch.MinID,
|
||||
ch.Offset,
|
||||
ch.ExportMedia,
|
||||
ch.ID,
|
||||
ch.Title,
|
||||
})
|
||||
}
|
||||
t.Render()
|
||||
return nil
|
||||
}
|
||||
@@ -15,22 +15,23 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func WatchCmd() *cobra.Command {
|
||||
func ChannelWatchCmd() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "watch",
|
||||
Short: "watch channels",
|
||||
Use: "watch",
|
||||
Aliases: []string{"w"},
|
||||
Short: "watch channels",
|
||||
PreRunE: func(cmd *cobra.Command, args []string) error {
|
||||
log.Println("init client")
|
||||
defer log.Println("init client done")
|
||||
return InitClient(config.C)
|
||||
},
|
||||
RunE: wrapE(watchCmd),
|
||||
RunE: wrapE(channelWatchCmd),
|
||||
}
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
func watchCmd(ctx context.Context) error {
|
||||
func channelWatchCmd(ctx context.Context) error {
|
||||
tbl := table.Channels
|
||||
|
||||
var lastTravelAt time.Time
|
||||
@@ -60,13 +61,13 @@ func watchCmd(ctx context.Context) error {
|
||||
}
|
||||
|
||||
cfg := NewDBChannel(channel.GetID(), channel.Username, channel.Title)
|
||||
if err := cfg.Get(ctx); err != nil {
|
||||
if err := cfg.GetOrCreate(ctx); err != nil {
|
||||
logger.Error("failed to get channel config", zap.Error(err), zap.Int64("uuid", ch.UUID))
|
||||
continue
|
||||
}
|
||||
|
||||
for {
|
||||
if err := client.Channel(ctx, channel, cfg); err != nil {
|
||||
if err := client.Channel(ctx, channel, cfg, false); err != nil {
|
||||
if errors.Is(err, errorx.ErrNoNewMessage) {
|
||||
logger.Info("no new message", zap.Int64("uuid", ch.UUID))
|
||||
} else {
|
||||
@@ -23,10 +23,30 @@ type DBChannel struct {
|
||||
UUID int64
|
||||
Username string
|
||||
Title string
|
||||
OffsetID int
|
||||
MinID int
|
||||
ExportMedia bool
|
||||
}
|
||||
|
||||
func NewDBChannelFromDB(id int64) (*DBChannel, error) {
|
||||
var m model.Channels
|
||||
|
||||
tbl := table.Channels
|
||||
err := tbl.SELECT(tbl.AllColumns).WHERE(tbl.ID.EQ(Int64(id))).QueryContext(context.Background(), db, &m)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "find channel by id(%d) failed", id)
|
||||
}
|
||||
|
||||
return &DBChannel{
|
||||
UUID: m.UUID,
|
||||
Username: m.Username,
|
||||
Title: m.Title,
|
||||
OffsetID: int(m.Offset),
|
||||
MinID: int(m.MinID),
|
||||
ExportMedia: m.ExportMedia,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewDBChannel(uuid int64, username, title string) *DBChannel {
|
||||
if uuid == 0 {
|
||||
panic("channel id is required")
|
||||
@@ -59,7 +79,7 @@ func (c *DBChannel) Asset(assetID int64, ext string) string {
|
||||
return assetFile
|
||||
}
|
||||
|
||||
func (c *DBChannel) Get(ctx context.Context) error {
|
||||
func (c *DBChannel) GetOrCreate(ctx context.Context) error {
|
||||
tbl := table.Channels
|
||||
|
||||
var m model.Channels
|
||||
@@ -74,7 +94,7 @@ func (c *DBChannel) Get(ctx context.Context) error {
|
||||
Title: c.Title,
|
||||
MinID: 0,
|
||||
Offset: 0,
|
||||
ExportMedia: false,
|
||||
ExportMedia: c.ExportMedia,
|
||||
CreatedAt: lo.ToPtr(time.Now()),
|
||||
UpdatedAt: lo.ToPtr(time.Now()),
|
||||
}
|
||||
@@ -87,20 +107,45 @@ func (c *DBChannel) Get(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
c.MinID = int(m.MinID)
|
||||
c.OffsetID = int(m.Offset)
|
||||
c.ExportMedia = m.ExportMedia
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *DBChannel) SetOffset(ctx context.Context, offset int) error {
|
||||
if c.OffsetID < offset {
|
||||
c.OffsetID = offset
|
||||
}
|
||||
|
||||
tbl := table.Channels
|
||||
_, err := tbl.UPDATE().
|
||||
SET(
|
||||
tbl.Offset.SET(Int64(int64(c.OffsetID))),
|
||||
).
|
||||
WHERE(tbl.UUID.EQ(Int64(c.UUID))).
|
||||
ExecContext(ctx, db)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "update channel")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *DBChannel) Update(ctx context.Context, msgID int) error {
|
||||
if c.MinID < msgID {
|
||||
c.MinID = msgID
|
||||
}
|
||||
|
||||
if c.OffsetID < msgID {
|
||||
c.OffsetID = msgID
|
||||
}
|
||||
|
||||
tbl := table.Channels
|
||||
_, err := tbl.UPDATE().
|
||||
SET(
|
||||
tbl.MinID.SET(Int64(int64(c.MinID))),
|
||||
tbl.Offset.SET(Int64(int64(c.OffsetID))),
|
||||
).
|
||||
WHERE(tbl.UUID.EQ(Int64(c.UUID))).
|
||||
ExecContext(ctx, db)
|
||||
|
||||
Reference in New Issue
Block a user