diff --git a/internal/cmd_publish.go b/internal/cmd_publish.go index 55139e9..f1e20f6 100644 --- a/internal/cmd_publish.go +++ b/internal/cmd_publish.go @@ -4,8 +4,10 @@ import ( "context" "encoding/json" "fmt" + "log" "os" "path/filepath" + "time" "exporter/config" "exporter/database/telegram_resource/public/model" @@ -22,7 +24,14 @@ func PublishCmd() *cobra.Command { Use: "publish", Short: "publish posts", RunE: func(cmd *cobra.Command, args []string) error { - return publishCmd(context.Background()) + for { + if err := publishCmd(context.Background()); err != nil { + log.Println("ERROR: ", err) + return err + } + time.Sleep(time.Second) + } + return nil }, } @@ -39,9 +48,19 @@ func publishCmd(ctx context.Context) error { tbl := table.ChannelMessages tblC := table.Channels - stmt := tbl.SELECT(tbl.AllColumns, tblC.Title). - WHERE(tbl.Published.IS_FALSE()). + var maxIDResult struct { + MaxID int64 + } + stmt := tbl.SELECT(MAX(tbl.ID).AS("maxID")).FROM(tbl) + + if err := stmt.QueryContext(context.Background(), db, &maxIDResult); err != nil { + return errors.Wrap(err, "failed to get max id") + } + + stmt = tbl.SELECT(tbl.AllColumns, tblC.Title). + WHERE(tbl.Published.IS_FALSE().AND(tbl.ID.NOT_EQ(Int(maxIDResult.MaxID)))). LIMIT(1). + ORDER_BY(tbl.UUID.ASC()). FROM(tbl.LEFT_JOIN(tblC, tbl.ChannelID.EQ(tblC.UUID))) if err := stmt.QueryContext(context.Background(), db, &msg); err != nil { return errors.Wrap(err, "failed to get message") @@ -57,6 +76,7 @@ func publishCmd(ctx context.Context) error { return err } + log.Println("published: ", msg.ChannelMessages.ID) return nil } @@ -90,7 +110,7 @@ func publish(_ context.Context, msg publishMsg) error { } else if media.WebPage != nil { data.Content += fmt.Sprintf("\n\n[%s](%s)", media.WebPage.Title, media.WebPage.URL) } else { - return errors.New("unknown media type") + continue } filepath := filepath.Join(config.C.Output, file) diff --git a/internal/common.go b/internal/common.go index 35cdfc9..679be1d 100644 --- a/internal/common.go +++ b/internal/common.go @@ -17,7 +17,7 @@ var ( logger *zap.Logger ) -func InitClient(cfg *config.Config) error { +func InitLogger(cfg *config.Config) error { logWriter := zapcore.AddSync(&lj.Logger{ Filename: cfg.LogFile, MaxBackups: 3, @@ -32,6 +32,10 @@ func InitClient(cfg *config.Config) error { ) logger = zap.New(logCore) + return nil +} + +func InitClient(cfg *config.Config) error { client = NewClient(logger, cfg) return nil } diff --git a/internal/db_channel.go b/internal/db_channel.go index 6556779..c22c307 100644 --- a/internal/db_channel.go +++ b/internal/db_channel.go @@ -120,6 +120,24 @@ func (c *DBChannel) SaveMessage(ctx context.Context, msg *ChannelMessage) error } tbl := table.ChannelMessages + createNew := func() error { + _, err := tbl.INSERT(tbl.AllColumns.Except(tbl.ID)).MODEL(message).ExecContext(ctx, db) + if err != nil { + if e, ok := err.(*pq.Error); ok { + if e.Code == "23505" { + return nil + } + } + return errors.Wrap(err, "insert message") + } + return nil + } + if msg.GroupID > 0 { + if err := createNew(); err != nil { + return err + } + return nil + } cond := tbl.GroupID.EQ(Int(message.GroupID)).AND( tbl.ChannelID.EQ(Int(message.ChannelID)), @@ -134,16 +152,7 @@ func (c *DBChannel) SaveMessage(ctx context.Context, msg *ChannelMessage) error if err != nil { // 如果没有找到记录,那么插入新记录 if errors.Is(err, qrm.ErrNoRows) { - _, err = tbl.INSERT(tbl.AllColumns.Except(tbl.ID)).MODEL(message).ExecContext(ctx, db) - if err != nil { - if e, ok := err.(*pq.Error); ok { - if e.Code == "23505" { - return nil - } - } - return errors.Wrap(err, "insert message") - } - return nil + return createNew() } return errors.Wrap(err, "select message") } diff --git a/internal/db_channel_test.go b/internal/db_channel_test.go index 74b7384..2069dc3 100644 --- a/internal/db_channel_test.go +++ b/internal/db_channel_test.go @@ -8,6 +8,7 @@ import ( "exporter/database/telegram_resource/public/model" "exporter/database/telegram_resource/public/table" + . "github.com/go-jet/jet/v2/postgres" "github.com/samber/lo" ) @@ -77,3 +78,38 @@ func Test_Join(t *testing.T) { t.Logf("%+v", msg) } + +func Test_ExceptMax(t *testing.T) { + dsn := "postgresql://postgres:xixi0202@10.1.1.3:5432/telegram_resource?sslmode=disable" + if err := InitDB(dsn); err != nil { + t.Error(err) + } + + tbl := table.ChannelMessages + tblC := table.Channels + + var result struct { + MaxID int64 + } + stmt := tbl.SELECT(MAX(tbl.ID).AS("maxID")).FROM(tbl) + t.Log(stmt.DebugSql()) + + if err := stmt.QueryContext(context.Background(), db, &result); err != nil { + t.Error(err) + } + t.Log(result) + + stmt = tbl.SELECT(tbl.AllColumns, tblC.Title). + WHERE(tbl.Published.IS_FALSE().AND(tbl.ID.NOT_EQ(Int(result.MaxID)))). + LIMIT(1). + ORDER_BY(tbl.ID.DESC()). + FROM(tbl.LEFT_JOIN(tblC, tbl.ChannelID.EQ(tblC.UUID))) + t.Log(stmt.DebugSql()) + + var msg publishMsg + if err := stmt.QueryContext(context.Background(), db, &msg); err != nil { + t.Error(err) + } + + t.Log(msg.ChannelMessages.ID) +} diff --git a/main.go b/main.go index 806fba8..8d5ae0a 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,11 @@ func main() { return errors.Wrap(err, "load config") } + log.Println("init logger") + if err := internal.InitLogger(config.C); err != nil { + return errors.Wrap(err, "init db") + } + log.Println("init db") if err := internal.InitDB(config.C.DSN); err != nil { return errors.Wrap(err, "init db")