Files
tg_exporter/internal/client_channel.go
2024-09-06 18:14:26 +08:00

274 lines
8.0 KiB
Go

package internal
import (
"context"
"mime"
"os"
"path/filepath"
"strings"
"exporter/pkg/errorx"
"github.com/dustin/go-humanize"
"github.com/gotd/td/telegram/downloader"
"github.com/gotd/td/tg"
"github.com/pkg/errors"
"github.com/samber/lo"
"go.uber.org/zap"
)
// Channel exports one page of history (up to 50 messages) from channel,
// starting after cfg.MinID.
//
// If the page is empty it returns errorx.ErrNoNewMessage, unless cfg.OffsetID
// is set and the probed offset is still below it; in that case cfg.Update is
// called with the probed offset and nil is returned (presumably to step over a
// run of missing IDs — confirm).
//
// When onlyAddChannel is true, the ID of the first message in the response is
// stored with cfg.SetOffset and nothing is exported. Otherwise messages are
// processed in reversed (oldest-first) order. Each *tg.Message is converted to
// a ChannelMessage: text, group ID and web page, plus, when cfg.ExportMedia is
// set, downloaded photos and documents within the configured size limit. The
// result is saved with cfg.SaveMessage. Every message, including empty and
// service ones, is then recorded via cfg.Update. The first storage or download
// error aborts the page and is returned.
func (t *TClient) Channel(ctx context.Context, channel *tg.Channel, cfg *DBChannel, onlyAddChannel bool) error {
	inputPeer := &tg.InputPeerChannel{ChannelID: channel.ID, AccessHash: channel.AccessHash}
	limit := 50
	// Ask for IDs between cfg.MinID and cfg.MinID+limit: at most one page of
	// messages newer than the last processed one.
	request := &tg.MessagesGetHistoryRequest{
		Peer:     inputPeer,
		Limit:    limit,
		OffsetID: cfg.MinID + limit,
		MinID:    cfg.MinID,
	}
	logger.Info("get channel message history", zap.Int("min_id", request.MinID), zap.Int("offset", request.OffsetID), zap.Int("limit", request.Limit))
	history, err := t.Client.API().MessagesGetHistory(ctx, request)
	if err != nil {
		return errors.Wrap(err, "messages.getHistory")
	}
	// Channels are expected to answer with messages.channelMessages; any other
	// constructor (e.g. messagesNotModified) is reported instead of panicking.
	channelHistory, ok := history.(*tg.MessagesChannelMessages)
	if !ok {
		return errors.Errorf("messages.getHistory: unexpected response type %T", history)
	}
	messages := channelHistory.GetMessages()
	if len(messages) == 0 {
		if cfg.OffsetID > 0 && request.OffsetID < cfg.OffsetID {
			logger.Info("no new message, but update config", zap.Int64("channel", channel.ID), zap.Int("offset", request.OffsetID))
			if err := cfg.Update(ctx, request.OffsetID); err != nil {
				logger.Error("update config failed", zap.Error(err), zap.Int64("channel", channel.ID))
				return err
			}
			return nil
		}
		logger.Info("no new message", zap.Int64("channel", channel.ID))
		return errorx.ErrNoNewMessage
	}
	if onlyAddChannel {
		return cfg.SetOffset(ctx, messages[0].GetID())
	}

	// advance records msgID through cfg.Update once a message has been handled.
	advance := func(msgID int) error {
		logger.Info("update config", zap.Int64("channel", channel.ID), zap.Int("msg_id", msgID))
		if err := cfg.Update(ctx, msgID); err != nil {
			logger.Error("update config failed", zap.Error(err), zap.Int64("channel", channel.ID))
			return err
		}
		return nil
	}

	// The response is reversed so messages are handled oldest-first
	// (Telegram returns history newest-first).
	for _, item := range lo.Reverse(messages) {
		msg, ok := item.(*tg.Message)
		if !ok {
			// Empty and service messages carry nothing to export; any other
			// type is unexpected. Either way, just record it and move on.
			switch item.(type) {
			case *tg.MessageEmpty, *tg.MessageService:
			default:
				logger.Error("convert msg to *tg.Message failed", zap.Int64("channel", channel.ID))
			}
			if err := advance(item.GetID()); err != nil {
				return err
			}
			continue
		}

		channelMessage := NewChannelMessage(msg.ID, msg.GetDate())
		channelMessage.WithMessage(msg.GetMessage())
		if grpID, ok := msg.GetGroupedID(); ok {
			channelMessage.WithGroupID(grpID)
		}
		if mediaClass, ok := msg.GetMedia(); ok {
			switch media := mediaClass.(type) {
			case *tg.MessageMediaDocument:
				docClass, hasDoc := media.GetDocument()
				// Comma-ok: the document may be a DocumentEmpty (or absent).
				doc, isDoc := docClass.(*tg.Document)
				switch {
				case !hasDoc || !cfg.ExportMedia:
					// Nothing to download.
				case !isDoc:
					logger.Warn("document unavailable", zap.Int64("channel", channel.ID), zap.Int("msg_id", msg.ID))
				case doc.GetSize() > int64(t.Config.GetMaxSize()):
					logger.Warn(
						"document size too large",
						zap.Int64("channel", channel.ID),
						zap.Int64("size", doc.GetSize()),
						zap.String("SizeHuman", humanize.Bytes(uint64(doc.GetSize()))),
						zap.String("Limit", t.Config.MaxSize),
					)
				default:
					data, err := t.saveDocument(ctx, cfg, doc)
					if err != nil {
						logger.Error("save document failed", zap.Error(err), zap.Int64("channel", channel.ID))
						return err
					}
					channelMessage.WithDocument(doc.GetID(), data)
				}
			case *tg.MessageMediaWebPage:
				if page, ok := media.GetWebpage().(*tg.WebPage); ok {
					channelMessage.WithWebPage(page.GetID(), page.Title, page.URL)
				} else {
					// page is nil here, so log the owning message's ID.
					logger.Warn("web_page",
						zap.Int64("channel", channel.ID),
						zap.Int("msg_id", msg.ID),
						zap.String("url", media.GetWebpage().String()),
					)
				}
			case *tg.MessageMediaPhoto:
				photoClass, hasPhoto := media.GetPhoto()
				// Comma-ok: the photo may be a PhotoEmpty (or absent).
				photo, isPhoto := photoClass.(*tg.Photo)
				switch {
				case !hasPhoto || !cfg.ExportMedia:
					// Nothing to download.
				case !isPhoto:
					logger.Warn("photo unavailable", zap.Int64("channel", channel.ID), zap.Int("msg_id", msg.ID))
				default:
					if err := t.savePhoto(ctx, cfg, photo); err != nil {
						logger.Error("save photo failed", zap.Error(err), zap.Int64("channel", channel.ID))
						return err
					}
					channelMessage.WithPhoto(photo.GetID(), "jpg")
				}
			}
		}

		logger.Info("save message", zap.Int("msg_id", channelMessage.ID))
		if err := cfg.SaveMessage(ctx, channelMessage); err != nil {
			logger.Error("save message failed", zap.Error(err), zap.Int64("channel", channel.ID))
			return err
		}
		if err := advance(item.GetID()); err != nil {
			return err
		}
	}
	return nil
}
// savePhoto downloads photo to the path returned by cfg.Asset(photo ID, "jpg").
// On failure the partially written file is removed and the download error is
// returned.
func (t *TClient) savePhoto(ctx context.Context, cfg *DBChannel, photo *tg.Photo) error {
	// inputPhotoFileLocation must name one of the photo's sizes via its type;
	// an empty thumb_size is not a valid location. Use the last listed size
	// (presumably the largest) whenever at least one size exists.
	thumbSize := ""
	if n := len(photo.Sizes); n > 0 {
		thumbSize = photo.Sizes[n-1].GetType()
	}
	location := &tg.InputPhotoFileLocation{
		ID:            photo.GetID(),
		AccessHash:    photo.GetAccessHash(),
		FileReference: photo.GetFileReference(),
		ThumbSize:     thumbSize,
	}
	saveTo := cfg.Asset(photo.GetID(), "jpg")
	if _, err := downloader.NewDownloader().Download(t.Client.API(), location).ToPath(ctx, saveTo); err != nil {
		// Best-effort cleanup of a partial file; the download error is what
		// the caller needs, so a failed Remove is deliberately ignored.
		_ = os.Remove(saveTo)
		logger.Error("download failed", zap.Error(err))
		return err
	}
	logger.Info("download photo success", zap.String("location", saveTo))
	return nil
}
// getExtByFilenameAndMimeType picks a lowercase file extension, including the
// leading dot, for a downloaded document.
//
// The extension of filename wins when it has one. Otherwise the last entry
// that mime.ExtensionsByType returns for mimeType is used. If the MIME table
// has no entry, a small built-in fallback covers RAR archives. It returns ""
// when nothing matches or mimeType cannot be parsed.
func (t *TClient) getExtByFilenameAndMimeType(filename, mimeType string) string {
	// filepath.Ext yields "" when there is no dot and "." for a trailing dot;
	// neither is a usable extension, so both fall through to the MIME lookup.
	if ext := filepath.Ext(filename); len(ext) > 1 {
		return strings.ToLower(ext)
	}
	exts, err := mime.ExtensionsByType(mimeType)
	if err != nil {
		logger.Error("get extension failed", zap.Error(err), zap.String("mime_type", mimeType))
		return ""
	}
	if len(exts) > 0 {
		return strings.ToLower(exts[len(exts)-1])
	}
	logger.Warn("no extension found", zap.String("mime_type", mimeType))
	// Go's built-in MIME table is small and system mime.types files vary, so
	// known archive types are mapped explicitly.
	switch strings.ToLower(mimeType) {
	case "application/rar", "application/vnd.rar", "application/x-rar-compressed":
		return ".rar"
	}
	return ""
}
// saveDocument downloads doc and returns the metadata recorded for it: MIME
// type, size, extension, original filename, and video duration/dimensions
// when present.
//
// The target path is cfg.Asset(doc ID, extension), where the extension comes
// from the filename attribute if present, else from the MIME type. On failure
// any partial file is removed and a zero ChannelMessageDocument is returned
// with the error.
func (t *TClient) saveDocument(ctx context.Context, cfg *DBChannel, doc *tg.Document) (ChannelMessageDocument, error) {
	location := &tg.InputDocumentFileLocation{
		ID:            doc.GetID(),
		AccessHash:    doc.GetAccessHash(),
		FileReference: doc.GetFileReference(),
	}
	data := ChannelMessageDocument{
		MimeType: doc.GetMimeType(),
		Size:     doc.GetSize(),
	}
	for _, attr := range doc.GetAttributes() {
		switch a := attr.(type) {
		case *tg.DocumentAttributeFilename:
			data.Filename = a.GetFileName()
		case *tg.DocumentAttributeVideo:
			data.Video = &ChannelMessageDocumentVideo{
				Duration: a.GetDuration(),
				Width:    a.GetW(),
				Height:   a.GetH(),
			}
		}
	}
	// Resolve the extension once, after the filename (if any) is known. A
	// MIME-only lookup up front would be redundant, and it would log spurious
	// warnings when the filename already supplies the extension.
	data.Ext = t.getExtByFilenameAndMimeType(data.Filename, data.MimeType)

	saveTo := cfg.Asset(doc.GetID(), data.Ext)
	logger.Info("downloading document", zap.String("size", humanize.Bytes(uint64(doc.GetSize()))), zap.String("location", saveTo), zap.String("mime", doc.GetMimeType()))
	if _, err := downloader.NewDownloader().Download(t.Client.API(), location).ToPath(ctx, saveTo); err != nil {
		// Best-effort cleanup of a partial file; the download error is what
		// the caller needs, so a failed Remove is deliberately ignored.
		_ = os.Remove(saveTo)
		logger.Error("download failed", zap.Error(err))
		return ChannelMessageDocument{}, err
	}
	logger.Info("download document success", zap.String("location", saveTo), zap.Any("document", data))
	return data, nil
}