feat: update
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@@ -26,4 +26,4 @@ log.json
|
|||||||
log-*.json
|
log-*.json
|
||||||
session.json
|
session.json
|
||||||
tdl-export.json
|
tdl-export.json
|
||||||
./exporter
|
exporter
|
||||||
@@ -17,6 +17,8 @@ type Config struct {
|
|||||||
MaxSize string `mapstructure:"max_size"`
|
MaxSize string `mapstructure:"max_size"`
|
||||||
DSN string `mapstructure:"dsn"`
|
DSN string `mapstructure:"dsn"`
|
||||||
Output string `mapstructure:"output"`
|
Output string `mapstructure:"output"`
|
||||||
|
PublishHost string `mapstructure:"publish_host"`
|
||||||
|
PublishToken string `mapstructure:"publish_token"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetMaxSize
|
// GetMaxSize
|
||||||
|
|||||||
@@ -5,6 +5,8 @@ import (
|
|||||||
"mime"
|
"mime"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
|
"exporter/pkg/errorx"
|
||||||
|
|
||||||
"github.com/dustin/go-humanize"
|
"github.com/dustin/go-humanize"
|
||||||
"github.com/gotd/td/telegram/downloader"
|
"github.com/gotd/td/telegram/downloader"
|
||||||
"github.com/gotd/td/tg"
|
"github.com/gotd/td/tg"
|
||||||
@@ -35,7 +37,7 @@ func (t *TClient) Channel(ctx context.Context, channel *tg.Channel, cfg *DBChann
|
|||||||
}
|
}
|
||||||
if len(messages.(*tg.MessagesChannelMessages).GetMessages()) == 0 {
|
if len(messages.(*tg.MessagesChannelMessages).GetMessages()) == 0 {
|
||||||
logger.Info("no new message", zap.Int64("channel", channel.ID))
|
logger.Info("no new message", zap.Int64("channel", channel.ID))
|
||||||
return errors.New("no new message")
|
return errorx.ErrNoNewMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, item := range messages.(*tg.MessagesChannelMessages).GetMessages() {
|
for _, item := range messages.(*tg.MessagesChannelMessages).GetMessages() {
|
||||||
|
|||||||
@@ -3,6 +3,13 @@ package internal
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"exporter/config"
|
||||||
|
"exporter/database/telegram_resource/public/model"
|
||||||
|
"exporter/database/telegram_resource/public/table"
|
||||||
|
"exporter/pkg/memos"
|
||||||
|
|
||||||
|
. "github.com/go-jet/jet/v2/postgres"
|
||||||
|
"github.com/pkg/errors"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -17,5 +24,35 @@ func PublishCmd() *cobra.Command {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func publishCmd(ctx context.Context) error {
|
func publishCmd(ctx context.Context) error {
|
||||||
|
var msg model.ChannelMessages
|
||||||
|
|
||||||
|
tbl := table.ChannelMessages
|
||||||
|
err := tbl.SELECT(tbl.AllColumns).WHERE(tbl.Published.IS_TRUE()).LIMIT(1).QueryContext(ctx, db, &msg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// publish item
|
||||||
|
if err := publish(ctx, msg); err != nil {
|
||||||
|
return errors.Wrap(err, "failed to publish")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tbl.UPDATE().SET(tbl.Published.SET(Bool(true))).WHERE(tbl.ID.EQ(Int(msg.ID))).ExecContext(ctx, db)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func publish(ctx context.Context, msg model.ChannelMessages) error {
|
||||||
|
data := memos.PostData{
|
||||||
|
Host: config.C.PublishHost,
|
||||||
|
Token: config.C.PublishToken,
|
||||||
|
Content: *msg.Content,
|
||||||
|
ChannelID: msg.ChannelID,
|
||||||
|
ChannelTitle: "",
|
||||||
|
Resources: []memos.Resource{},
|
||||||
|
}
|
||||||
|
return memos.Post(data)
|
||||||
|
}
|
||||||
|
|||||||
73
internal/cmd_watch.go
Normal file
73
internal/cmd_watch.go
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
package internal
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"exporter/database/telegram_resource/public/model"
|
||||||
|
"exporter/database/telegram_resource/public/table"
|
||||||
|
"exporter/pkg/errorx"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
func WatchCmd() *cobra.Command {
|
||||||
|
cmd := &cobra.Command{
|
||||||
|
Use: "watch",
|
||||||
|
Short: "watch channels",
|
||||||
|
RunE: wrapE(watchCmd),
|
||||||
|
}
|
||||||
|
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
func watchCmd(ctx context.Context) error {
|
||||||
|
tbl := table.Channels
|
||||||
|
|
||||||
|
var lastTravelAt time.Time
|
||||||
|
for {
|
||||||
|
if time.Since(lastTravelAt) < 5*time.Minute {
|
||||||
|
time.Sleep(time.Minute)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
lastTravelAt = time.Now()
|
||||||
|
var channels []model.Channels
|
||||||
|
if err := tbl.SELECT(tbl.AllColumns).QueryContext(ctx, db, &channels); err != nil {
|
||||||
|
logger.Fatal("failed to get channels", zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("pending channels", zap.Int("count", len(channels)))
|
||||||
|
for _, ch := range channels {
|
||||||
|
logger.Info("crawl channel", zap.String("channel", ch.Title), zap.Int64("uuid", ch.UUID))
|
||||||
|
|
||||||
|
channel, err := client.ChannelInfoByID(ctx, ch.UUID)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("failed to get channel info", zap.Error(err), zap.Int64("uuid", ch.UUID))
|
||||||
|
}
|
||||||
|
|
||||||
|
if channel.GetID() == 0 {
|
||||||
|
return errors.New("channel not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg := NewDBChannel(channel.GetID(), channel.Username, channel.Title)
|
||||||
|
if err := cfg.Get(ctx); err != nil {
|
||||||
|
logger.Error("failed to get channel config", zap.Error(err), zap.Int64("uuid", ch.UUID))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
if err := client.Channel(ctx, channel, cfg, modeHistory); err != nil {
|
||||||
|
if errors.Is(err, errorx.ErrNoNewMessage) {
|
||||||
|
logger.Info("no new message", zap.Int64("uuid", ch.UUID))
|
||||||
|
} else {
|
||||||
|
logger.Error("failed to get channel messages", zap.Error(err), zap.Int64("uuid", ch.UUID))
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -92,7 +92,13 @@ func (c *DBChannel) Get(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *DBChannel) Update(ctx context.Context, offsetID int) error {
|
func (c *DBChannel) Update(ctx context.Context, offsetID int) error {
|
||||||
|
if c.Offset == 0 {
|
||||||
c.Offset = offsetID
|
c.Offset = offsetID
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.Offset > offsetID {
|
||||||
|
c.Offset = offsetID
|
||||||
|
}
|
||||||
|
|
||||||
if c.MinID < offsetID {
|
if c.MinID < offsetID {
|
||||||
c.MinID = offsetID
|
c.MinID = offsetID
|
||||||
|
|||||||
@@ -5,6 +5,8 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"exporter/database/telegram_resource/public/table"
|
||||||
|
|
||||||
"github.com/samber/lo"
|
"github.com/samber/lo"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -48,3 +50,18 @@ func TestDBChannel_SaveMessage(t *testing.T) {
|
|||||||
t.Logf("%+v", err)
|
t.Logf("%+v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Test_Join(t *testing.T) {
|
||||||
|
dsn := "postgresql://postgres:xixi0202@10.1.1.3:5432/telegram_resource?sslmode=disable"
|
||||||
|
if err := InitDB(dsn); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tblC := table.Channels
|
||||||
|
tbl := table.ChannelMessages
|
||||||
|
stmt := tbl.SELECT(tbl.AllColumns, tblC.Title).
|
||||||
|
WHERE(tbl.Published.IS_TRUE()).
|
||||||
|
LIMIT(1).
|
||||||
|
FROM(tbl.LEFT_JOIN(tblC, tbl.ChannelID.EQ(tblC.UUID)))
|
||||||
|
t.Log(stmt.DebugSql())
|
||||||
|
}
|
||||||
|
|||||||
5
pkg/errorx/err.go
Normal file
5
pkg/errorx/err.go
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
package errorx
|
||||||
|
|
||||||
|
import "errors"
|
||||||
|
|
||||||
|
var ErrNoNewMessage = errors.New("no new message")
|
||||||
Reference in New Issue
Block a user