package internal

import (
	"context"
	"mime"
	"os"
	"path/filepath"
	"strings"

	"exporter/pkg/errorx"
	"github.com/dustin/go-humanize"
	"github.com/gotd/td/telegram/downloader"
	"github.com/gotd/td/tg"
	"github.com/pkg/errors"
	"github.com/samber/lo"
	"go.uber.org/zap"
)

// Channel fetches one page of history for channel and exports it through cfg.
//
// With onlyAddChannel set, it requests the 10 latest messages and only records
// the ID of the first message returned via cfg.SetOffset; nothing is exported.
//
// Otherwise it requests up to 50 messages with MinID = cfg.MinID and
// OffsetID = cfg.MinID+50, walks them in reverse of the order the API returned
// them (presumably oldest first — confirm), and for each message:
//   - saves *tg.Message items with cfg.SaveMessage, downloading attached
//     documents and photos when cfg.ExportMedia is set;
//   - advances the stored position with cfg.Update, also for empty, service
//     and unrecognized messages, so they are skipped on the next run.
//
// When a window was requested and it came back empty, the position is advanced
// to the window end if cfg.OffsetID lies beyond it (returning nil); otherwise
// an empty page yields errorx.ErrNoNewMessage. Expired file references
// (errorx.ErrResourceExpired) do not abort the export; any other download or
// storage error is returned immediately.
func (t *TClient) Channel(ctx context.Context, channel *tg.Channel, cfg *DBChannel, onlyAddChannel bool) error {
	inputPeer := &tg.InputPeerChannel{ChannelID: channel.ID, AccessHash: channel.AccessHash}
	request := &tg.MessagesGetHistoryRequest{
		Peer:  inputPeer,
		Limit: 10,
	}
	if !onlyAddChannel {
		request.Limit = 50
		request.OffsetID = cfg.MinID + request.Limit
		request.MinID = cfg.MinID
	}

	logger.Info("get channel message history",
		zap.Int("min_id", request.MinID),
		zap.Int("offset", request.OffsetID),
		zap.Int("limit", request.Limit))

	history, err := t.Client.API().MessagesGetHistory(ctx, request)
	if err != nil {
		return errors.Wrap(err, "messages.getHistory")
	}
	// Channels are expected to answer with messages.channelMessages; report any
	// other variant as an error instead of panicking on the assertion.
	channelHistory, ok := history.(*tg.MessagesChannelMessages)
	if !ok {
		return errors.Errorf("messages.getHistory: unexpected response type %T", history)
	}
	messages := channelHistory.GetMessages()

	// advance persists msgID as the new export position for this channel.
	advance := func(msgID int) error {
		logger.Info("update config", zap.Int64("channel", channel.ID), zap.Int("msg_id", msgID))
		if err := cfg.Update(ctx, msgID); err != nil {
			logger.Error("update config failed", zap.Error(err), zap.Int64("channel", channel.ID))
			return err
		}
		return nil
	}

	if len(messages) == 0 {
		// Only meaningful when a window was requested: in add-channel mode
		// request.OffsetID is 0 and would otherwise reset the stored position.
		if !onlyAddChannel && cfg.OffsetID > 0 && request.OffsetID < cfg.OffsetID {
			logger.Info("no new message, but update config",
				zap.Int64("channel", channel.ID),
				zap.Int("offset", request.OffsetID))
			if err := cfg.Update(ctx, request.OffsetID); err != nil {
				logger.Error("update config failed", zap.Error(err), zap.Int64("channel", channel.ID))
				return err
			}
			return nil
		}
		logger.Info("no new message", zap.Int64("channel", channel.ID))
		return errorx.ErrNoNewMessage
	}

	if onlyAddChannel {
		return cfg.SetOffset(ctx, messages[0].GetID())
	}

	for _, item := range lo.Reverse(messages) {
		msg, isMessage := item.(*tg.Message)
		if !isMessage {
			switch item.(type) {
			case *tg.MessageEmpty, *tg.MessageService:
				// Nothing to export, but the position still moves past them.
			default:
				logger.Error("convert msg to *tg.Message failed", zap.Int64("channel", channel.ID))
			}
			if err := advance(item.GetID()); err != nil {
				return err
			}
			continue
		}

		channelMessage := NewChannelMessage(msg.ID, msg.GetDate())
		channelMessage.WithMessage(msg.GetMessage())
		if grpID, ok := msg.GetGroupedID(); ok {
			channelMessage.WithGroupID(grpID)
		}

		if mediaClass, ok := msg.GetMedia(); ok {
			// `break` below leaves this switch only: the message itself is still saved.
			switch media := mediaClass.(type) {
			case *tg.MessageMediaDocument:
				docClass, ok := media.GetDocument()
				if !ok || !cfg.ExportMedia {
					break
				}
				doc, ok := docClass.(*tg.Document)
				if !ok {
					logger.Warn("document is empty",
						zap.Int64("channel", channel.ID),
						zap.Int("msg_id", msg.ID))
					break
				}
				if doc.GetSize() > int64(t.Config.GetMaxSize()) {
					logger.Warn(
						"document size too large",
						zap.Int64("channel", channel.ID),
						zap.Int64("size", doc.GetSize()),
						zap.String("SizeHuman", humanize.Bytes(uint64(doc.GetSize()))),
						zap.String("Limit", t.Config.MaxSize),
					)
					break
				}
				data, err := t.saveDocument(ctx, cfg, doc)
				if err != nil && !errors.Is(err, errorx.ErrResourceExpired) {
					logger.Error("save document failed", zap.Error(err), zap.Int64("channel", channel.ID))
					return err
				}
				// On an expired reference data is the zero value; the document ID
				// is still recorded on the message.
				channelMessage.WithDocument(doc.GetID(), data)

			case *tg.MessageMediaWebPage:
				if page, ok := media.GetWebpage().(*tg.WebPage); ok {
					channelMessage.WithWebPage(page.GetID(), page.Title, page.URL)
				} else {
					logger.Warn("web_page",
						zap.Int64("channel", channel.ID),
						zap.Int("msg_id", msg.ID),
						zap.String("url", media.GetWebpage().String()),
					)
				}

			case *tg.MessageMediaPhoto:
				photoClass, ok := media.GetPhoto()
				if !ok || !cfg.ExportMedia {
					break
				}
				photo, ok := photoClass.(*tg.Photo)
				if !ok {
					logger.Warn("photo is empty",
						zap.Int64("channel", channel.ID),
						zap.Int("msg_id", msg.ID))
					break
				}
				if err := t.savePhoto(ctx, cfg, photo); err != nil && !errors.Is(err, errorx.ErrResourceExpired) {
					logger.Error("save photo failed", zap.Error(err), zap.Int64("channel", channel.ID))
					return err
				}
				channelMessage.WithPhoto(photo.GetID(), "jpg")
			}
		}

		logger.Info("save message", zap.Int("msg_id", channelMessage.ID))
		if err := cfg.SaveMessage(ctx, channelMessage); err != nil {
			logger.Error("save message failed", zap.Error(err), zap.Int64("channel", channel.ID))
			return err
		}
		if err := advance(msg.ID); err != nil {
			return err
		}
	}
	return nil
}

// savePhoto downloads photo to cfg.Asset(photo ID, "jpg").
//
// The size requested is the last entry of photo.Sizes when there are at least
// two sizes; otherwise ThumbSize is left empty.
// NOTE(review): confirm Telegram accepts an empty ThumbSize for single-size photos.
//
// On failure any partially written file is removed. An error mentioning
// FILE_REFERENCE_EXPIRED is reported as errorx.ErrResourceExpired; other
// errors are wrapped and returned.
func (t *TClient) savePhoto(ctx context.Context, cfg *DBChannel, photo *tg.Photo) error {
	thumbSize := ""
	if len(photo.Sizes) > 1 {
		thumbSize = photo.Sizes[len(photo.Sizes)-1].GetType()
	}
	location := &tg.InputPhotoFileLocation{
		ID:            photo.GetID(),
		AccessHash:    photo.GetAccessHash(),
		FileReference: photo.GetFileReference(),
		ThumbSize:     thumbSize,
	}

	saveTo := cfg.Asset(photo.GetID(), "jpg")
	if _, err := downloader.NewDownloader().Download(t.Client.API(), location).ToPath(ctx, saveTo); err != nil {
		// Best-effort cleanup of a partial file; the download error is what we report.
		_ = os.Remove(saveTo)
		logger.Error("download failed", zap.Error(err))
		if strings.Contains(err.Error(), "FILE_REFERENCE_EXPIRED") {
			return errorx.ErrResourceExpired
		}
		return errors.Wrap(err, "download photo")
	}

	logger.Info("download photo success", zap.String("location", saveTo))
	return nil
}

// getExtByFilenameAndMimeType returns a lower-cased file extension, including
// the leading dot.
//
// A real extension on filename wins. Otherwise the last entry returned by
// mime.ExtensionsByType(mimeType) is used, with ".rar" as a manual fallback
// for "application/rar". It returns "" when nothing matches or mimeType
// cannot be parsed.
func (t *TClient) getExtByFilenameAndMimeType(filename, mimeType string) string {
	// filepath.Ext yields "." for names ending in a dot; treat that as no extension.
	if ext := filepath.Ext(filename); len(ext) > 1 {
		return strings.ToLower(ext)
	}

	exts, err := mime.ExtensionsByType(mimeType)
	if err != nil {
		logger.Error("get extension failed", zap.Error(err), zap.String("mime_type", mimeType))
		return ""
	}
	if len(exts) > 0 {
		return strings.ToLower(exts[len(exts)-1])
	}

	logger.Warn("no extension found", zap.String("mime_type", mimeType))
	if mimeType == "application/rar" {
		return ".rar"
	}
	return ""
}

// saveDocument downloads doc into cfg.Asset(doc ID, ext) and returns its metadata.
//
// The extension comes from the document's filename attribute when present,
// otherwise from its MIME type. Video attributes fill in Video
// (duration/width/height).
//
// On failure the partial file is removed and the zero ChannelMessageDocument is
// returned. An error mentioning FILE_REFERENCE_EXPIRED becomes
// errorx.ErrResourceExpired; other errors are wrapped and returned.
func (t *TClient) saveDocument(ctx context.Context, cfg *DBChannel, doc *tg.Document) (ChannelMessageDocument, error) {
	data := ChannelMessageDocument{
		Ext:      t.getExtByFilenameAndMimeType("", doc.GetMimeType()),
		MimeType: doc.GetMimeType(),
		Size:     doc.GetSize(),
	}
	for _, attr := range doc.GetAttributes() {
		switch a := attr.(type) {
		case *tg.DocumentAttributeFilename:
			data.Filename = a.GetFileName()
			data.Ext = t.getExtByFilenameAndMimeType(data.Filename, doc.GetMimeType())
		case *tg.DocumentAttributeVideo:
			data.Video = &ChannelMessageDocumentVideo{
				Duration: a.GetDuration(),
				Width:    a.GetW(),
				Height:   a.GetH(),
			}
		}
	}

	location := &tg.InputDocumentFileLocation{
		ID:            doc.GetID(),
		AccessHash:    doc.GetAccessHash(),
		FileReference: doc.GetFileReference(),
	}
	saveTo := cfg.Asset(doc.GetID(), data.Ext)

	logger.Info("downloading document",
		zap.String("size", humanize.Bytes(uint64(doc.GetSize()))),
		zap.String("location", saveTo),
		zap.String("mime", doc.GetMimeType()))

	if _, err := downloader.NewDownloader().Download(t.Client.API(), location).ToPath(ctx, saveTo); err != nil {
		// Best-effort cleanup of a partial file; the download error is what we report.
		_ = os.Remove(saveTo)
		logger.Error("download failed", zap.Error(err))
		if strings.Contains(err.Error(), "FILE_REFERENCE_EXPIRED") {
			return ChannelMessageDocument{}, errorx.ErrResourceExpired
		}
		return ChannelMessageDocument{}, errors.Wrap(err, "download document")
	}

	logger.Info("download document success", zap.String("location", saveTo), zap.Any("document", data))
	return data, nil
}