fix: offset

This commit is contained in:
Rogee
2024-09-04 18:57:58 +08:00
parent c34547a3f1
commit e6d808934d
4 changed files with 19 additions and 32 deletions

View File

@@ -11,36 +11,35 @@ import (
"github.com/gotd/td/telegram/downloader" "github.com/gotd/td/telegram/downloader"
"github.com/gotd/td/tg" "github.com/gotd/td/tg"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/samber/lo"
"go.uber.org/zap" "go.uber.org/zap"
) )
func (t *TClient) Channel(ctx context.Context, channel *tg.Channel, cfg *DBChannel, modeHistory bool) error { func (t *TClient) Channel(ctx context.Context, channel *tg.Channel, cfg *DBChannel) error {
inputPeer := &tg.InputPeerChannel{ChannelID: channel.ID, AccessHash: channel.AccessHash} inputPeer := &tg.InputPeerChannel{ChannelID: channel.ID, AccessHash: channel.AccessHash}
limit := 50
request := &tg.MessagesGetHistoryRequest{ request := &tg.MessagesGetHistoryRequest{
Peer: inputPeer, Peer: inputPeer,
Limit: 10, Limit: limit,
} }
if modeHistory { // 提供此ID供遍历历史消息 request.OffsetID = cfg.MinID + limit
request.OffsetID = cfg.Offset request.MinID = cfg.MinID
} else {
request.MinID = cfg.MinID // 提供此ID供新增加的消息
}
// request.OffsetID = 1339 history, err := t.Client.API().MessagesGetHistory(ctx, request)
// request.Limit = 1
messages, err := t.Client.API().MessagesGetHistory(ctx, request)
if err != nil { if err != nil {
return errors.Wrap(err, "messages.getHistory") return errors.Wrap(err, "messages.getHistory")
} }
if len(messages.(*tg.MessagesChannelMessages).GetMessages()) == 0 { if len(history.(*tg.MessagesChannelMessages).GetMessages()) == 0 {
logger.Info("no new message", zap.Int64("channel", channel.ID)) logger.Info("no new message", zap.Int64("channel", channel.ID))
return errorx.ErrNoNewMessage return errorx.ErrNoNewMessage
} }
for _, item := range messages.(*tg.MessagesChannelMessages).GetMessages() { messages := history.(*tg.MessagesChannelMessages).GetMessages()
messages = lo.Reverse(messages)
for _, item := range messages {
switch item.(type) { switch item.(type) {
case *tg.MessageEmpty: case *tg.MessageEmpty:
continue continue
@@ -82,6 +81,7 @@ func (t *TClient) Channel(ctx context.Context, channel *tg.Channel, cfg *DBChann
zap.Int64("channel", channel.ID), zap.Int64("channel", channel.ID),
zap.Int64("size", doc.GetSize()), zap.Int64("size", doc.GetSize()),
zap.String("SizeHuman", humanize.Bytes(uint64(doc.GetSize()))), zap.String("SizeHuman", humanize.Bytes(uint64(doc.GetSize()))),
zap.String("Limit", t.Config.MaxSize),
) )
break break
} }
@@ -120,7 +120,7 @@ func (t *TClient) Channel(ctx context.Context, channel *tg.Channel, cfg *DBChann
return err return err
} }
logger.Info("update config", zap.Int("offset", cfg.Offset), zap.Int64("channel", channel.ID)) logger.Info("update config", zap.Int64("channel", channel.ID), zap.Int("msg_id", item.GetID()))
if err := cfg.Update(ctx, item.GetID()); err != nil { if err := cfg.Update(ctx, item.GetID()); err != nil {
logger.Error("update config failed", zap.Error(err), zap.Int64("channel", channel.ID)) logger.Error("update config failed", zap.Error(err), zap.Int64("channel", channel.ID))
return err return err

View File

@@ -13,7 +13,6 @@ import (
var ( var (
channelID int64 channelID int64
modeHistory bool
channelAlias string channelAlias string
) )
@@ -30,7 +29,6 @@ func ExportCmd() *cobra.Command {
} }
cmd.Flags().Int64Var(&channelID, "channel", 0, "channel id") cmd.Flags().Int64Var(&channelID, "channel", 0, "channel id")
cmd.Flags().BoolVar(&modeHistory, "history", false, "history mode")
cmd.Flags().StringVar(&channelAlias, "alias", "", "channel alias") cmd.Flags().StringVar(&channelAlias, "alias", "", "channel alias")
return cmd return cmd
@@ -66,7 +64,7 @@ func exportCmd(ctx context.Context) error {
// https://t.me/yunpanshare/37426 // https://t.me/yunpanshare/37426
for { for {
if err := client.Channel(ctx, channel, cfg, modeHistory); err != nil { if err := client.Channel(ctx, channel, cfg); err != nil {
return err return err
} }
} }

View File

@@ -66,7 +66,7 @@ func watchCmd(ctx context.Context) error {
} }
for { for {
if err := client.Channel(ctx, channel, cfg, modeHistory); err != nil { if err := client.Channel(ctx, channel, cfg); err != nil {
if errors.Is(err, errorx.ErrNoNewMessage) { if errors.Is(err, errorx.ErrNoNewMessage) {
logger.Info("no new message", zap.Int64("uuid", ch.UUID)) logger.Info("no new message", zap.Int64("uuid", ch.UUID))
} else { } else {

View File

@@ -23,7 +23,6 @@ type DBChannel struct {
UUID int64 UUID int64
Username string Username string
Title string Title string
Offset int
MinID int MinID int
} }
@@ -86,28 +85,18 @@ func (c *DBChannel) Get(ctx context.Context) error {
} }
} }
c.MinID = int(m.MinID) c.MinID = int(m.MinID)
c.Offset = int(m.Offset)
return nil return nil
} }
func (c *DBChannel) Update(ctx context.Context, offsetID int) error { func (c *DBChannel) Update(ctx context.Context, msgID int) error {
if c.Offset == 0 { if c.MinID < msgID {
c.Offset = offsetID c.MinID = msgID
}
if c.Offset > offsetID {
c.Offset = offsetID
}
if c.MinID < offsetID {
c.MinID = offsetID
} }
tbl := table.Channels tbl := table.Channels
_, err := tbl.UPDATE(). _, err := tbl.UPDATE().
SET( SET(
tbl.Offset.SET(Int64(int64(c.Offset))),
tbl.MinID.SET(Int64(int64(c.MinID))), tbl.MinID.SET(Int64(int64(c.MinID))),
). ).
WHERE(tbl.UUID.EQ(Int64(c.UUID))). WHERE(tbl.UUID.EQ(Int64(c.UUID))).