From 005fd9696b4e51f3ab0cab8fff88a5c287452b51 Mon Sep 17 00:00:00 2001 From: Rogee Date: Wed, 10 Sep 2025 18:41:37 +0800 Subject: [PATCH] fix migration issues --- .../project/app/srv/migrate/migrate.go.tpl | 35 ++ .../project/database/-transform.yaml.raw | 6 + .../project/database/.transform.yaml.raw | 6 - .../project/database/migrations/-gitkeep | 0 .../20140202165500_river_job.sql.tpl | 408 ------------------ 5 files changed, 41 insertions(+), 414 deletions(-) create mode 100755 templates/project/database/-transform.yaml.raw delete mode 100755 templates/project/database/.transform.yaml.raw create mode 100644 templates/project/database/migrations/-gitkeep delete mode 100644 templates/project/database/migrations/20140202165500_river_job.sql.tpl diff --git a/templates/project/app/srv/migrate/migrate.go.tpl b/templates/project/app/srv/migrate/migrate.go.tpl index fadb4d4..03e3929 100644 --- a/templates/project/app/srv/migrate/migrate.go.tpl +++ b/templates/project/app/srv/migrate/migrate.go.tpl @@ -14,6 +14,15 @@ import ( "go.ipao.vip/atom" "go.ipao.vip/atom/container" "go.uber.org/dig" + + "github.com/pressly/goose/v3" + "github.com/riverqueue/river/riverdriver/riverdatabasesql" + "github.com/riverqueue/river/rivermigrate" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "go.ipao.vip/atom" + "go.ipao.vip/atom/container" + "go.uber.org/dig" ) func defaultProviders() container.Providers { @@ -54,7 +63,33 @@ func Serve(cmd *cobra.Command, args []string) error { goose.SetBaseFS(database.MigrationFS) goose.SetTableName("migrations") + goose.AddNamedMigrationNoTxContext("0001_river_job.go", RiverUp, RiverDown) return goose.RunContext(context.Background(), action, svc.DB, "migrations", args...) }) } + +func RiverUp(ctx context.Context, db *sql.DB) error { + migrator, err := rivermigrate.New(riverdatabasesql.New(db), nil) + if err != nil { + return err + } + + // Migrate up. An empty MigrateOpts will migrate all the way up, but + // best practice is to specify a specific target version. + _, err = migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{}) + return err +} + +func RiverDown(ctx context.Context, db *sql.DB) error { + migrator, err := rivermigrate.New(riverdatabasesql.New(db), nil) + if err != nil { + return err + } + + // TargetVersion -1 removes River's schema completely. + _, err = migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{ + TargetVersion: -1, + }) + return err +} diff --git a/templates/project/database/-transform.yaml.raw b/templates/project/database/-transform.yaml.raw new file mode 100755 index 0000000..532b54e --- /dev/null +++ b/templates/project/database/-transform.yaml.raw @@ -0,0 +1,6 @@ +ignores: +- migrations +imports: +- go.ipao.vip/gen +field_type: +field_relate: diff --git a/templates/project/database/.transform.yaml.raw b/templates/project/database/.transform.yaml.raw deleted file mode 100755 index b30c655..0000000 --- a/templates/project/database/.transform.yaml.raw +++ /dev/null @@ -1,6 +0,0 @@ -ignores: - - migrations -imports: - - go.ipao.vip/gen -field_type: -field_relate: \ No newline at end of file diff --git a/templates/project/database/migrations/-gitkeep b/templates/project/database/migrations/-gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/templates/project/database/migrations/20140202165500_river_job.sql.tpl b/templates/project/database/migrations/20140202165500_river_job.sql.tpl deleted file mode 100644 index 8d01ac6..0000000 --- a/templates/project/database/migrations/20140202165500_river_job.sql.tpl +++ /dev/null @@ -1,408 +0,0 @@ --- +goose Up --- +goose StatementBegin - --- River migration 002 [up] -CREATE TYPE river_job_state AS ENUM( - 'available', - 'cancelled', - 'completed', - 'discarded', - 'pending', - 'retryable', - 'running', - 'scheduled' -); - -CREATE TABLE river_job( - -- 8 bytes - id bigserial PRIMARY KEY, - - -- 8 bytes (4 bytes + 2 bytes + 2 bytes) - -- - -- `state` is kept near the top of the table for operator convenience -- when - -- looking at jobs with `SELECT *` it'll appear first after ID. The other two - -- fields aren't as important but are kept adjacent to `state` for alignment - -- to get an 8-byte block. - state river_job_state NOT NULL DEFAULT 'available', - attempt smallint NOT NULL DEFAULT 0, - max_attempts smallint NOT NULL, - - -- 8 bytes each (no alignment needed) - attempted_at timestamptz, - created_at timestamptz NOT NULL DEFAULT NOW(), - finalized_at timestamptz, - scheduled_at timestamptz NOT NULL DEFAULT NOW(), - - -- 2 bytes (some wasted padding probably) - priority smallint NOT NULL DEFAULT 1, - - -- types stored out-of-band - args jsonb, - attempted_by text[], - errors jsonb[], - kind text NOT NULL, - metadata jsonb NOT NULL DEFAULT '{}', - queue text NOT NULL DEFAULT 'default', - tags varchar(255)[], - - CONSTRAINT finalized_or_finalized_at_null CHECK ((state IN ('cancelled', 'completed', 'discarded') AND finalized_at IS NOT NULL) OR finalized_at IS NULL), - CONSTRAINT max_attempts_is_positive CHECK (max_attempts > 0), - CONSTRAINT priority_in_range CHECK (priority >= 1 AND priority <= 4), - CONSTRAINT queue_length CHECK (char_length(queue) > 0 AND char_length(queue) < 128), - CONSTRAINT kind_length CHECK (char_length(kind) > 0 AND char_length(kind) < 128) -); - --- We may want to consider adding another property here after `kind` if it seems --- like it'd be useful for something. -CREATE INDEX river_job_kind ON river_job USING btree(kind); - -CREATE INDEX river_job_state_and_finalized_at_index ON river_job USING btree(state, finalized_at) WHERE finalized_at IS NOT NULL; - -CREATE INDEX river_job_prioritized_fetching_index ON river_job USING btree(state, queue, priority, scheduled_at, id); - -CREATE INDEX river_job_args_index ON river_job USING GIN(args); - -CREATE INDEX river_job_metadata_index ON river_job USING GIN(metadata); - -CREATE OR REPLACE FUNCTION river_job_notify() - RETURNS TRIGGER - AS $$ -DECLARE - payload json; -BEGIN - IF NEW.state = 'available' THEN - -- Notify will coalesce duplicate notifications within a transaction, so - -- keep these payloads generalized: - payload = json_build_object('queue', NEW.queue); - PERFORM - pg_notify('river_insert', payload::text); - END IF; - RETURN NULL; -END; -$$ -LANGUAGE plpgsql; - -CREATE TRIGGER river_notify - AFTER INSERT ON river_job - FOR EACH ROW - EXECUTE PROCEDURE river_job_notify(); - -CREATE UNLOGGED TABLE river_leader( - -- 8 bytes each (no alignment needed) - elected_at timestamptz NOT NULL, - expires_at timestamptz NOT NULL, - - -- types stored out-of-band - leader_id text NOT NULL, - name text PRIMARY KEY, - - CONSTRAINT name_length CHECK (char_length(name) > 0 AND char_length(name) < 128), - CONSTRAINT leader_id_length CHECK (char_length(leader_id) > 0 AND char_length(leader_id) < 128) -); - --- River migration 003 [up] -ALTER TABLE river_job ALTER COLUMN tags SET DEFAULT '{}'; -UPDATE river_job SET tags = '{}' WHERE tags IS NULL; -ALTER TABLE river_job ALTER COLUMN tags SET NOT NULL; - --- River migration 004 [up] --- The args column never had a NOT NULL constraint or default value at the --- database level, though we tried to ensure one at the application level. -ALTER TABLE river_job ALTER COLUMN args SET DEFAULT '{}'; -UPDATE river_job SET args = '{}' WHERE args IS NULL; -ALTER TABLE river_job ALTER COLUMN args SET NOT NULL; -ALTER TABLE river_job ALTER COLUMN args DROP DEFAULT; - --- The metadata column never had a NOT NULL constraint or default value at the --- database level, though we tried to ensure one at the application level. -ALTER TABLE river_job ALTER COLUMN metadata SET DEFAULT '{}'; -UPDATE river_job SET metadata = '{}' WHERE metadata IS NULL; -ALTER TABLE river_job ALTER COLUMN metadata SET NOT NULL; - --- The 'pending' job state will be used for upcoming functionality: --- ALTER TYPE river_job_state ADD VALUE IF NOT EXISTS 'pending' AFTER 'discarded'; - -ALTER TABLE river_job DROP CONSTRAINT finalized_or_finalized_at_null; -ALTER TABLE river_job ADD CONSTRAINT finalized_or_finalized_at_null CHECK ( - (finalized_at IS NULL AND state NOT IN ('cancelled', 'completed', 'discarded')) OR - (finalized_at IS NOT NULL AND state IN ('cancelled', 'completed', 'discarded')) -); - -DROP TRIGGER river_notify ON river_job; -DROP FUNCTION river_job_notify; - -CREATE TABLE river_queue( - name text PRIMARY KEY NOT NULL, - created_at timestamptz NOT NULL DEFAULT NOW(), - metadata jsonb NOT NULL DEFAULT '{}' ::jsonb, - paused_at timestamptz, - updated_at timestamptz NOT NULL -); - -ALTER TABLE river_leader - ALTER COLUMN name SET DEFAULT 'default', - DROP CONSTRAINT name_length, - ADD CONSTRAINT name_length CHECK (name = 'default'); - --- River migration 005 [up] --- --- Rebuild the migration table so it's based on `(line, version)`. --- - -DO -$body$ -BEGIN - -- Tolerate users who may be using their own migration system rather than - -- River's. If they are, they will have skipped version 001 containing - -- `CREATE TABLE river_migration`, so this table won't exist. - IF (SELECT to_regclass('river_migration') IS NOT NULL) THEN - ALTER TABLE river_migration - RENAME TO river_migration_old; - - CREATE TABLE river_migration( - line TEXT NOT NULL, - version bigint NOT NULL, - created_at timestamptz NOT NULL DEFAULT NOW(), - CONSTRAINT line_length CHECK (char_length(line) > 0 AND char_length(line) < 128), - CONSTRAINT version_gte_1 CHECK (version >= 1), - PRIMARY KEY (line, version) - ); - - INSERT INTO river_migration - (created_at, line, version) - SELECT created_at, 'main', version - FROM river_migration_old; - - DROP TABLE river_migration_old; - END IF; -END; -$body$ -LANGUAGE 'plpgsql'; - --- --- Add `river_job.unique_key` and bring up an index on it. --- - --- These statements use `IF NOT EXISTS` to allow users with a `river_job` table --- of non-trivial size to build the index `CONCURRENTLY` out of band of this --- migration, then follow by completing the migration. -ALTER TABLE river_job - ADD COLUMN IF NOT EXISTS unique_key bytea; - -CREATE UNIQUE INDEX IF NOT EXISTS river_job_kind_unique_key_idx ON river_job (kind, unique_key) WHERE unique_key IS NOT NULL; - --- --- Create `river_client` and derivative. --- --- This feature hasn't quite yet been implemented, but we're taking advantage of --- the migration to add the schema early so that we can add it later without an --- additional migration. --- - -CREATE UNLOGGED TABLE river_client ( - id text PRIMARY KEY NOT NULL, - created_at timestamptz NOT NULL DEFAULT now(), - metadata jsonb NOT NULL DEFAULT '{}', - paused_at timestamptz, - updated_at timestamptz NOT NULL, - CONSTRAINT name_length CHECK (char_length(id) > 0 AND char_length(id) < 128) -); - --- Differs from `river_queue` in that it tracks the queue state for a particular --- active client. -CREATE UNLOGGED TABLE river_client_queue ( - river_client_id text NOT NULL REFERENCES river_client (id) ON DELETE CASCADE, - name text NOT NULL, - created_at timestamptz NOT NULL DEFAULT now(), - max_workers bigint NOT NULL DEFAULT 0, - metadata jsonb NOT NULL DEFAULT '{}', - num_jobs_completed bigint NOT NULL DEFAULT 0, - num_jobs_running bigint NOT NULL DEFAULT 0, - updated_at timestamptz NOT NULL, - PRIMARY KEY (river_client_id, name), - CONSTRAINT name_length CHECK (char_length(name) > 0 AND char_length(name) < 128), - CONSTRAINT num_jobs_completed_zero_or_positive CHECK (num_jobs_completed >= 0), - CONSTRAINT num_jobs_running_zero_or_positive CHECK (num_jobs_running >= 0) -); - --- River migration 006 [up] -CREATE OR REPLACE FUNCTION river_job_state_in_bitmask(bitmask BIT(8), state river_job_state) -RETURNS boolean -LANGUAGE SQL -IMMUTABLE -AS $$ - SELECT CASE state - WHEN 'available' THEN get_bit(bitmask, 7) - WHEN 'cancelled' THEN get_bit(bitmask, 6) - WHEN 'completed' THEN get_bit(bitmask, 5) - WHEN 'discarded' THEN get_bit(bitmask, 4) - WHEN 'pending' THEN get_bit(bitmask, 3) - WHEN 'retryable' THEN get_bit(bitmask, 2) - WHEN 'running' THEN get_bit(bitmask, 1) - WHEN 'scheduled' THEN get_bit(bitmask, 0) - ELSE 0 - END = 1; -$$; - --- --- Add `river_job.unique_states` and bring up an index on it. --- --- This column may exist already if users manually created the column and index --- as instructed in the changelog so the index could be created `CONCURRENTLY`. --- -ALTER TABLE river_job ADD COLUMN IF NOT EXISTS unique_states BIT(8); - --- This statement uses `IF NOT EXISTS` to allow users with a `river_job` table --- of non-trivial size to build the index `CONCURRENTLY` out of band of this --- migration, then follow by completing the migration. -CREATE UNIQUE INDEX IF NOT EXISTS river_job_unique_idx ON river_job (unique_key) - WHERE unique_key IS NOT NULL - AND unique_states IS NOT NULL - AND river_job_state_in_bitmask(unique_states, state); - --- Remove the old unique index. Users who are actively using the unique jobs --- feature and who wish to avoid deploy downtime may want od drop this in a --- subsequent migration once all jobs using the old unique system have been --- completed (i.e. no more rows with non-null unique_key and null --- unique_states). -DROP INDEX river_job_kind_unique_key_idx; - - --- +goose StatementEnd - --- +goose Down --- +goose StatementBegin - --- Drop Users Table --- River migration 006 [down] --- --- Drop `river_job.unique_states` and its index. --- - -DROP INDEX river_job_unique_idx; - -ALTER TABLE river_job - DROP COLUMN unique_states; - -CREATE UNIQUE INDEX IF NOT EXISTS river_job_kind_unique_key_idx ON river_job (kind, unique_key) WHERE unique_key IS NOT NULL; - --- --- Drop `river_job_state_in_bitmask` function. --- -DROP FUNCTION river_job_state_in_bitmask; - --- River migration 005 [down] --- --- Revert to migration table based only on `(version)`. --- --- If any non-main migrations are present, 005 is considered irreversible. --- - -DO -$body$ -BEGIN - -- Tolerate users who may be using their own migration system rather than - -- River's. If they are, they will have skipped version 001 containing - -- `CREATE TABLE river_migration`, so this table won't exist. - IF (SELECT to_regclass('river_migration') IS NOT NULL) THEN - IF EXISTS ( - SELECT * - FROM river_migration - WHERE line <> 'main' - ) THEN - RAISE EXCEPTION 'Found non-main migration lines in the database; version 005 migration is irreversible because it would result in loss of migration information.'; - END IF; - - ALTER TABLE river_migration - RENAME TO river_migration_old; - - CREATE TABLE river_migration( - id bigserial PRIMARY KEY, - created_at timestamptz NOT NULL DEFAULT NOW(), - version bigint NOT NULL, - CONSTRAINT version CHECK (version >= 1) - ); - - CREATE UNIQUE INDEX ON river_migration USING btree(version); - - INSERT INTO river_migration - (created_at, version) - SELECT created_at, version - FROM river_migration_old; - - DROP TABLE river_migration_old; - END IF; -END; -$body$ -LANGUAGE 'plpgsql'; - --- --- Drop `river_job.unique_key`. --- - -ALTER TABLE river_job - DROP COLUMN unique_key; - --- --- Drop `river_client` and derivative. --- - -DROP TABLE river_client_queue; -DROP TABLE river_client; - --- River migration 004 [down] -ALTER TABLE river_job ALTER COLUMN args DROP NOT NULL; - -ALTER TABLE river_job ALTER COLUMN metadata DROP NOT NULL; -ALTER TABLE river_job ALTER COLUMN metadata DROP DEFAULT; - --- It is not possible to safely remove 'pending' from the river_job_state enum, --- so leave it in place. - -ALTER TABLE river_job DROP CONSTRAINT finalized_or_finalized_at_null; -ALTER TABLE river_job ADD CONSTRAINT finalized_or_finalized_at_null CHECK ( - (state IN ('cancelled', 'completed', 'discarded') AND finalized_at IS NOT NULL) OR finalized_at IS NULL -); - -CREATE OR REPLACE FUNCTION river_job_notify() - RETURNS TRIGGER - AS $$ -DECLARE - payload json; -BEGIN - IF NEW.state = 'available' THEN - -- Notify will coalesce duplicate notifications within a transaction, so - -- keep these payloads generalized: - payload = json_build_object('queue', NEW.queue); - PERFORM - pg_notify('river_insert', payload::text); - END IF; - RETURN NULL; -END; -$$ -LANGUAGE plpgsql; - -CREATE TRIGGER river_notify - AFTER INSERT ON river_job - FOR EACH ROW - EXECUTE PROCEDURE river_job_notify(); - -DROP TABLE river_queue; - -ALTER TABLE river_leader - ALTER COLUMN name DROP DEFAULT, - DROP CONSTRAINT name_length, - ADD CONSTRAINT name_length CHECK (char_length(name) > 0 AND char_length(name) < 128); - --- River migration 003 [down] -ALTER TABLE river_job ALTER COLUMN tags DROP NOT NULL, - ALTER COLUMN tags DROP DEFAULT; - --- River migration 002 [down] -DROP TABLE river_job; -DROP FUNCTION river_job_notify; -DROP TYPE river_job_state; - -DROP TABLE river_leader; - --- +goose StatementEnd