From 18bcae9f962e034538c64d4c9311d37b8ab9b6b4 Mon Sep 17 00:00:00 2001 From: Rogee Date: Fri, 27 Dec 2024 21:01:39 +0800 Subject: [PATCH] fix: job consts --- templates/project/app/jobs/-gitkeep.tpl | 0 templates/project/app/jobs/demo_job.go.tpl | 53 +++ .../project/database/migrations/-gitkeep | 0 .../20140202165500_river_job.sql.tpl | 407 ++++++++++++++++++ templates/project/providers/job/config.go.tpl | 14 +- 5 files changed, 471 insertions(+), 3 deletions(-) delete mode 100644 templates/project/app/jobs/-gitkeep.tpl create mode 100644 templates/project/app/jobs/demo_job.go.tpl delete mode 100644 templates/project/database/migrations/-gitkeep create mode 100644 templates/project/database/migrations/20140202165500_river_job.sql.tpl diff --git a/templates/project/app/jobs/-gitkeep.tpl b/templates/project/app/jobs/-gitkeep.tpl deleted file mode 100644 index e69de29..0000000 diff --git a/templates/project/app/jobs/demo_job.go.tpl b/templates/project/app/jobs/demo_job.go.tpl new file mode 100644 index 0000000..f94edc0 --- /dev/null +++ b/templates/project/app/jobs/demo_job.go.tpl @@ -0,0 +1,53 @@ +package jobs + +import ( + "context" + "sort" + "time" + + _ "git.ipao.vip/rogeecn/atom" + _ "git.ipao.vip/rogeecn/atom/contracts" + . "github.com/riverqueue/river" + log "github.com/sirupsen/logrus" +) + +// provider:[except|only] [returnType] [group] + +var ( + _ JobArgs = SortArgs{} + _ JobArgsWithInsertOpts = SortArgs{} +) + +type SortArgs struct { + Strings []string `json:"strings"` +} + +// InsertOpts implements JobArgsWithInsertOpts. +func (s SortArgs) InsertOpts() InsertOpts { + return InsertOpts{ + Queue: "high_priority", + Priority: PriorityDefault, + } +} + +func (SortArgs) Kind() string { + return "sort" +} + +var _ Worker[SortArgs] = (*SortWorker)(nil) + +// @provider(job) +type SortWorker struct { + WorkerDefaults[SortArgs] +} + +func (w *SortWorker) Work(ctx context.Context, job *Job[SortArgs]) error { + sort.Strings(job.Args.Strings) + + log.Infof("[%s] Sorted strings: %v\n", time.Now().Format(time.TimeOnly), job.Args.Strings) + return nil +} + +func (w *SortWorker) NextRetry(job *Job[SortArgs]) time.Time { + return time.Now().Add(5 * time.Second) +} diff --git a/templates/project/database/migrations/-gitkeep b/templates/project/database/migrations/-gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/templates/project/database/migrations/20140202165500_river_job.sql.tpl b/templates/project/database/migrations/20140202165500_river_job.sql.tpl new file mode 100644 index 0000000..942da3c --- /dev/null +++ b/templates/project/database/migrations/20140202165500_river_job.sql.tpl @@ -0,0 +1,407 @@ +-- +goose Up +-- +goose StatementBegin + +-- River migration 002 [up] +CREATE TYPE river_job_state AS ENUM( + 'available', + 'cancelled', + 'completed', + 'discarded', + 'retryable', + 'running', + 'scheduled' +); + +CREATE TABLE river_job( + -- 8 bytes + id bigserial PRIMARY KEY, + + -- 8 bytes (4 bytes + 2 bytes + 2 bytes) + -- + -- `state` is kept near the top of the table for operator convenience -- when + -- looking at jobs with `SELECT *` it'll appear first after ID. The other two + -- fields aren't as important but are kept adjacent to `state` for alignment + -- to get an 8-byte block. + state river_job_state NOT NULL DEFAULT 'available', + attempt smallint NOT NULL DEFAULT 0, + max_attempts smallint NOT NULL, + + -- 8 bytes each (no alignment needed) + attempted_at timestamptz, + created_at timestamptz NOT NULL DEFAULT NOW(), + finalized_at timestamptz, + scheduled_at timestamptz NOT NULL DEFAULT NOW(), + + -- 2 bytes (some wasted padding probably) + priority smallint NOT NULL DEFAULT 1, + + -- types stored out-of-band + args jsonb, + attempted_by text[], + errors jsonb[], + kind text NOT NULL, + metadata jsonb NOT NULL DEFAULT '{}', + queue text NOT NULL DEFAULT 'default', + tags varchar(255)[], + + CONSTRAINT finalized_or_finalized_at_null CHECK ((state IN ('cancelled', 'completed', 'discarded') AND finalized_at IS NOT NULL) OR finalized_at IS NULL), + CONSTRAINT max_attempts_is_positive CHECK (max_attempts > 0), + CONSTRAINT priority_in_range CHECK (priority >= 1 AND priority <= 4), + CONSTRAINT queue_length CHECK (char_length(queue) > 0 AND char_length(queue) < 128), + CONSTRAINT kind_length CHECK (char_length(kind) > 0 AND char_length(kind) < 128) +); + +-- We may want to consider adding another property here after `kind` if it seems +-- like it'd be useful for something. +CREATE INDEX river_job_kind ON river_job USING btree(kind); + +CREATE INDEX river_job_state_and_finalized_at_index ON river_job USING btree(state, finalized_at) WHERE finalized_at IS NOT NULL; + +CREATE INDEX river_job_prioritized_fetching_index ON river_job USING btree(state, queue, priority, scheduled_at, id); + +CREATE INDEX river_job_args_index ON river_job USING GIN(args); + +CREATE INDEX river_job_metadata_index ON river_job USING GIN(metadata); + +CREATE OR REPLACE FUNCTION river_job_notify() + RETURNS TRIGGER + AS $$ +DECLARE + payload json; +BEGIN + IF NEW.state = 'available' THEN + -- Notify will coalesce duplicate notifications within a transaction, so + -- keep these payloads generalized: + payload = json_build_object('queue', NEW.queue); + PERFORM + pg_notify('river_insert', payload::text); + END IF; + RETURN NULL; +END; +$$ +LANGUAGE plpgsql; + +CREATE TRIGGER river_notify + AFTER INSERT ON river_job + FOR EACH ROW + EXECUTE PROCEDURE river_job_notify(); + +CREATE UNLOGGED TABLE river_leader( + -- 8 bytes each (no alignment needed) + elected_at timestamptz NOT NULL, + expires_at timestamptz NOT NULL, + + -- types stored out-of-band + leader_id text NOT NULL, + name text PRIMARY KEY, + + CONSTRAINT name_length CHECK (char_length(name) > 0 AND char_length(name) < 128), + CONSTRAINT leader_id_length CHECK (char_length(leader_id) > 0 AND char_length(leader_id) < 128) +); + +-- River migration 003 [up] +ALTER TABLE river_job ALTER COLUMN tags SET DEFAULT '{}'; +UPDATE river_job SET tags = '{}' WHERE tags IS NULL; +ALTER TABLE river_job ALTER COLUMN tags SET NOT NULL; + +-- River migration 004 [up] +-- The args column never had a NOT NULL constraint or default value at the +-- database level, though we tried to ensure one at the application level. +ALTER TABLE river_job ALTER COLUMN args SET DEFAULT '{}'; +UPDATE river_job SET args = '{}' WHERE args IS NULL; +ALTER TABLE river_job ALTER COLUMN args SET NOT NULL; +ALTER TABLE river_job ALTER COLUMN args DROP DEFAULT; + +-- The metadata column never had a NOT NULL constraint or default value at the +-- database level, though we tried to ensure one at the application level. +ALTER TABLE river_job ALTER COLUMN metadata SET DEFAULT '{}'; +UPDATE river_job SET metadata = '{}' WHERE metadata IS NULL; +ALTER TABLE river_job ALTER COLUMN metadata SET NOT NULL; + +-- The 'pending' job state will be used for upcoming functionality: +ALTER TYPE river_job_state ADD VALUE IF NOT EXISTS 'pending' AFTER 'discarded'; + +ALTER TABLE river_job DROP CONSTRAINT finalized_or_finalized_at_null; +ALTER TABLE river_job ADD CONSTRAINT finalized_or_finalized_at_null CHECK ( + (finalized_at IS NULL AND state NOT IN ('cancelled', 'completed', 'discarded')) OR + (finalized_at IS NOT NULL AND state IN ('cancelled', 'completed', 'discarded')) +); + +DROP TRIGGER river_notify ON river_job; +DROP FUNCTION river_job_notify; + +CREATE TABLE river_queue( + name text PRIMARY KEY NOT NULL, + created_at timestamptz NOT NULL DEFAULT NOW(), + metadata jsonb NOT NULL DEFAULT '{}' ::jsonb, + paused_at timestamptz, + updated_at timestamptz NOT NULL +); + +ALTER TABLE river_leader + ALTER COLUMN name SET DEFAULT 'default', + DROP CONSTRAINT name_length, + ADD CONSTRAINT name_length CHECK (name = 'default'); + +-- River migration 005 [up] +-- +-- Rebuild the migration table so it's based on `(line, version)`. +-- + +DO +$body$ +BEGIN + -- Tolerate users who may be using their own migration system rather than + -- River's. If they are, they will have skipped version 001 containing + -- `CREATE TABLE river_migration`, so this table won't exist. + IF (SELECT to_regclass('river_migration') IS NOT NULL) THEN + ALTER TABLE river_migration + RENAME TO river_migration_old; + + CREATE TABLE river_migration( + line TEXT NOT NULL, + version bigint NOT NULL, + created_at timestamptz NOT NULL DEFAULT NOW(), + CONSTRAINT line_length CHECK (char_length(line) > 0 AND char_length(line) < 128), + CONSTRAINT version_gte_1 CHECK (version >= 1), + PRIMARY KEY (line, version) + ); + + INSERT INTO river_migration + (created_at, line, version) + SELECT created_at, 'main', version + FROM river_migration_old; + + DROP TABLE river_migration_old; + END IF; +END; +$body$ +LANGUAGE 'plpgsql'; + +-- +-- Add `river_job.unique_key` and bring up an index on it. +-- + +-- These statements use `IF NOT EXISTS` to allow users with a `river_job` table +-- of non-trivial size to build the index `CONCURRENTLY` out of band of this +-- migration, then follow by completing the migration. +ALTER TABLE river_job + ADD COLUMN IF NOT EXISTS unique_key bytea; + +CREATE UNIQUE INDEX IF NOT EXISTS river_job_kind_unique_key_idx ON river_job (kind, unique_key) WHERE unique_key IS NOT NULL; + +-- +-- Create `river_client` and derivative. +-- +-- This feature hasn't quite yet been implemented, but we're taking advantage of +-- the migration to add the schema early so that we can add it later without an +-- additional migration. +-- + +CREATE UNLOGGED TABLE river_client ( + id text PRIMARY KEY NOT NULL, + created_at timestamptz NOT NULL DEFAULT now(), + metadata jsonb NOT NULL DEFAULT '{}', + paused_at timestamptz, + updated_at timestamptz NOT NULL, + CONSTRAINT name_length CHECK (char_length(id) > 0 AND char_length(id) < 128) +); + +-- Differs from `river_queue` in that it tracks the queue state for a particular +-- active client. +CREATE UNLOGGED TABLE river_client_queue ( + river_client_id text NOT NULL REFERENCES river_client (id) ON DELETE CASCADE, + name text NOT NULL, + created_at timestamptz NOT NULL DEFAULT now(), + max_workers bigint NOT NULL DEFAULT 0, + metadata jsonb NOT NULL DEFAULT '{}', + num_jobs_completed bigint NOT NULL DEFAULT 0, + num_jobs_running bigint NOT NULL DEFAULT 0, + updated_at timestamptz NOT NULL, + PRIMARY KEY (river_client_id, name), + CONSTRAINT name_length CHECK (char_length(name) > 0 AND char_length(name) < 128), + CONSTRAINT num_jobs_completed_zero_or_positive CHECK (num_jobs_completed >= 0), + CONSTRAINT num_jobs_running_zero_or_positive CHECK (num_jobs_running >= 0) +); + +-- River migration 006 [up] +CREATE OR REPLACE FUNCTION river_job_state_in_bitmask(bitmask BIT(8), state river_job_state) +RETURNS boolean +LANGUAGE SQL +IMMUTABLE +AS $$ + SELECT CASE state + WHEN 'available' THEN get_bit(bitmask, 7) + WHEN 'cancelled' THEN get_bit(bitmask, 6) + WHEN 'completed' THEN get_bit(bitmask, 5) + WHEN 'discarded' THEN get_bit(bitmask, 4) + WHEN 'pending' THEN get_bit(bitmask, 3) + WHEN 'retryable' THEN get_bit(bitmask, 2) + WHEN 'running' THEN get_bit(bitmask, 1) + WHEN 'scheduled' THEN get_bit(bitmask, 0) + ELSE 0 + END = 1; +$$; + +-- +-- Add `river_job.unique_states` and bring up an index on it. +-- +-- This column may exist already if users manually created the column and index +-- as instructed in the changelog so the index could be created `CONCURRENTLY`. +-- +ALTER TABLE river_job ADD COLUMN IF NOT EXISTS unique_states BIT(8); + +-- This statement uses `IF NOT EXISTS` to allow users with a `river_job` table +-- of non-trivial size to build the index `CONCURRENTLY` out of band of this +-- migration, then follow by completing the migration. +CREATE UNIQUE INDEX IF NOT EXISTS river_job_unique_idx ON river_job (unique_key) + WHERE unique_key IS NOT NULL + AND unique_states IS NOT NULL + AND river_job_state_in_bitmask(unique_states, state); + +-- Remove the old unique index. Users who are actively using the unique jobs +-- feature and who wish to avoid deploy downtime may want od drop this in a +-- subsequent migration once all jobs using the old unique system have been +-- completed (i.e. no more rows with non-null unique_key and null +-- unique_states). +DROP INDEX river_job_kind_unique_key_idx; + + +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin + +-- Drop Users Table +-- River migration 006 [down] +-- +-- Drop `river_job.unique_states` and its index. +-- + +DROP INDEX river_job_unique_idx; + +ALTER TABLE river_job + DROP COLUMN unique_states; + +CREATE UNIQUE INDEX IF NOT EXISTS river_job_kind_unique_key_idx ON river_job (kind, unique_key) WHERE unique_key IS NOT NULL; + +-- +-- Drop `river_job_state_in_bitmask` function. +-- +DROP FUNCTION river_job_state_in_bitmask; + +-- River migration 005 [down] +-- +-- Revert to migration table based only on `(version)`. +-- +-- If any non-main migrations are present, 005 is considered irreversible. +-- + +DO +$body$ +BEGIN + -- Tolerate users who may be using their own migration system rather than + -- River's. If they are, they will have skipped version 001 containing + -- `CREATE TABLE river_migration`, so this table won't exist. + IF (SELECT to_regclass('river_migration') IS NOT NULL) THEN + IF EXISTS ( + SELECT * + FROM river_migration + WHERE line <> 'main' + ) THEN + RAISE EXCEPTION 'Found non-main migration lines in the database; version 005 migration is irreversible because it would result in loss of migration information.'; + END IF; + + ALTER TABLE river_migration + RENAME TO river_migration_old; + + CREATE TABLE river_migration( + id bigserial PRIMARY KEY, + created_at timestamptz NOT NULL DEFAULT NOW(), + version bigint NOT NULL, + CONSTRAINT version CHECK (version >= 1) + ); + + CREATE UNIQUE INDEX ON river_migration USING btree(version); + + INSERT INTO river_migration + (created_at, version) + SELECT created_at, version + FROM river_migration_old; + + DROP TABLE river_migration_old; + END IF; +END; +$body$ +LANGUAGE 'plpgsql'; + +-- +-- Drop `river_job.unique_key`. +-- + +ALTER TABLE river_job + DROP COLUMN unique_key; + +-- +-- Drop `river_client` and derivative. +-- + +DROP TABLE river_client_queue; +DROP TABLE river_client; + +-- River migration 004 [down] +ALTER TABLE river_job ALTER COLUMN args DROP NOT NULL; + +ALTER TABLE river_job ALTER COLUMN metadata DROP NOT NULL; +ALTER TABLE river_job ALTER COLUMN metadata DROP DEFAULT; + +-- It is not possible to safely remove 'pending' from the river_job_state enum, +-- so leave it in place. + +ALTER TABLE river_job DROP CONSTRAINT finalized_or_finalized_at_null; +ALTER TABLE river_job ADD CONSTRAINT finalized_or_finalized_at_null CHECK ( + (state IN ('cancelled', 'completed', 'discarded') AND finalized_at IS NOT NULL) OR finalized_at IS NULL +); + +CREATE OR REPLACE FUNCTION river_job_notify() + RETURNS TRIGGER + AS $$ +DECLARE + payload json; +BEGIN + IF NEW.state = 'available' THEN + -- Notify will coalesce duplicate notifications within a transaction, so + -- keep these payloads generalized: + payload = json_build_object('queue', NEW.queue); + PERFORM + pg_notify('river_insert', payload::text); + END IF; + RETURN NULL; +END; +$$ +LANGUAGE plpgsql; + +CREATE TRIGGER river_notify + AFTER INSERT ON river_job + FOR EACH ROW + EXECUTE PROCEDURE river_job_notify(); + +DROP TABLE river_queue; + +ALTER TABLE river_leader + ALTER COLUMN name DROP DEFAULT, + DROP CONSTRAINT name_length, + ADD CONSTRAINT name_length CHECK (char_length(name) > 0 AND char_length(name) < 128); + +-- River migration 003 [down] +ALTER TABLE river_job ALTER COLUMN tags DROP NOT NULL, + ALTER COLUMN tags DROP DEFAULT; + +-- River migration 002 [down] +DROP TABLE river_job; +DROP FUNCTION river_job_notify; +DROP TYPE river_job_state; + +DROP TABLE river_leader; + +-- +goose StatementEnd diff --git a/templates/project/providers/job/config.go.tpl b/templates/project/providers/job/config.go.tpl index 76a5695..fa6ea12 100644 --- a/templates/project/providers/job/config.go.tpl +++ b/templates/project/providers/job/config.go.tpl @@ -3,6 +3,7 @@ package job import ( "git.ipao.vip/rogeecn/atom/container" "git.ipao.vip/rogeecn/atom/utils/opt" + "github.com/riverqueue/river" ) const DefaultPrefix = "Job" @@ -19,7 +20,14 @@ func DefaultProvider() container.ProviderContainer { type Config struct{} const ( - PriorityHigh = "high" - PriorityDefault = "default" - PriorityLow = "low" + PriorityDefault = river.PriorityDefault + PriorityLow = 2 + PriorityMiddle = 3 + PriorityHigh = 3 +) + +const ( + QueueHigh = "high" + QueueDefault = "default" + QueueLow = "low" )