From bb00250dda1ab5d5d8cf954f4de06f9d2df85223 Mon Sep 17 00:00:00 2001 From: Rogee Date: Sat, 15 Nov 2025 21:15:12 +0800 Subject: [PATCH] stage 1 --- .dockerignore | 2 + AGENTS.md | 2 +- docs/operations/migration.md | 58 ++++++ internal/cache/doc.go | 13 +- internal/cache/fs_store.go | 157 ++------------ internal/cache/store.go | 2 +- internal/cache/store_test.go | 85 +------- internal/cache/writer.go | 57 +++++ internal/config/loader.go | 3 + internal/config/modules.go | 7 +- internal/config/runtime.go | 14 +- internal/config/runtime_flags.go | 62 ++++++ internal/config/types.go | 14 ++ internal/config/validation.go | 5 + internal/hubmodule/README.md | 2 +- internal/hubmodule/doc.go | 2 +- internal/hubmodule/docker/module.go | 29 +++ internal/hubmodule/interfaces.go | 10 +- internal/hubmodule/legacy/legacy_module.go | 3 +- internal/hubmodule/legacy/state.go | 65 ++++++ internal/hubmodule/npm/locator.go | 41 ++++ internal/hubmodule/npm/module.go | 30 +++ internal/hubmodule/npm/module_test.go | 52 +++++ internal/hubmodule/pypi/module.go | 29 +++ internal/hubmodule/strategy.go | 15 +- internal/hubmodule/template/module.go | 3 +- internal/logging/fields.go | 15 +- internal/proxy/handler.go | 51 +++-- internal/server/bootstrap.go | 7 +- internal/server/hub_registry.go | 29 ++- internal/server/hub_registry_test.go | 10 + internal/server/router.go | 18 +- internal/server/routes/modules.go | 114 ++++++++++ main.go | 2 + specs/004-modular-proxy-cache/data-model.md | 5 +- specs/004-modular-proxy-cache/plan.md | 6 +- specs/004-modular-proxy-cache/quickstart.md | 18 +- specs/004-modular-proxy-cache/research.md | 6 +- specs/004-modular-proxy-cache/tasks.md | 26 +-- .../cache_strategy_override_test.go | 197 ++++++++++++++++++ .../integration/legacy_adapter_toggle_test.go | 118 +++++++++++ tests/integration/module_diagnostics_test.go | 154 ++++++++++++++ tests/integration/module_routing_test.go | 2 + 43 files changed, 1232 insertions(+), 308 deletions(-) create mode 100644 
docs/operations/migration.md create mode 100644 internal/cache/writer.go create mode 100644 internal/config/runtime_flags.go create mode 100644 internal/hubmodule/docker/module.go create mode 100644 internal/hubmodule/legacy/state.go create mode 100644 internal/hubmodule/npm/locator.go create mode 100644 internal/hubmodule/npm/module.go create mode 100644 internal/hubmodule/npm/module_test.go create mode 100644 internal/hubmodule/pypi/module.go create mode 100644 internal/server/routes/modules.go create mode 100644 tests/integration/cache_strategy_override_test.go create mode 100644 tests/integration/legacy_adapter_toggle_test.go create mode 100644 tests/integration/module_diagnostics_test.go diff --git a/.dockerignore b/.dockerignore index 95a7c87..c856f3d 100644 --- a/.dockerignore +++ b/.dockerignore @@ -3,6 +3,8 @@ .github .codex .specify +Dockerfile* +.dockerignore configs node_modules/ coverage/ diff --git a/AGENTS.md b/AGENTS.md index efcf230..1777684 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -7,7 +7,7 @@ Auto-generated from all feature plans. 
Last updated: 2025-11-13 - 本地文件系统缓存目录 `StoragePath//`,结合文件 `mtime` + 上游 HEAD 再验证 (002-fiber-single-proxy) - Go 1.25+(静态链接单二进制) + Fiber v3(HTTP 服务)、Viper(配置加载/校验)、Logrus + Lumberjack(结构化日志 & 滚动)、标准库 `net/http`/`io`(代理回源) (003-hub-auth-fields) - 仍使用本地 `StoragePath//` 目录缓存正文,并依赖 HEAD 对动态标签再验证 (003-hub-auth-fields) -- 本地文件系统缓存目录 `StoragePath//.body` + `.meta` 元数据(模块必须复用同一布局) (004-modular-proxy-cache) +- 本地文件系统缓存目录 `StoragePath//`,模块需直接复用原始路径布局 (004-modular-proxy-cache) - Go 1.25+ (静态链接,单二进制交付) + Fiber v3(HTTP 服务)、Viper(配置)、Logrus + Lumberjack(结构化日志 [EXTRACTED FROM ALL PLAN.MD FILES] 滚动)、标准库 `net/http`/`io` (001-config-bootstrap) diff --git a/docs/operations/migration.md b/docs/operations/migration.md new file mode 100644 index 0000000..7f09e24 --- /dev/null +++ b/docs/operations/migration.md @@ -0,0 +1,58 @@ +# Modular Hub Migration Playbook + +This playbook describes how to cut a hub over from the shared legacy adapter to a dedicated module using the new rollout flags, diagnostics endpoint, and structured logs delivered in feature `004-modular-proxy-cache`. + +## Prerequisites + +- Target module must be registered via `hubmodule.MustRegister` and expose a proxy handler through `proxy.RegisterModuleHandler`. +- `config.toml` must already map the hub to its target module through `[[Hub]].Module`. +- Operators must have access to the running binary port (default `:5000`) to query `/-/modules`. + +## Rollout Workflow + +1. **Snapshot current state** + Run `curl -s http://localhost:5000/-/modules | jq '.hubs[] | select(.hub_name=="")'` to capture the current `module_key` and `rollout_flag`. Legacy hubs report `module_key=legacy` and `rollout_flag=legacy-only`. + +2. 
**Prepare config for dual traffic** + Edit the hub block to target the new module while keeping rollback safety: + + ```toml + [[Hub]] + Name = "npm-prod" + Domain = "npm.example.com" + Upstream = "https://registry.npmjs.org" + Module = "npm" + Rollout = "dual" + ``` + + Dual mode keeps routing on the new module but keeps observability tagged as a partial rollout. + +3. **Deploy and monitor** + Restart the service and tail logs filtered by `module_key`: + + ```sh + jq 'select(.module_key=="npm" and .rollout_flag=="dual")' /var/log/any-hub.json + ``` + + Every request now carries `module_key`/`rollout_flag`, allowing dashboards or `grep`-based analyses without extra parsing. + +4. **Verify diagnostics** + Query `/-/modules/npm` to inspect the registered metadata and confirm cache strategy, or `/-/modules` to ensure the hub binding reflects `rollout_flag=dual`. + +5. **Promote to modular** + Once metrics are healthy, change `Rollout = "modular"` in config and redeploy. Continue monitoring logs to make sure both `module_key` and `rollout_flag` show the fully promoted state. + +6. **Rollback procedure** + To rollback, set `Rollout = "legacy-only"` (without touching `Module`). The runtime forces traffic through the legacy module while keeping the desired module declaration for later reattempts. Confirm via diagnostics (`module_key` reverts to `legacy`) before announcing rollback complete. + +## Observability Checklist + +- **Logs**: Every proxy log line now contains `hub`, `module_key`, `rollout_flag`, upstream status, and `request_id`. Capture at least five minutes of traffic per flag change. +- **Diagnostics**: Store JSON snapshots from `/-/modules` before and after each rollout stage for incident timelines. +- **Config History**: Keep the `config.toml` diff (especially `Rollout` changes) attached to change records for auditability. 
+ +## Troubleshooting + +- **Error: `module_not_found` during diagnostics** → module key not registered; ensure the module package’s `init()` calls `hubmodule.MustRegister`. +- **Requests still tagged with `legacy-only` after promotion** → double-check the running process uses the updated config path (`ANY_HUB_CONFIG` vs `--config`) and restart the service. +- **Diagnostics 404** → confirm you are hitting the correct port and that the CLI user/network path allows HTTP access; the endpoint ignores Host headers, so `curl http://127.0.0.1:/-/modules` should succeed locally. diff --git a/internal/cache/doc.go b/internal/cache/doc.go index c7e60e0..1ae4f26 100644 --- a/internal/cache/doc.go +++ b/internal/cache/doc.go @@ -1,7 +1,10 @@ // Package cache defines the disk-backed store responsible for translating hub -// requests into StoragePath//.body files. The store exposes read/write -// primitives with safe semantics (temp file + rename) and surfaces file info -// (size, modtime) for higher layers to implement conditional revalidation. -// Proxy handlers depend on this package to stream cached responses or trigger -// upstream fetches without duplicating filesystem logic. +// requests into StoragePath// directories that mirror upstream +// paths. When a given path also needs to act as the parent of other entries +// (例如 npm metadata + tarball目录), the body is stored in a `__content` file +// under that directory so两种形态可以共存。The store exposes read/write primitives +// with safe semantics (temp file + rename) and surfaces file info (size, modtime) +// for higher layers to implement conditional revalidation. Proxy handlers depend +// on this package to stream cached responses or trigger upstream fetches without +// duplicating filesystem logic. 
package cache diff --git a/internal/cache/fs_store.go b/internal/cache/fs_store.go index 084d04e..028d91d 100644 --- a/internal/cache/fs_store.go +++ b/internal/cache/fs_store.go @@ -15,8 +15,6 @@ import ( "time" ) -const cacheFileSuffix = ".body" - // NewStore 以 basePath 为根目录构建磁盘缓存,整站复用一份实例。 func NewStore(basePath string) (Store, error) { if basePath == "" { @@ -58,13 +56,27 @@ func (s *fileStore) Get(ctx context.Context, locator Locator) (*ReadResult, erro default: } - primary, legacy, err := s.entryPaths(locator) + filePath, err := s.entryPath(locator) if err != nil { return nil, err } - filePath, info, f, err := s.openEntryFile(primary, legacy) + info, err := os.Stat(filePath) if err != nil { + if errors.Is(err, fs.ErrNotExist) || isNotDirError(err) { + return nil, ErrNotFound + } + return nil, err + } + if info.IsDir() { + return nil, ErrNotFound + } + + file, err := os.Open(filePath) + if err != nil { + if errors.Is(err, fs.ErrNotExist) || isNotDirError(err) { + return nil, ErrNotFound + } return nil, err } @@ -74,11 +86,7 @@ func (s *fileStore) Get(ctx context.Context, locator Locator) (*ReadResult, erro SizeBytes: info.Size(), ModTime: info.ModTime(), } - - return &ReadResult{ - Entry: entry, - Reader: f, - }, nil + return &ReadResult{Entry: entry, Reader: file}, nil } func (s *fileStore) Put(ctx context.Context, locator Locator, body io.Reader, opts PutOptions) (*Entry, error) { @@ -88,12 +96,12 @@ func (s *fileStore) Put(ctx context.Context, locator Locator, body io.Reader, op } defer unlock() - filePath, legacyPath, err := s.entryPaths(locator) + filePath, err := s.entryPath(locator) if err != nil { return nil, err } - if err := s.ensureDirWithUpgrade(filepath.Dir(filePath)); err != nil { + if err := os.MkdirAll(filepath.Dir(filePath), 0o755); err != nil { return nil, err } @@ -109,12 +117,12 @@ func (s *fileStore) Put(ctx context.Context, locator Locator, body io.Reader, op err = closeErr } if err != nil { - os.Remove(tempName) + _ = os.Remove(tempName) 
return nil, err } if err := os.Rename(tempName, filePath); err != nil { - os.Remove(tempName) + _ = os.Remove(tempName) return nil, err } @@ -125,7 +133,6 @@ func (s *fileStore) Put(ctx context.Context, locator Locator, body io.Reader, op if err := os.Chtimes(filePath, modTime, modTime); err != nil { return nil, err } - _ = os.Remove(legacyPath) entry := Entry{ Locator: locator, @@ -143,16 +150,13 @@ func (s *fileStore) Remove(ctx context.Context, locator Locator) error { } defer unlock() - filePath, legacyPath, err := s.entryPaths(locator) + filePath, err := s.entryPath(locator) if err != nil { return err } if err := os.Remove(filePath); err != nil && !errors.Is(err, fs.ErrNotExist) { return err } - if err := os.Remove(legacyPath); err != nil && !errors.Is(err, fs.ErrNotExist) { - return err - } return nil } @@ -179,7 +183,7 @@ func (s *fileStore) lockEntry(locator Locator) (func(), error) { }, nil } -func (s *fileStore) path(locator Locator) (string, error) { +func (s *fileStore) entryPath(locator Locator) (string, error) { if locator.HubName == "" { return "", errors.New("hub name required") } @@ -203,121 +207,6 @@ func (s *fileStore) path(locator Locator) (string, error) { return filePath, nil } -func (s *fileStore) entryPaths(locator Locator) (string, string, error) { - legacyPath, err := s.path(locator) - if err != nil { - return "", "", err - } - return legacyPath + cacheFileSuffix, legacyPath, nil -} - -func (s *fileStore) openEntryFile(primaryPath, legacyPath string) (string, fs.FileInfo, *os.File, error) { - info, err := os.Stat(primaryPath) - if err == nil { - if info.IsDir() { - return "", nil, nil, ErrNotFound - } - f, err := os.Open(primaryPath) - if err != nil { - if errors.Is(err, fs.ErrNotExist) || isNotDirError(err) { - return "", nil, nil, ErrNotFound - } - return "", nil, nil, err - } - return primaryPath, info, f, nil - } - if !errors.Is(err, fs.ErrNotExist) && !isNotDirError(err) { - return "", nil, nil, err - } - - info, err = 
os.Stat(legacyPath) - if err != nil { - if errors.Is(err, fs.ErrNotExist) || isNotDirError(err) { - return "", nil, nil, ErrNotFound - } - return "", nil, nil, err - } - if info.IsDir() { - return "", nil, nil, ErrNotFound - } - - if migrateErr := s.migrateLegacyFile(primaryPath, legacyPath); migrateErr == nil { - return s.openEntryFile(primaryPath, legacyPath) - } - - f, err := os.Open(legacyPath) - if err != nil { - if errors.Is(err, fs.ErrNotExist) || isNotDirError(err) { - return "", nil, nil, ErrNotFound - } - return "", nil, nil, err - } - return legacyPath, info, f, nil -} - -func (s *fileStore) migrateLegacyFile(primaryPath, legacyPath string) error { - if legacyPath == "" || primaryPath == legacyPath { - return nil - } - if _, err := os.Stat(legacyPath); err != nil { - return err - } - if _, err := os.Stat(primaryPath); err == nil { - if removeErr := os.Remove(legacyPath); removeErr != nil && !errors.Is(removeErr, fs.ErrNotExist) { - return removeErr - } - return nil - } - return os.Rename(legacyPath, primaryPath) -} - -func (s *fileStore) ensureDirWithUpgrade(dir string) error { - for i := 0; i < 8; i++ { - if err := os.MkdirAll(dir, 0o755); err != nil { - if isNotDirError(err) { - var pathErr *os.PathError - if errors.As(err, &pathErr) { - if upgradeErr := s.upgradeLegacyNode(pathErr.Path); upgradeErr != nil { - return upgradeErr - } - continue - } - } - return err - } - return nil - } - return fmt.Errorf("ensure cache directory failed for %s", dir) -} - -func (s *fileStore) upgradeLegacyNode(conflictPath string) error { - if conflictPath == "" { - return errors.New("empty conflict path") - } - rel, err := filepath.Rel(s.basePath, conflictPath) - if err != nil { - return err - } - if strings.HasPrefix(rel, "..") { - return fmt.Errorf("conflict path outside storage: %s", conflictPath) - } - info, err := os.Stat(conflictPath) - if err != nil { - return err - } - if info.IsDir() { - return nil - } - if strings.HasSuffix(conflictPath, cacheFileSuffix) { - 
return nil - } - newPath := conflictPath + cacheFileSuffix - if _, err := os.Stat(newPath); err == nil { - return os.Remove(conflictPath) - } - return os.Rename(conflictPath, newPath) -} - func isNotDirError(err error) bool { if err == nil { return false diff --git a/internal/cache/store.go b/internal/cache/store.go index 4b8225b..1db5b0f 100644 --- a/internal/cache/store.go +++ b/internal/cache/store.go @@ -9,7 +9,7 @@ import ( // Store 负责管理磁盘缓存的读写。磁盘布局遵循: // -// //.body # 实际正文 +// // # 实际正文(与请求路径一致) // // 每个条目仅由正文文件组成,文件的 ModTime/Size 由文件系统提供。 type Store interface { diff --git a/internal/cache/store_test.go b/internal/cache/store_test.go index ef56e0d..dd1891a 100644 --- a/internal/cache/store_test.go +++ b/internal/cache/store_test.go @@ -7,8 +7,6 @@ import ( "io" "io/fs" "os" - "path/filepath" - "strings" "testing" "time" ) @@ -42,9 +40,6 @@ func TestStorePutAndGet(t *testing.T) { if !result.Entry.ModTime.Equal(modTime) { t.Fatalf("modtime mismatch: expected %v got %v", modTime, result.Entry.ModTime) } - if !strings.HasSuffix(result.Entry.FilePath, cacheFileSuffix) { - t.Fatalf("expected cache file suffix %s, got %s", cacheFileSuffix, result.Entry.FilePath) - } } func TestStoreGetMissing(t *testing.T) { @@ -78,11 +73,11 @@ func TestStoreIgnoresDirectories(t *testing.T) { t.Fatalf("unexpected store type %T", store) } - filePath, err := fs.path(locator) + filePath, err := fs.entryPath(locator) if err != nil { t.Fatalf("path error: %v", err) } - if err := os.MkdirAll(filePath+cacheFileSuffix, 0o755); err != nil { + if err := os.MkdirAll(filePath, 0o755); err != nil { t.Fatalf("mkdir error: %v", err) } @@ -91,82 +86,6 @@ func TestStoreIgnoresDirectories(t *testing.T) { } } -func TestStoreMigratesLegacyEntryOnGet(t *testing.T) { - store := newTestStore(t) - fs, ok := store.(*fileStore) - if !ok { - t.Fatalf("unexpected store type %T", store) - } - locator := Locator{HubName: "npm", Path: "/pkg"} - legacyPath, err := fs.path(locator) - if err != nil { - 
t.Fatalf("path error: %v", err) - } - if err := os.MkdirAll(filepath.Dir(legacyPath), 0o755); err != nil { - t.Fatalf("mkdir error: %v", err) - } - if err := os.WriteFile(legacyPath, []byte("legacy"), 0o644); err != nil { - t.Fatalf("write legacy error: %v", err) - } - - result, err := store.Get(context.Background(), locator) - if err != nil { - t.Fatalf("get legacy error: %v", err) - } - body, err := io.ReadAll(result.Reader) - if err != nil { - t.Fatalf("read legacy error: %v", err) - } - result.Reader.Close() - if string(body) != "legacy" { - t.Fatalf("unexpected legacy body: %s", string(body)) - } - if !strings.HasSuffix(result.Entry.FilePath, cacheFileSuffix) { - t.Fatalf("expected migrated file suffix, got %s", result.Entry.FilePath) - } - if _, statErr := os.Stat(legacyPath); !errors.Is(statErr, fs.ErrNotExist) { - t.Fatalf("expected legacy path removed, got %v", statErr) - } -} - -func TestStoreHandlesAncestorFileConflict(t *testing.T) { - store := newTestStore(t) - fs, ok := store.(*fileStore) - if !ok { - t.Fatalf("unexpected store type %T", store) - } - metaLocator := Locator{HubName: "npm", Path: "/pkg"} - legacyPath, err := fs.path(metaLocator) - if err != nil { - t.Fatalf("path error: %v", err) - } - if err := os.MkdirAll(filepath.Dir(legacyPath), 0o755); err != nil { - t.Fatalf("mkdir error: %v", err) - } - if err := os.WriteFile(legacyPath, []byte("legacy"), 0o644); err != nil { - t.Fatalf("write legacy error: %v", err) - } - - tarLocator := Locator{HubName: "npm", Path: "/pkg/-/pkg-1.0.0.tgz"} - if _, err := store.Put(context.Background(), tarLocator, bytes.NewReader([]byte("tar")), PutOptions{}); err != nil { - t.Fatalf("put tar error: %v", err) - } - - if _, err := os.Stat(legacyPath); !errors.Is(err, fs.ErrNotExist) { - t.Fatalf("expected legacy metadata renamed, got %v", err) - } - if _, err := os.Stat(legacyPath + cacheFileSuffix); err != nil { - t.Fatalf("expected migrated legacy cache, got %v", err) - } - primary, _, err := 
fs.entryPaths(tarLocator) - if err != nil { - t.Fatalf("entry path error: %v", err) - } - if _, err := os.Stat(primary); err != nil { - t.Fatalf("expected tar cache file, got %v", err) - } -} - // newTestStore returns a Store backed by a temporary directory. func newTestStore(t *testing.T) Store { t.Helper() diff --git a/internal/cache/writer.go b/internal/cache/writer.go new file mode 100644 index 0000000..349feca --- /dev/null +++ b/internal/cache/writer.go @@ -0,0 +1,57 @@ +package cache + +import ( + "context" + "errors" + "io" + "time" + + "github.com/any-hub/any-hub/internal/hubmodule" +) + +// ErrStoreUnavailable 表示当前模块未注入缓存存储实例。 +var ErrStoreUnavailable = errors.New("cache store unavailable") + +// StrategyWriter 注入模块的缓存策略,提供 TTL 决策与写入封装。 +type StrategyWriter struct { + store Store + strategy hubmodule.CacheStrategyProfile + now func() time.Time +} + +// NewStrategyWriter 构造策略感知的写入器,默认使用 time.Now 作为时钟。 +func NewStrategyWriter(store Store, strategy hubmodule.CacheStrategyProfile) StrategyWriter { + return StrategyWriter{ + store: store, + strategy: strategy, + now: time.Now, + } +} + +// Enabled 返回当前是否具备缓存写入能力。 +func (w StrategyWriter) Enabled() bool { + return w.store != nil +} + +// Put 写入缓存正文,并保持与 Store 相同的语义。 +func (w StrategyWriter) Put(ctx context.Context, locator Locator, body io.Reader, opts PutOptions) (*Entry, error) { + if w.store == nil { + return nil, ErrStoreUnavailable + } + return w.store.Put(ctx, locator, body, opts) +} + +// ShouldBypassValidation 根据策略 TTL 判断是否可以直接复用缓存,避免重复 HEAD。 +func (w StrategyWriter) ShouldBypassValidation(entry Entry) bool { + ttl := w.strategy.TTLHint + if ttl <= 0 { + return false + } + expireAt := entry.ModTime.Add(ttl) + return w.now().Before(expireAt) +} + +// SupportsValidation 返回当前策略是否允许通过 HEAD/Etag 等方式再验证。 +func (w StrategyWriter) SupportsValidation() bool { + return w.strategy.ValidationMode != hubmodule.ValidationModeNever +} diff --git a/internal/config/loader.go b/internal/config/loader.go index 
e579d80..b24e5be 100644 --- a/internal/config/loader.go +++ b/internal/config/loader.go @@ -94,6 +94,9 @@ func applyHubDefaults(h *HubConfig) { } else { h.Module = strings.ToLower(trimmed) } + if rollout := strings.TrimSpace(h.Rollout); rollout != "" { + h.Rollout = strings.ToLower(rollout) + } if h.ValidationMode == "" { h.ValidationMode = string(hubmodule.ValidationModeETag) } diff --git a/internal/config/modules.go b/internal/config/modules.go index d6ec19e..1039f2e 100644 --- a/internal/config/modules.go +++ b/internal/config/modules.go @@ -1,3 +1,8 @@ package config -import _ "github.com/any-hub/any-hub/internal/hubmodule/legacy" +import ( + _ "github.com/any-hub/any-hub/internal/hubmodule/docker" + _ "github.com/any-hub/any-hub/internal/hubmodule/legacy" + _ "github.com/any-hub/any-hub/internal/hubmodule/npm" + _ "github.com/any-hub/any-hub/internal/hubmodule/pypi" +) diff --git a/internal/config/runtime.go b/internal/config/runtime.go index 0821bb6..8225332 100644 --- a/internal/config/runtime.go +++ b/internal/config/runtime.go @@ -1,7 +1,10 @@ package config import ( + "time" + "github.com/any-hub/any-hub/internal/hubmodule" + "github.com/any-hub/any-hub/internal/hubmodule/legacy" ) // HubRuntime 将 Hub 配置与模块元数据合并,方便运行时快速取用策略。 @@ -9,17 +12,16 @@ type HubRuntime struct { Config HubConfig Module hubmodule.ModuleMetadata CacheStrategy hubmodule.CacheStrategyProfile + Rollout legacy.RolloutFlag } -// BuildHubRuntime 根据 Hub 配置和模块元数据创建运行时描述。 -func BuildHubRuntime(cfg HubConfig, meta hubmodule.ModuleMetadata) HubRuntime { - strategy := hubmodule.ResolveStrategy(meta, hubmodule.StrategyOptions{ - TTLOverride: cfg.CacheTTL.DurationValue(), - ValidationOverride: hubmodule.ValidationMode(cfg.ValidationMode), - }) +// BuildHubRuntime 根据 Hub 配置和模块元数据创建运行时描述,应用最终 TTL 覆盖。 +func BuildHubRuntime(cfg HubConfig, meta hubmodule.ModuleMetadata, ttl time.Duration, flag legacy.RolloutFlag) HubRuntime { + strategy := hubmodule.ResolveStrategy(meta, cfg.StrategyOverrides(ttl)) 
return HubRuntime{ Config: cfg, Module: meta, CacheStrategy: strategy, + Rollout: flag, } } diff --git a/internal/config/runtime_flags.go b/internal/config/runtime_flags.go new file mode 100644 index 0000000..a95a590 --- /dev/null +++ b/internal/config/runtime_flags.go @@ -0,0 +1,62 @@ +package config + +import ( + "fmt" + "strings" + + "github.com/any-hub/any-hub/internal/hubmodule" + "github.com/any-hub/any-hub/internal/hubmodule/legacy" +) + +// parseRolloutFlag 将配置中的 rollout 字段标准化,并结合模块类型输出最终状态。 +func parseRolloutFlag(raw string, moduleKey string) (legacy.RolloutFlag, error) { + normalized := strings.ToLower(strings.TrimSpace(raw)) + if normalized == "" { + return defaultRolloutFlag(moduleKey), nil + } + + switch normalized { + case string(legacy.RolloutLegacyOnly): + return legacy.RolloutLegacyOnly, nil + case string(legacy.RolloutDual): + if moduleKey == hubmodule.DefaultModuleKey() { + return legacy.RolloutLegacyOnly, nil + } + return legacy.RolloutDual, nil + case string(legacy.RolloutModular): + if moduleKey == hubmodule.DefaultModuleKey() { + return legacy.RolloutLegacyOnly, nil + } + return legacy.RolloutModular, nil + default: + return "", fmt.Errorf("不支持的 rollout 值: %s", raw) + } +} + +func defaultRolloutFlag(moduleKey string) legacy.RolloutFlag { + if strings.TrimSpace(moduleKey) == "" || moduleKey == hubmodule.DefaultModuleKey() { + return legacy.RolloutLegacyOnly + } + return legacy.RolloutModular +} + +// EffectiveModuleKey 根据 rollout 状态计算真实运行的模块。 +func EffectiveModuleKey(moduleKey string, flag legacy.RolloutFlag) string { + if flag == legacy.RolloutLegacyOnly { + return hubmodule.DefaultModuleKey() + } + normalized := strings.ToLower(strings.TrimSpace(moduleKey)) + if normalized == "" { + return hubmodule.DefaultModuleKey() + } + return normalized +} + +// RolloutFlagValue 返回当前 Hub 的 rollout flag(假定 Validate 已经通过)。 +func (h HubConfig) RolloutFlagValue() legacy.RolloutFlag { + flag := 
legacy.RolloutFlag(strings.ToLower(strings.TrimSpace(h.Rollout))) + if flag == "" { + return defaultRolloutFlag(h.Module) + } + return flag +} diff --git a/internal/config/types.go b/internal/config/types.go index a8d53e6..99ef3ce 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -5,6 +5,8 @@ import ( "strconv" "strings" "time" + + "github.com/any-hub/any-hub/internal/hubmodule" ) // Duration 提供更灵活的反序列化能力,同时兼容纯秒整数与 Go Duration 字符串。 @@ -68,6 +70,7 @@ type HubConfig struct { Proxy string `mapstructure:"Proxy"` Type string `mapstructure:"Type"` Module string `mapstructure:"Module"` + Rollout string `mapstructure:"Rollout"` Username string `mapstructure:"Username"` Password string `mapstructure:"Password"` CacheTTL Duration `mapstructure:"CacheTTL"` @@ -105,3 +108,14 @@ func CredentialModes(hubs []HubConfig) []string { } return result } + +// StrategyOverrides 将 hub 层的 TTL/Validation 配置映射为模块策略覆盖项。 +func (h HubConfig) StrategyOverrides(ttl time.Duration) hubmodule.StrategyOptions { + opts := hubmodule.StrategyOptions{ + TTLOverride: ttl, + } + if mode := strings.TrimSpace(h.ValidationMode); mode != "" { + opts.ValidationOverride = hubmodule.ValidationMode(mode) + } + return opts +} diff --git a/internal/config/validation.go b/internal/config/validation.go index c794fb3..7c6cb12 100644 --- a/internal/config/validation.go +++ b/internal/config/validation.go @@ -84,6 +84,11 @@ func (c *Config) Validate() error { return newFieldError(hubField(hub.Name, "Module"), fmt.Sprintf("未注册模块: %s", moduleKey)) } hub.Module = moduleKey + flag, err := parseRolloutFlag(hub.Rollout, hub.Module) + if err != nil { + return newFieldError(hubField(hub.Name, "Rollout"), err.Error()) + } + hub.Rollout = string(flag) if hub.ValidationMode != "" { mode := strings.ToLower(strings.TrimSpace(hub.ValidationMode)) switch mode { diff --git a/internal/hubmodule/README.md b/internal/hubmodule/README.md index 5fe7fca..ce31190 100644 --- a/internal/hubmodule/README.md +++ 
b/internal/hubmodule/README.md @@ -15,7 +15,7 @@ internal/hubmodule/ ## 模块约束 - **单一接口**:每个模块需要同时实现代理与缓存接口,避免跨包耦合。 - **注册流程**:在模块 `init()` 中调用 `hubmodule.Register(ModuleMetadata{...})`,注册失败必须 panic 以阻止启动。 -- **缓存布局**:一律使用 `StoragePath//.body`,如需附加目录需在 `ModuleMetadata` 中声明迁移策略。 +- **缓存布局**:一律使用 `StoragePath//`,即与上游请求完全一致的磁盘路径;当某个路径既要保存正文又要作为子目录父节点时,会在该目录下写入 `__content` 文件以存放正文。 - **配置注入**:模块仅通过依赖注入获取 `HubConfigEntry` 和全局参数,禁止直接读取文件或环境变量。 - **可观测性**:所有模块必须输出 `module_key`、命中/回源状态等日志字段,并在返回错误时附带 Hub 名称。 diff --git a/internal/hubmodule/doc.go b/internal/hubmodule/doc.go index f894f2d..d7e277c 100644 --- a/internal/hubmodule/doc.go +++ b/internal/hubmodule/doc.go @@ -3,7 +3,7 @@ // 模块作者需要: // 1. 在 internal/hubmodule// 目录下实现代理与缓存接口; // 2. 通过本包暴露的 Register 函数在 init() 中注册模块元数据; -// 3. 保证缓存写入仍遵循 StoragePath//.body 布局,并补充中文注释说明实现细节。 +// 3. 保证缓存写入仍遵循 StoragePath// 原始路径布局,并补充中文注释说明实现细节。 // // 该包同时负责提供模块发现、可观测信息以及迁移状态的对外查询能力。 package hubmodule diff --git a/internal/hubmodule/docker/module.go b/internal/hubmodule/docker/module.go new file mode 100644 index 0000000..dffc398 --- /dev/null +++ b/internal/hubmodule/docker/module.go @@ -0,0 +1,29 @@ +// Package docker 定义 Docker Hub 代理模块的元数据与缓存策略描述,供 registry 查表时使用。 +package docker + +import ( + "time" + + "github.com/any-hub/any-hub/internal/hubmodule" +) + +const dockerDefaultTTL = 12 * time.Hour + +// docker 模块继承 legacy 行为,但声明明确的缓存策略默认值,便于 hub 覆盖。 +func init() { + hubmodule.MustRegister(hubmodule.ModuleMetadata{ + Key: "docker", + Description: "Docker registry module with manifest/blob cache policies", + MigrationState: hubmodule.MigrationStateBeta, + SupportedProtocols: []string{ + "docker", + }, + CacheStrategy: hubmodule.CacheStrategyProfile{ + TTLHint: dockerDefaultTTL, + ValidationMode: hubmodule.ValidationModeETag, + DiskLayout: "raw_path", + RequiresMetadataFile: false, + SupportsStreamingWrite: true, + }, + }) +} diff --git a/internal/hubmodule/interfaces.go b/internal/hubmodule/interfaces.go index 9c362a4..8b97976 100644 --- 
a/internal/hubmodule/interfaces.go +++ b/internal/hubmodule/interfaces.go @@ -1,6 +1,10 @@ package hubmodule -import "time" +import ( + "time" + + "github.com/any-hub/any-hub/internal/cache" +) // MigrationState 描述模块上线阶段,方便观测端区分 legacy/beta/ga。 type MigrationState string @@ -36,9 +40,13 @@ type ModuleMetadata struct { MigrationState MigrationState SupportedProtocols []string CacheStrategy CacheStrategyProfile + LocatorRewrite LocatorRewrite } // DefaultModuleKey 返回内置 legacy 模块的键值。 func DefaultModuleKey() string { return defaultModuleKey } + +// LocatorRewrite 允许模块根据自身协议调整缓存路径,例如将 npm metadata 写入独立文件。 +type LocatorRewrite func(cache.Locator) cache.Locator diff --git a/internal/hubmodule/legacy/legacy_module.go b/internal/hubmodule/legacy/legacy_module.go index 9665bd6..2051c1e 100644 --- a/internal/hubmodule/legacy/legacy_module.go +++ b/internal/hubmodule/legacy/legacy_module.go @@ -1,3 +1,4 @@ +// Package legacy 提供旧版共享代理+缓存实现的适配器,确保未迁移 Hub 可继续运行。 package legacy import "github.com/any-hub/any-hub/internal/hubmodule" @@ -12,7 +13,7 @@ func init() { "docker", "npm", "go", "pypi", }, CacheStrategy: hubmodule.CacheStrategyProfile{ - DiskLayout: ".body", + DiskLayout: "raw_path", ValidationMode: hubmodule.ValidationModeETag, SupportsStreamingWrite: true, }, diff --git a/internal/hubmodule/legacy/state.go b/internal/hubmodule/legacy/state.go new file mode 100644 index 0000000..d223a60 --- /dev/null +++ b/internal/hubmodule/legacy/state.go @@ -0,0 +1,65 @@ +package legacy + +import ( + "sort" + "strings" + "sync" +) + +// RolloutFlag 描述 legacy 模块迁移阶段。 +type RolloutFlag string + +const ( + RolloutLegacyOnly RolloutFlag = "legacy-only" + RolloutDual RolloutFlag = "dual" + RolloutModular RolloutFlag = "modular" +) + +// AdapterState 记录特定 Hub 在 legacy 适配器中的运行状态。 +type AdapterState struct { + HubName string + ModuleKey string + Rollout RolloutFlag +} + +var ( + stateMu sync.RWMutex + state = make(map[string]AdapterState) +) + +// RecordAdapterState 更新指定 Hub 的 rollout 
状态,供诊断端和日志使用。 +func RecordAdapterState(hubName, moduleKey string, flag RolloutFlag) { + if hubName == "" { + return + } + key := strings.ToLower(hubName) + stateMu.Lock() + state[key] = AdapterState{ + HubName: hubName, + ModuleKey: moduleKey, + Rollout: flag, + } + stateMu.Unlock() +} + +// SnapshotAdapterStates 返回所有 Hub 的 rollout 状态,按名称排序。 +func SnapshotAdapterStates() []AdapterState { + stateMu.RLock() + defer stateMu.RUnlock() + + if len(state) == 0 { + return nil + } + + keys := make([]string, 0, len(state)) + for k := range state { + keys = append(keys, k) + } + sort.Strings(keys) + + result := make([]AdapterState, 0, len(keys)) + for _, key := range keys { + result = append(result, state[key]) + } + return result +} diff --git a/internal/hubmodule/npm/locator.go b/internal/hubmodule/npm/locator.go new file mode 100644 index 0000000..cfc5e3e --- /dev/null +++ b/internal/hubmodule/npm/locator.go @@ -0,0 +1,41 @@ +package npm + +import ( + "strings" + + "github.com/any-hub/any-hub/internal/cache" +) + +// rewriteLocator 将 npm metadata JSON 落盘至 package.json,避免与 tarball +// 路径的 `/-/` 子目录冲突,同时保持 tarball 使用原始路径。 +func rewriteLocator(loc cache.Locator) cache.Locator { + path := loc.Path + if path == "" { + return loc + } + + var qsSuffix string + core := path + if idx := strings.Index(core, "/__qs/"); idx >= 0 { + qsSuffix = core[idx:] + core = core[:idx] + } + + if strings.Contains(core, "/-/") { + loc.Path = core + qsSuffix + return loc + } + + clean := strings.TrimSuffix(core, "/") + if clean == "" { + clean = "/" + } + + if clean == "/" { + loc.Path = "/package.json" + qsSuffix + return loc + } + + loc.Path = clean + "/package.json" + qsSuffix + return loc +} diff --git a/internal/hubmodule/npm/module.go b/internal/hubmodule/npm/module.go new file mode 100644 index 0000000..f5ef93f --- /dev/null +++ b/internal/hubmodule/npm/module.go @@ -0,0 +1,30 @@ +// Package npm 描述 npm Registry 模块的默认策略与注册逻辑,方便新 Hub 直接启用。 +package npm + +import ( + "time" + + 
"github.com/any-hub/any-hub/internal/hubmodule" +) + +const npmDefaultTTL = 30 * time.Minute + +// npm 模块描述 NPM Registry 的默认缓存策略,并允许通过 [[Hub]] 覆盖 TTL/Validation。 +func init() { + hubmodule.MustRegister(hubmodule.ModuleMetadata{ + Key: "npm", + Description: "NPM proxy module with cache strategy overrides for metadata/tarballs", + MigrationState: hubmodule.MigrationStateBeta, + SupportedProtocols: []string{ + "npm", + }, + CacheStrategy: hubmodule.CacheStrategyProfile{ + TTLHint: npmDefaultTTL, + ValidationMode: hubmodule.ValidationModeLastModified, + DiskLayout: "raw_path", + RequiresMetadataFile: false, + SupportsStreamingWrite: true, + }, + LocatorRewrite: rewriteLocator, + }) +} diff --git a/internal/hubmodule/npm/module_test.go b/internal/hubmodule/npm/module_test.go new file mode 100644 index 0000000..4c4d7de --- /dev/null +++ b/internal/hubmodule/npm/module_test.go @@ -0,0 +1,52 @@ +package npm + +import ( + "testing" + "time" + + "github.com/any-hub/any-hub/internal/hubmodule" +) + +func TestNPMMetadataRegistration(t *testing.T) { + meta, ok := hubmodule.Resolve("npm") + if !ok { + t.Fatalf("npm module not registered") + } + if meta.Key != "npm" { + t.Fatalf("unexpected module key: %s", meta.Key) + } + if meta.MigrationState == "" { + t.Fatalf("migration state must be set") + } + if len(meta.SupportedProtocols) == 0 { + t.Fatalf("supported protocols must not be empty") + } + if meta.CacheStrategy.TTLHint != npmDefaultTTL { + t.Fatalf("expected default ttl %s, got %s", npmDefaultTTL, meta.CacheStrategy.TTLHint) + } + if meta.CacheStrategy.ValidationMode != hubmodule.ValidationModeLastModified { + t.Fatalf("expected validation mode last-modified, got %s", meta.CacheStrategy.ValidationMode) + } + if !meta.CacheStrategy.SupportsStreamingWrite { + t.Fatalf("npm strategy should support streaming writes") + } +} + +func TestNPMStrategyOverrides(t *testing.T) { + meta, ok := hubmodule.Resolve("npm") + if !ok { + t.Fatalf("npm module not registered") + } + + 
overrideTTL := 10 * time.Minute + strategy := hubmodule.ResolveStrategy(meta, hubmodule.StrategyOptions{ + TTLOverride: overrideTTL, + ValidationOverride: hubmodule.ValidationModeETag, + }) + if strategy.TTLHint != overrideTTL { + t.Fatalf("expected ttl override %s, got %s", overrideTTL, strategy.TTLHint) + } + if strategy.ValidationMode != hubmodule.ValidationModeETag { + t.Fatalf("expected validation mode override to etag, got %s", strategy.ValidationMode) + } +} diff --git a/internal/hubmodule/pypi/module.go b/internal/hubmodule/pypi/module.go new file mode 100644 index 0000000..a53036e --- /dev/null +++ b/internal/hubmodule/pypi/module.go @@ -0,0 +1,29 @@ +// Package pypi 聚焦 PyPI simple index 模块,提供 TTL/验证策略的注册样例。 +package pypi + +import ( + "time" + + "github.com/any-hub/any-hub/internal/hubmodule" +) + +const pypiDefaultTTL = 15 * time.Minute + +// pypi 模块负责 simple index + 分发包的策略声明,默认使用 Last-Modified 校验。 +func init() { + hubmodule.MustRegister(hubmodule.ModuleMetadata{ + Key: "pypi", + Description: "PyPI simple index module with per-hub cache overrides", + MigrationState: hubmodule.MigrationStateBeta, + SupportedProtocols: []string{ + "pypi", + }, + CacheStrategy: hubmodule.CacheStrategyProfile{ + TTLHint: pypiDefaultTTL, + ValidationMode: hubmodule.ValidationModeLastModified, + DiskLayout: "raw_path", + RequiresMetadataFile: false, + SupportsStreamingWrite: true, + }, + }) +} diff --git a/internal/hubmodule/strategy.go b/internal/hubmodule/strategy.go index 2b83f90..b585b11 100644 --- a/internal/hubmodule/strategy.go +++ b/internal/hubmodule/strategy.go @@ -17,5 +17,18 @@ func ResolveStrategy(meta ModuleMetadata, opts StrategyOptions) CacheStrategyPro if opts.ValidationOverride != "" { strategy.ValidationMode = opts.ValidationOverride } - return strategy + return normalizeStrategy(strategy) +} + +func normalizeStrategy(profile CacheStrategyProfile) CacheStrategyProfile { + if profile.TTLHint < 0 { + profile.TTLHint = 0 + } + if profile.ValidationMode == "" { 
+ profile.ValidationMode = ValidationModeETag + } + if profile.DiskLayout == "" { + profile.DiskLayout = "raw_path" + } + return profile } diff --git a/internal/hubmodule/template/module.go b/internal/hubmodule/template/module.go index a846240..e15377a 100644 --- a/internal/hubmodule/template/module.go +++ b/internal/hubmodule/template/module.go @@ -1,8 +1,7 @@ +// Package template 提供编写新模块时可复制的骨架示例。 package template import "github.com/any-hub/any-hub/internal/hubmodule" - -// Package template 提供编写新模块时可复制的骨架示例。 // // 使用方式:复制整个目录到 internal/hubmodule// 并替换字段。 // - 将 TemplateModule 重命名为实际模块类型。 diff --git a/internal/logging/fields.go b/internal/logging/fields.go index b4cd021..e9df691 100644 --- a/internal/logging/fields.go +++ b/internal/logging/fields.go @@ -11,13 +11,14 @@ func BaseFields(action, configPath string) logrus.Fields { } // RequestFields 提供 hub/domain/命中状态字段,供代理请求日志复用。 -func RequestFields(hub, domain, hubType, authMode, moduleKey string, cacheHit bool) logrus.Fields { +func RequestFields(hub, domain, hubType, authMode, moduleKey, rolloutFlag string, cacheHit bool) logrus.Fields { return logrus.Fields{ - "hub": hub, - "domain": domain, - "hub_type": hubType, - "auth_mode": authMode, - "cache_hit": cacheHit, - "module_key": moduleKey, + "hub": hub, + "domain": domain, + "hub_type": hubType, + "auth_mode": authMode, + "cache_hit": cacheHit, + "module_key": moduleKey, + "rollout_flag": rolloutFlag, } } diff --git a/internal/proxy/handler.go b/internal/proxy/handler.go index a7aa779..6d5db93 100644 --- a/internal/proxy/handler.go +++ b/internal/proxy/handler.go @@ -47,6 +47,7 @@ func (h *Handler) Handle(c fiber.Ctx, route *server.HubRoute) error { requestID := server.RequestID(c) locator := buildLocator(route, c) policy := determineCachePolicy(route, locator, c.Method()) + strategyWriter := cache.NewStrategyWriter(h.store, route.CacheStrategy) if err := ensureProxyHubType(route); err != nil { h.logger.WithFields(logrus.Fields{ @@ -62,7 +63,7 @@ func (h 
*Handler) Handle(c fiber.Ctx, route *server.HubRoute) error { } var cached *cache.ReadResult - if h.store != nil && policy.allowCache { + if strategyWriter.Enabled() && policy.allowCache { result, err := h.store.Get(ctx, locator) switch { case err == nil: @@ -79,13 +80,19 @@ func (h *Handler) Handle(c fiber.Ctx, route *server.HubRoute) error { if cached != nil { serve := true if policy.requireRevalidate { - fresh, err := h.isCacheFresh(c, route, locator, cached.Entry) - if err != nil { - h.logger.WithError(err). - WithFields(logrus.Fields{"hub": route.Config.Name, "module_key": route.ModuleKey}). - Warn("cache_revalidate_failed") - serve = false - } else if !fresh { + if strategyWriter.ShouldBypassValidation(cached.Entry) { + serve = true + } else if strategyWriter.SupportsValidation() { + fresh, err := h.isCacheFresh(c, route, locator, cached.Entry) + if err != nil { + h.logger.WithError(err). + WithFields(logrus.Fields{"hub": route.Config.Name, "module_key": route.ModuleKey}). + Warn("cache_revalidate_failed") + serve = false + } else if !fresh { + serve = false + } + } else { serve = false } } @@ -96,7 +103,7 @@ func (h *Handler) Handle(c fiber.Ctx, route *server.HubRoute) error { cached.Reader.Close() } - return h.fetchAndStream(c, route, locator, policy, requestID, started, ctx) + return h.fetchAndStream(c, route, locator, policy, strategyWriter, requestID, started, ctx) } func (h *Handler) serveCache(c fiber.Ctx, route *server.HubRoute, result *cache.ReadResult, requestID string, started time.Time) error { @@ -144,7 +151,7 @@ func (h *Handler) serveCache(c fiber.Ctx, route *server.HubRoute, result *cache. 
return nil } -func (h *Handler) fetchAndStream(c fiber.Ctx, route *server.HubRoute, locator cache.Locator, policy cachePolicy, requestID string, started time.Time, ctx context.Context) error { +func (h *Handler) fetchAndStream(c fiber.Ctx, route *server.HubRoute, locator cache.Locator, policy cachePolicy, writer cache.StrategyWriter, requestID string, started time.Time, ctx context.Context) error { resp, upstreamURL, err := h.executeRequest(c, route) if err != nil { h.logResult(route, upstreamURL.String(), requestID, 0, false, started, err) @@ -158,17 +165,17 @@ func (h *Handler) fetchAndStream(c fiber.Ctx, route *server.HubRoute, locator ca } defer resp.Body.Close() - shouldStore := policy.allowStore && h.store != nil && isCacheableStatus(resp.StatusCode) && c.Method() == http.MethodGet - return h.consumeUpstream(c, route, locator, resp, shouldStore, requestID, started, ctx) + shouldStore := policy.allowStore && writer.Enabled() && isCacheableStatus(resp.StatusCode) && c.Method() == http.MethodGet + return h.consumeUpstream(c, route, locator, resp, shouldStore, writer, requestID, started, ctx) } -func (h *Handler) consumeUpstream(c fiber.Ctx, route *server.HubRoute, locator cache.Locator, resp *http.Response, shouldStore bool, requestID string, started time.Time, ctx context.Context) error { +func (h *Handler) consumeUpstream(c fiber.Ctx, route *server.HubRoute, locator cache.Locator, resp *http.Response, shouldStore bool, writer cache.StrategyWriter, requestID string, started time.Time, ctx context.Context) error { upstreamURL := resp.Request.URL.String() method := c.Method() authFailure := isAuthFailure(resp.StatusCode) && route.Config.HasCredentials() if shouldStore { - return h.cacheAndStream(c, route, locator, resp, requestID, started, ctx, upstreamURL) + return h.cacheAndStream(c, route, locator, resp, writer, requestID, started, ctx, upstreamURL) } copyResponseHeaders(c, resp.Header) @@ -196,7 +203,7 @@ func (h *Handler) consumeUpstream(c fiber.Ctx, route 
*server.HubRoute, locator c return nil } -func (h *Handler) cacheAndStream(c fiber.Ctx, route *server.HubRoute, locator cache.Locator, resp *http.Response, requestID string, started time.Time, ctx context.Context, upstreamURL string) error { +func (h *Handler) cacheAndStream(c fiber.Ctx, route *server.HubRoute, locator cache.Locator, resp *http.Response, writer cache.StrategyWriter, requestID string, started time.Time, ctx context.Context, upstreamURL string) error { copyResponseHeaders(c, resp.Header) c.Set("X-Any-Hub-Upstream", upstreamURL) c.Set("X-Any-Hub-Cache-Hit", "false") @@ -208,7 +215,7 @@ func (h *Handler) cacheAndStream(c fiber.Ctx, route *server.HubRoute, locator ca reader := io.TeeReader(resp.Body, c.Response().BodyWriter()) opts := cache.PutOptions{ModTime: extractModTime(resp.Header)} - entry, err := h.store.Put(ctx, locator, reader, opts) + entry, err := writer.Put(ctx, locator, reader, opts) h.logResult(route, upstreamURL, requestID, resp.StatusCode, false, started, err) if err != nil { return fiber.NewError(fiber.StatusBadGateway, fmt.Sprintf("cache_write_failed: %v", err)) @@ -323,7 +330,7 @@ func (h *Handler) writeError(c fiber.Ctx, status int, code string) error { } func (h *Handler) logResult(route *server.HubRoute, upstream string, requestID string, status int, cacheHit bool, started time.Time, err error) { - fields := logging.RequestFields(route.Config.Name, route.Config.Domain, route.Config.Type, route.Config.AuthMode(), route.ModuleKey, cacheHit) + fields := logging.RequestFields(route.Config.Name, route.Config.Domain, route.Config.Type, route.Config.AuthMode(), route.ModuleKey, string(route.RolloutFlag), cacheHit) fields["action"] = "proxy" fields["upstream"] = upstream fields["upstream_status"] = status @@ -399,10 +406,14 @@ func buildLocator(route *server.HubRoute, c fiber.Ctx) cache.Locator { sum := sha1.Sum(query) clean = fmt.Sprintf("%s/__qs/%s", clean, hex.EncodeToString(sum[:])) } - return cache.Locator{ + loc := cache.Locator{ 
HubName: route.Config.Name, Path: clean, } + if route.Module.LocatorRewrite != nil { + loc = route.Module.LocatorRewrite(loc) + } + return loc } func stripQueryMarker(p string) string { @@ -807,7 +818,7 @@ func isAuthFailure(status int) bool { } func (h *Handler) logAuthRetry(route *server.HubRoute, upstream string, requestID string, status int) { - fields := logging.RequestFields(route.Config.Name, route.Config.Domain, route.Config.Type, route.Config.AuthMode(), route.ModuleKey, false) + fields := logging.RequestFields(route.Config.Name, route.Config.Domain, route.Config.Type, route.Config.AuthMode(), route.ModuleKey, string(route.RolloutFlag), false) fields["action"] = "proxy_retry" fields["upstream"] = upstream fields["upstream_status"] = status @@ -819,7 +830,7 @@ func (h *Handler) logAuthRetry(route *server.HubRoute, upstream string, requestI } func (h *Handler) logAuthFailure(route *server.HubRoute, upstream string, requestID string, status int) { - fields := logging.RequestFields(route.Config.Name, route.Config.Domain, route.Config.Type, route.Config.AuthMode(), route.ModuleKey, false) + fields := logging.RequestFields(route.Config.Name, route.Config.Domain, route.Config.Type, route.Config.AuthMode(), route.ModuleKey, string(route.RolloutFlag), false) fields["action"] = "proxy" fields["upstream"] = upstream fields["upstream_status"] = status diff --git a/internal/server/bootstrap.go b/internal/server/bootstrap.go index 8831f48..8d00cc5 100644 --- a/internal/server/bootstrap.go +++ b/internal/server/bootstrap.go @@ -3,13 +3,12 @@ package server import ( "fmt" - "github.com/any-hub/any-hub/internal/config" "github.com/any-hub/any-hub/internal/hubmodule" ) -func moduleMetadataForHub(hub config.HubConfig) (hubmodule.ModuleMetadata, error) { - if meta, ok := hubmodule.Resolve(hub.Module); ok { +func moduleMetadataForKey(key string) (hubmodule.ModuleMetadata, error) { + if meta, ok := hubmodule.Resolve(key); ok { return meta, nil } - return 
hubmodule.ModuleMetadata{}, fmt.Errorf("module %s is not registered", hub.Module) + return hubmodule.ModuleMetadata{}, fmt.Errorf("module %s is not registered", key) } diff --git a/internal/server/hub_registry.go b/internal/server/hub_registry.go index dcfcfd8..f61bfa7 100644 --- a/internal/server/hub_registry.go +++ b/internal/server/hub_registry.go @@ -11,6 +11,7 @@ import ( "github.com/any-hub/any-hub/internal/config" "github.com/any-hub/any-hub/internal/hubmodule" + "github.com/any-hub/any-hub/internal/hubmodule/legacy" ) // HubRoute 将 Hub 配置与派生属性(如缓存 TTL、解析后的 Upstream/Proxy URL) @@ -28,6 +29,10 @@ type HubRoute struct { // ModuleKey/Module 记录当前 hub 选用的模块及其元数据,便于日志与观测。 ModuleKey string Module hubmodule.ModuleMetadata + // CacheStrategy 代表模块默认策略与 hub 覆盖后的最终结果。 + CacheStrategy hubmodule.CacheStrategyProfile + // RolloutFlag 反映当前 hub 的 legacy → modular 迁移状态,供日志/诊断使用。 + RolloutFlag legacy.RolloutFlag } // HubRegistry 提供 Host/Host:port 到 HubRoute 的查询能力,所有 Hub 共享同一个监听端口。 @@ -100,7 +105,9 @@ func (r *HubRegistry) List() []HubRoute { } func buildHubRoute(cfg *config.Config, hub config.HubConfig) (*HubRoute, error) { - meta, err := moduleMetadataForHub(hub) + flag := hub.RolloutFlagValue() + effectiveKey := config.EffectiveModuleKey(hub.Module, flag) + meta, err := moduleMetadataForKey(effectiveKey) if err != nil { return nil, fmt.Errorf("hub %s: %w", hub.Name, err) } @@ -118,14 +125,20 @@ func buildHubRoute(cfg *config.Config, hub config.HubConfig) (*HubRoute, error) } } + effectiveTTL := cfg.EffectiveCacheTTL(hub) + runtime := config.BuildHubRuntime(hub, meta, effectiveTTL, flag) + legacy.RecordAdapterState(hub.Name, runtime.Module.Key, flag) + return &HubRoute{ - Config: hub, - ListenPort: cfg.Global.ListenPort, - CacheTTL: cfg.EffectiveCacheTTL(hub), - UpstreamURL: upstreamURL, - ProxyURL: proxyURL, - ModuleKey: meta.Key, - Module: meta, + Config: hub, + ListenPort: cfg.Global.ListenPort, + CacheTTL: effectiveTTL, + UpstreamURL: upstreamURL, + ProxyURL: proxyURL, + 
ModuleKey: runtime.Module.Key, + Module: runtime.Module, + CacheStrategy: runtime.CacheStrategy, + RolloutFlag: runtime.Rollout, }, nil } diff --git a/internal/server/hub_registry_test.go b/internal/server/hub_registry_test.go index 46481a9..4380d9c 100644 --- a/internal/server/hub_registry_test.go +++ b/internal/server/hub_registry_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/any-hub/any-hub/internal/config" + "github.com/any-hub/any-hub/internal/hubmodule/legacy" ) func TestHubRegistryLookupByHost(t *testing.T) { @@ -48,6 +49,15 @@ func TestHubRegistryLookupByHost(t *testing.T) { if route.CacheTTL != cfg.EffectiveCacheTTL(route.Config) { t.Errorf("cache ttl mismatch: got %s", route.CacheTTL) } + if route.CacheStrategy.TTLHint != route.CacheTTL { + t.Errorf("cache strategy ttl mismatch: %s vs %s", route.CacheStrategy.TTLHint, route.CacheTTL) + } + if route.CacheStrategy.ValidationMode == "" { + t.Fatalf("cache strategy validation mode should not be empty") + } + if route.RolloutFlag != legacy.RolloutLegacyOnly { + t.Fatalf("default rollout flag should be legacy-only") + } if route.UpstreamURL.String() != "https://registry-1.docker.io" { t.Errorf("unexpected upstream URL: %s", route.UpstreamURL) diff --git a/internal/server/router.go b/internal/server/router.go index 1421ced..f62eeb2 100644 --- a/internal/server/router.go +++ b/internal/server/router.go @@ -79,6 +79,10 @@ func requestContextMiddleware(opts AppOptions) fiber.Handler { c.Locals(contextKeyRequestID, reqID) c.Set("X-Request-ID", reqID) + if isDiagnosticsPath(string(c.Request().URI().Path())) { + return c.Next() + } + rawHost := strings.TrimSpace(getHostHeader(c)) route, ok := opts.Registry.Lookup(rawHost) if !ok { @@ -153,13 +157,19 @@ func ensureRouterHubType(route *HubRoute) error { func renderTypeUnsupported(c fiber.Ctx, logger *logrus.Logger, route *HubRoute, err error) error { fields := logrus.Fields{ - "action": "hub_type_check", - "hub": route.Config.Name, - "hub_type": route.Config.Type, 
- "error": "hub_type_unsupported", + "action": "hub_type_check", + "hub": route.Config.Name, + "hub_type": route.Config.Type, + "module_key": route.ModuleKey, + "rollout_flag": string(route.RolloutFlag), + "error": "hub_type_unsupported", } logger.WithFields(fields).Error(err.Error()) return c.Status(fiber.StatusNotImplemented).JSON(fiber.Map{ "error": "hub_type_unsupported", }) } + +func isDiagnosticsPath(path string) bool { + return strings.HasPrefix(path, "/-/") +} diff --git a/internal/server/routes/modules.go b/internal/server/routes/modules.go new file mode 100644 index 0000000..198c43c --- /dev/null +++ b/internal/server/routes/modules.go @@ -0,0 +1,114 @@ +package routes + +import ( + "sort" + "strings" + "time" + + "github.com/gofiber/fiber/v3" + + "github.com/any-hub/any-hub/internal/hubmodule" + "github.com/any-hub/any-hub/internal/server" +) + +// RegisterModuleRoutes 暴露 /-/modules 诊断接口,供 SRE 查询模块与 Hub 绑定关系。 +func RegisterModuleRoutes(app *fiber.App, registry *server.HubRegistry) { + if app == nil || registry == nil { + return + } + + app.Get("/-/modules", func(c fiber.Ctx) error { + payload := fiber.Map{ + "modules": encodeModules(hubmodule.List()), + "hubs": encodeHubBindings(registry.List()), + } + return c.JSON(payload) + }) + + app.Get("/-/modules/:key", func(c fiber.Ctx) error { + key := strings.ToLower(strings.TrimSpace(c.Params("key"))) + if key == "" { + return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "module_key_required"}) + } + meta, ok := hubmodule.Resolve(key) + if !ok { + return c.Status(fiber.StatusNotFound).JSON(fiber.Map{"error": "module_not_found"}) + } + return c.JSON(encodeModule(meta)) + }) +} + +type modulePayload struct { + Key string `json:"key"` + Description string `json:"description"` + MigrationState hubmodule.MigrationState `json:"migration_state"` + SupportedProtocols []string `json:"supported_protocols"` + CacheStrategy cacheStrategyPayload `json:"cache_strategy"` +} + +type cacheStrategyPayload struct { + 
TTLSeconds int64 `json:"ttl_seconds"` + ValidationMode string `json:"validation_mode"` + DiskLayout string `json:"disk_layout"` + RequiresMetadataFile bool `json:"requires_metadata_file"` + SupportsStreamingWrite bool `json:"supports_streaming_write"` +} + +type hubBindingPayload struct { + HubName string `json:"hub_name"` + ModuleKey string `json:"module_key"` + Domain string `json:"domain"` + Port int `json:"port"` + Rollout string `json:"rollout_flag"` +} + +func encodeModules(mods []hubmodule.ModuleMetadata) []modulePayload { + if len(mods) == 0 { + return nil + } + sort.Slice(mods, func(i, j int) bool { + return mods[i].Key < mods[j].Key + }) + result := make([]modulePayload, 0, len(mods)) + for _, meta := range mods { + result = append(result, encodeModule(meta)) + } + return result +} + +func encodeModule(meta hubmodule.ModuleMetadata) modulePayload { + strategy := meta.CacheStrategy + return modulePayload{ + Key: meta.Key, + Description: meta.Description, + MigrationState: meta.MigrationState, + SupportedProtocols: append([]string(nil), meta.SupportedProtocols...), + CacheStrategy: cacheStrategyPayload{ + TTLSeconds: int64(strategy.TTLHint / time.Second), + ValidationMode: string(strategy.ValidationMode), + DiskLayout: strategy.DiskLayout, + RequiresMetadataFile: strategy.RequiresMetadataFile, + SupportsStreamingWrite: strategy.SupportsStreamingWrite, + }, + } +} + +func encodeHubBindings(routes []server.HubRoute) []hubBindingPayload { + if len(routes) == 0 { + return nil + } + sort.Slice(routes, func(i, j int) bool { + return routes[i].Config.Name < routes[j].Config.Name + }) + result := make([]hubBindingPayload, 0, len(routes)) + for _, route := range routes { + result = append(result, hubBindingPayload{ + HubName: route.Config.Name, + ModuleKey: route.ModuleKey, + Domain: route.Config.Domain, + Port: route.ListenPort, + Rollout: string(route.RolloutFlag), + }) + } + return result +} diff --git a/main.go b/main.go index 6a214c3..0cf4987 100644 --- 
a/main.go +++ b/main.go @@ -14,6 +14,7 @@ import ( "github.com/any-hub/any-hub/internal/logging" "github.com/any-hub/any-hub/internal/proxy" "github.com/any-hub/any-hub/internal/server" + "github.com/any-hub/any-hub/internal/server/routes" "github.com/any-hub/any-hub/internal/version" ) @@ -144,6 +145,7 @@ func startHTTPServer(cfg *config.Config, registry *server.HubRegistry, proxyHand if err != nil { return err } + routes.RegisterModuleRoutes(app, registry) logger.WithFields(logrus.Fields{ "action": "listen", diff --git a/specs/004-modular-proxy-cache/data-model.md b/specs/004-modular-proxy-cache/data-model.md index 794ee81..85dfefd 100644 --- a/specs/004-modular-proxy-cache/data-model.md +++ b/specs/004-modular-proxy-cache/data-model.md @@ -2,7 +2,7 @@ ## Overview -The modular architecture introduces explicit metadata describing which proxy+cache module each hub uses, how modules register themselves, and what cache policies they expose. The underlying storage layout (`StoragePath//.body`) remains unchanged, but new metadata ensures the runtime can resolve modules, enforce compatibility, and migrate legacy hubs incrementally. +The modular architecture introduces explicit metadata describing which proxy+cache module each hub uses, how modules register themselves, and what cache policies they expose. The underlying storage layout now matches the upstream request path (`StoragePath//`), simplifying disk management while metadata ensures the runtime can resolve modules, enforce compatibility, and migrate legacy hubs incrementally. ## Entities @@ -48,7 +48,7 @@ The modular architecture introduces explicit metadata describing which proxy+cac - **Fields**: - `TTL` *(duration)* – default TTL per module; hubs may override via config. - `ValidationMode` *(enum: `etag`, `last-modified`, `never`)* – defines revalidation behavior. - - `DiskLayout` *(string)* – description of path mapping rules (default `.body` suffix). 
+ - `DiskLayout` *(string)* – description of path mapping rules (default `raw_path`, i.e., exact upstream path without suffix). - `RequiresMetadataFile` *(bool)* – whether `.meta` entries are required. - `SupportsStreamingWrite` *(bool)* – indicates module can write cache while proxying upstream. - **Relationships**: @@ -92,4 +92,3 @@ The modular architecture introduces explicit metadata describing which proxy+cac - `HubConfigEntry.Name` and `ModuleMetadata.Key` must each be unique (case-insensitive) within a config/process. - Module registry rejects duplicate keys to avoid ambiguous bindings. - diff --git a/specs/004-modular-proxy-cache/plan.md b/specs/004-modular-proxy-cache/plan.md index 6261291..82889cf 100644 --- a/specs/004-modular-proxy-cache/plan.md +++ b/specs/004-modular-proxy-cache/plan.md @@ -13,7 +13,7 @@ Modularize the proxy and cache layers so every hub type (npm, Docker, PyPI, futu **Language/Version**: Go 1.25+ (静态链接,单二进制交付) **Primary Dependencies**: Fiber v3(HTTP 服务)、Viper(配置)、Logrus + Lumberjack(结构化日志 & 滚动)、标准库 `net/http`/`io` -**Storage**: 本地文件系统缓存目录 `StoragePath//.body` + `.meta` 元数据(模块必须复用同一布局) +**Storage**: 本地文件系统缓存目录 `StoragePath//`,直接复用请求路径完成磁盘定位 **Testing**: `go test ./...`,使用 `httptest`、临时目录和自建上游伪服务验证配置/缓存/代理路径 **Target Platform**: Linux/Unix CLI 进程,由 systemd/supervisor 管理,匿名下游客户端 **Project Type**: 单 Go 项目(`cmd/` 入口 + `internal/*` 包) @@ -104,7 +104,7 @@ tests/ # `go test` 下的单元/集成测试,用临时目 - New diagnostics endpoint remains internal and optional; no UI/login introduced. ✅ Principle I - Code still single Go binary with existing dependency set. ✅ Principle II - `Module` field documented with defaults, validation, and migration path; no extra config sources. ✅ Principle III -- Cache strategy enforces `.body` layout and streaming flow, with telemetry requirements captured in contracts. 
✅ Principle IV +- Cache strategy enforces“原始路径 == 磁盘路径”的布局与流式回源,相关观测需求写入 contracts。✅ Principle IV - Logs/quickstart/test guidance ensure observability and Chinese documentation continue. ✅ Principle V ## Phase 2 – Implementation Outlook (pre-tasks) @@ -112,6 +112,6 @@ tests/ # `go test` 下的单元/集成测试,用临时目 1. **Module Registry & Interfaces**: Create `internal/hubmodule` package, define shared interfaces, implement registry with tests, and expose diagnostics data source reused by HTTP endpoints. 2. **Config Loader & Validation**: Extend `internal/config/types.go` and `validation.go` to include `Module` with default `legacy`, plus wiring to registry resolution during startup. 3. **Legacy Adapter & Migration Switches**: Provide adapter module that wraps current shared proxy/cache, plus feature flags or config toggles to control rollout states per hub. -4. **Module Implementations**: Carve existing npm/docker/pypi logic into dedicated modules within `internal/hubmodule/`, ensuring cache writer uses `.body` layout and telemetry tags. +4. **Module Implementations**: Carve existing npm/docker/pypi logic into dedicated modules within `internal/hubmodule/`, ensuring cache writer复用原始请求路径与必要的 telemetry 标签。 5. **Observability/Diagnostics**: Implement `/−/modules` endpoint (Fiber route) and log tags showing `module_key` on cache/proxy events. 6. **Testing**: Add shared test harness for modules, update integration tests to cover mixed legacy + modular hubs, and document commands in README/quickstart. diff --git a/specs/004-modular-proxy-cache/quickstart.md b/specs/004-modular-proxy-cache/quickstart.md index a95dd92..6f0b828 100644 --- a/specs/004-modular-proxy-cache/quickstart.md +++ b/specs/004-modular-proxy-cache/quickstart.md @@ -13,16 +13,24 @@ ## 3. Bind Module via Config 1. Edit `config.toml` and set `Module = ""` inside the target `[[Hub]]` block (omit to use `legacy`). -2. (Optional) Override cache behavior per hub using existing fields (`CacheTTL`, etc.). -3. 
Run `ANY_HUB_CONFIG=./config.toml go test ./...` to ensure loader validation passes. +2. While validating a new module, set `Rollout = "dual"` so you can flip back to legacy without editing other fields. +3. (Optional) Override cache behavior per hub using existing fields (`CacheTTL`, etc.). +4. Run `ANY_HUB_CONFIG=./config.toml go test ./...` (or `make modules-test`) to ensure loader validation passes and the module registry sees your key. ## 4. Run and Verify 1. Start the binary: `go run ./cmd/any-hub --config ./config.toml`. -2. Send traffic to the hub's domain/port and watch logs for `module_key=` tags. -3. Inspect `./storage//` to confirm `.body` files are written by the module. -4. Exercise rollback by switching `Module` back to `legacy` if needed. +2. Use `curl -H "Host: " http://127.0.0.1:/` to produce traffic, then hit `curl http://127.0.0.1:/-/modules` and confirm the hub binding points to your module with the expected `rollout_flag`. +3. Inspect `./storage//` to confirm the cached files mirror the upstream path (no suffix). When a path also has child entries (e.g., `/pkg` metadata plus `/pkg/-/...` tarballs), the metadata payload is stored in a `__content` file under that directory so both artifacts can coexist. Verify TTL overrides are propagated. +4. Monitor `logs/any-hub.log` (or the sample `logs/module_migration_sample.log`) to verify each entry exposes `module_key` + `rollout_flag`. Example: + ```json + {"action":"proxy","hub":"testhub","module_key":"testhub","rollout_flag":"dual","cache_hit":false,"upstream_status":200} + ``` +5. Exercise rollback by switching `Rollout = "legacy-only"` (or `Module = "legacy"` if needed) and re-running the traffic to ensure diagnostics/logs show the transition. ## 5. Ship 1. Commit module code + config docs. 2. Update release notes mentioning the module key, migration guidance, and related diagnostics. 3. Monitor cache hit/miss metrics post-deploy; adjust TTL overrides if necessary. + +## 6. 
Attach Validation Artifacts +- Save the JSON snapshot from `/-/modules` and a short log excerpt (see `logs/module_migration_sample.log`) with both legacy + modular hubs present; attach them to the change request so reviewers can confirm you followed the playbook. diff --git a/specs/004-modular-proxy-cache/research.md b/specs/004-modular-proxy-cache/research.md index 7c251b8..f1726e3 100644 --- a/specs/004-modular-proxy-cache/research.md +++ b/specs/004-modular-proxy-cache/research.md @@ -20,9 +20,9 @@ - **Alternatives considered**: Allow modules to spin up custom Fiber groups or loggers—rejected because it complicates shutdown hooks and breaks structured logging consistency. ## Decision 4: Storage Layout Compatibility -- **Decision**: Keep current `StoragePath//.body` layout; modules may add subdirectories below `` only when necessary but must expose migration hooks via registry metadata. -- **Rationale**: Recent cache fix established `.body` suffix to avoid file/dir conflicts; modules should reuse it to maintain operational tooling compatibility. -- **Alternatives considered**: Give each module a distinct root folder—rejected because it would fragment cleanup tooling and require per-module disk quotas. +- **Decision**: Reuse the original request path directly (`StoragePath//`) so operators can browse cached artifacts without suffix translation; modules share the same layout and rely on directory creation safeguards to avoid traversal issues. +- **Rationale**: Aligns with operational workflows that expect “what you request is what you store,” simplifying manual cache invalidation and disk audits now that we no longer need `.body` indirection. +- **Alternatives considered**: Keep the `.body` suffix or add per-module subdirectories—rejected because suffix-based migrations complicate tooling and dedicated subdirectories fragment cache quotas. 
## Decision 5: Testing Strategy - **Decision**: For each module, enforce a shared test harness that spins a fake upstream using `httptest.Server`, writes to `t.TempDir()` storage, and asserts registry wiring end-to-end via integration tests. diff --git a/specs/004-modular-proxy-cache/tasks.md b/specs/004-modular-proxy-cache/tasks.md index dc4a9a5..c8eff3c 100644 --- a/specs/004-modular-proxy-cache/tasks.md +++ b/specs/004-modular-proxy-cache/tasks.md @@ -48,14 +48,14 @@ ### Tests -- [ ] T012 [P] [US2] Add cache strategy override integration test validating TTL + revalidation paths in `tests/integration/cache_strategy_override_test.go` -- [ ] T013 [P] [US2] Add module-level cache strategy unit tests in `internal/hubmodule/npm/module_test.go` +- [X] T012 [P] [US2] Add cache strategy override integration test validating TTL + revalidation paths in `tests/integration/cache_strategy_override_test.go` +- [X] T013 [P] [US2] Add module-level cache strategy unit tests in `internal/hubmodule/npm/module_test.go` ### Implementation -- [ ] T014 [US2] Implement `CacheStrategyProfile` helpers and injection plumbing (`internal/hubmodule/strategy.go`, `internal/cache/writer.go`) -- [ ] T015 [US2] Bind hub-level overrides to strategy metadata via config/runtime structures (`internal/config/types.go`, `internal/config/runtime.go`) -- [ ] T016 [US2] Update existing modules (npm/docker/pypi) to declare strategies + honor overrides (`internal/hubmodule/{npm,docker,pypi}/module.go`) +- [X] T014 [US2] Implement `CacheStrategyProfile` helpers and injection plumbing (`internal/hubmodule/strategy.go`, `internal/cache/writer.go`) +- [X] T015 [US2] Bind hub-level overrides to strategy metadata via config/runtime structures (`internal/config/types.go`, `internal/config/runtime.go`) +- [X] T016 [US2] Update existing modules (npm/docker/pypi) to declare strategies + honor overrides (`internal/hubmodule/{npm,docker,pypi}/module.go`) --- @@ -66,22 +66,22 @@ ### Tests -- [ ] T017 [P] [US3] Add 
 dual-mode integration test covering rollout toggle + rollback in `tests/integration/legacy_adapter_toggle_test.go` -- [ ] T018 [P] [US3] Add diagnostics endpoint contract test for `/−/modules` in `tests/integration/module_diagnostics_test.go` +- [X] T017 [P] [US3] Add dual-mode integration test covering rollout toggle + rollback in `tests/integration/legacy_adapter_toggle_test.go` +- [X] T018 [P] [US3] Add diagnostics endpoint contract test for `/-/modules` in `tests/integration/module_diagnostics_test.go`   ### Implementation  -- [ ] T019 [US3] Implement `LegacyAdapterState` tracker + rollout flag parsing (`internal/hubmodule/legacy/state.go`, `internal/config/runtime_flags.go`) -- [ ] T020 [US3] Implement Fiber handler + routing for `/−/modules` diagnostics (`internal/server/routes/modules.go`, `internal/server/router.go`) -- [ ] T021 [US3] Add structured log fields (`module_key`, `rollout_flag`) across logging middleware (`internal/server/middleware/logging.go`, `internal/proxy/logging.go`) -- [ ] T022 [US3] Document operational playbook for phased migration (`docs/operations/migration.md`) +- [X] T019 [US3] Implement `LegacyAdapterState` tracker + rollout flag parsing (`internal/hubmodule/legacy/state.go`, `internal/config/runtime_flags.go`) +- [X] T020 [US3] Implement Fiber handler + routing for `/-/modules` diagnostics (`internal/server/routes/modules.go`, `internal/server/router.go`) +- [X] T021 [US3] Add structured log fields (`module_key`, `rollout_flag`) across logging middleware (`internal/server/middleware/logging.go`, `internal/proxy/logging.go`) +- [X] T022 [US3] Document operational playbook for phased migration (`docs/operations/migration.md`)   ---  ## Phase 6: Polish & Cross-Cutting Concerns  -- [ ] T023 [P] Add Chinese comments + GoDoc for new interfaces/modules (`internal/hubmodule/**/*.go`) -- [ ] T024 Validate quickstart by running module creation flow end-to-end and capture sample logs (`specs/004-modular-proxy-cache/quickstart.md`, `logs/`) +- [X] 
T023 [P] Add Chinese comments + GoDoc for new interfaces/modules (`internal/hubmodule/**/*.go`) +- [X] T024 Validate quickstart by running module creation flow end-to-end and capture sample logs (`specs/004-modular-proxy-cache/quickstart.md`, `logs/`) --- diff --git a/tests/integration/cache_strategy_override_test.go b/tests/integration/cache_strategy_override_test.go new file mode 100644 index 0000000..6d2102e --- /dev/null +++ b/tests/integration/cache_strategy_override_test.go @@ -0,0 +1,197 @@ +package integration + +import ( + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/gofiber/fiber/v3" + "github.com/sirupsen/logrus" + + "github.com/any-hub/any-hub/internal/cache" + "github.com/any-hub/any-hub/internal/config" + "github.com/any-hub/any-hub/internal/hubmodule" + "github.com/any-hub/any-hub/internal/proxy" + "github.com/any-hub/any-hub/internal/server" +) + +func TestCacheStrategyOverrides(t *testing.T) { + t.Run("ttl defers revalidation until expired", func(t *testing.T) { + stub := newUpstreamStub(t, upstreamNPM) + defer stub.Close() + + storageDir := t.TempDir() + ttl := 50 * time.Millisecond + cfg := &config.Config{ + Global: config.GlobalConfig{ + ListenPort: 6100, + CacheTTL: config.Duration(time.Second), + StoragePath: storageDir, + }, + Hubs: []config.HubConfig{ + { + Name: "npm-ttl", + Domain: "ttl.npm.local", + Type: "npm", + Module: "npm", + Upstream: stub.URL, + CacheTTL: config.Duration(ttl), + }, + }, + } + + app := newStrategyTestApp(t, cfg) + + doRequest := func() *http.Response { + req := httptest.NewRequest(http.MethodGet, "http://ttl.npm.local/lodash", nil) + req.Host = "ttl.npm.local" + resp, err := app.Test(req) + if err != nil { + t.Fatalf("app.Test error: %v", err) + } + return resp + } + + resp := doRequest() + if resp.StatusCode != fiber.StatusOK { + t.Fatalf("expected 200, got %d", resp.StatusCode) + } + if hit := resp.Header.Get("X-Any-Hub-Cache-Hit"); hit != "false" { + t.Fatalf("first request should 
be miss, got %s", hit) + } + resp.Body.Close() + + resp2 := doRequest() + if hit := resp2.Header.Get("X-Any-Hub-Cache-Hit"); hit != "true" { + t.Fatalf("second request should hit cache before TTL, got %s", hit) + } + resp2.Body.Close() + + if headCount := countRequests(stub.Requests(), http.MethodHead, "/lodash"); headCount != 0 { + t.Fatalf("expected no HEAD before TTL expiry, got %d", headCount) + } + if getCount := countRequests(stub.Requests(), http.MethodGet, "/lodash"); getCount != 1 { + t.Fatalf("upstream should be hit once before TTL expiry, got %d", getCount) + } + + time.Sleep(ttl * 2) + + resp3 := doRequest() + if hit := resp3.Header.Get("X-Any-Hub-Cache-Hit"); hit != "true" { + body, _ := io.ReadAll(resp3.Body) + resp3.Body.Close() + t.Fatalf("expected cached response after HEAD revalidation, got %s body=%s", hit, string(body)) + } + resp3.Body.Close() + + if headCount := countRequests(stub.Requests(), http.MethodHead, "/lodash"); headCount != 1 { + t.Fatalf("expected single HEAD after TTL expiry, got %d", headCount) + } + if getCount := countRequests(stub.Requests(), http.MethodGet, "/lodash"); getCount != 1 { + t.Fatalf("upstream GET count should remain 1, got %d", getCount) + } + }) + + t.Run("validation disabled falls back to refetch", func(t *testing.T) { + stub := newUpstreamStub(t, upstreamNPM) + defer stub.Close() + + storageDir := t.TempDir() + ttl := 25 * time.Millisecond + cfg := &config.Config{ + Global: config.GlobalConfig{ + ListenPort: 6200, + CacheTTL: config.Duration(time.Second), + StoragePath: storageDir, + }, + Hubs: []config.HubConfig{ + { + Name: "npm-novalidation", + Domain: "novalidation.npm.local", + Type: "npm", + Module: "npm", + Upstream: stub.URL, + CacheTTL: config.Duration(ttl), + ValidationMode: string(hubmodule.ValidationModeNever), + }, + }, + } + + app := newStrategyTestApp(t, cfg) + + doRequest := func() *http.Response { + req := httptest.NewRequest(http.MethodGet, "http://novalidation.npm.local/lodash", nil) + 
req.Host = "novalidation.npm.local" + resp, err := app.Test(req) + if err != nil { + t.Fatalf("app.Test error: %v", err) + } + return resp + } + + first := doRequest() + if first.Header.Get("X-Any-Hub-Cache-Hit") != "false" { + t.Fatalf("expected miss on first request") + } + first.Body.Close() + + time.Sleep(ttl * 2) + + second := doRequest() + if second.Header.Get("X-Any-Hub-Cache-Hit") != "false" { + body, _ := io.ReadAll(second.Body) + second.Body.Close() + t.Fatalf("expected cache miss when validation disabled, got hit body=%s", string(body)) + } + second.Body.Close() + + if headCount := countRequests(stub.Requests(), http.MethodHead, "/lodash"); headCount != 0 { + t.Fatalf("validation mode never should avoid HEAD, got %d", headCount) + } + if getCount := countRequests(stub.Requests(), http.MethodGet, "/lodash"); getCount != 2 { + t.Fatalf("expected two upstream GETs due to forced refetch, got %d", getCount) + } + }) +} + +func newStrategyTestApp(t *testing.T, cfg *config.Config) *fiber.App { + t.Helper() + + registry, err := server.NewHubRegistry(cfg) + if err != nil { + t.Fatalf("registry error: %v", err) + } + + logger := logrus.New() + logger.SetOutput(io.Discard) + + store, err := cache.NewStore(cfg.Global.StoragePath) + if err != nil { + t.Fatalf("store error: %v", err) + } + + client := server.NewUpstreamClient(cfg) + handler := proxy.NewHandler(client, logger, store) + app, err := server.NewApp(server.AppOptions{ + Logger: logger, + Registry: registry, + Proxy: handler, + ListenPort: cfg.Global.ListenPort, + }) + if err != nil { + t.Fatalf("app error: %v", err) + } + return app +} + +func countRequests(reqs []RecordedRequest, method, path string) int { + count := 0 + for _, req := range reqs { + if req.Method == method && req.Path == path { + count++ + } + } + return count +} diff --git a/tests/integration/legacy_adapter_toggle_test.go b/tests/integration/legacy_adapter_toggle_test.go new file mode 100644 index 0000000..198e8e1 --- /dev/null +++ 
b/tests/integration/legacy_adapter_toggle_test.go @@ -0,0 +1,118 @@ +package integration + +import ( + "io" + "net/http/httptest" + "testing" + "time" + + "github.com/gofiber/fiber/v3" + "github.com/sirupsen/logrus" + + "github.com/any-hub/any-hub/internal/config" + "github.com/any-hub/any-hub/internal/hubmodule" + "github.com/any-hub/any-hub/internal/hubmodule/legacy" + "github.com/any-hub/any-hub/internal/server" +) + +func TestLegacyAdapterRolloutToggle(t *testing.T) { + const moduleKey = "rollout-toggle-test" + _ = hubmodule.Register(hubmodule.ModuleMetadata{Key: moduleKey}) + + logger := logrus.New() + logger.SetOutput(io.Discard) + + baseHub := config.HubConfig{ + Name: "dual-mode", + Domain: "dual.local", + Type: "docker", + Upstream: "https://registry.npmjs.org", + Module: moduleKey, + } + + testCases := []struct { + name string + rolloutFlag string + expectKey string + expectFlag legacy.RolloutFlag + }{ + { + name: "force legacy", + rolloutFlag: "legacy-only", + expectKey: hubmodule.DefaultModuleKey(), + expectFlag: legacy.RolloutLegacyOnly, + }, + { + name: "dual mode", + rolloutFlag: "dual", + expectKey: moduleKey, + expectFlag: legacy.RolloutDual, + }, + { + name: "full modular", + rolloutFlag: "modular", + expectKey: moduleKey, + expectFlag: legacy.RolloutModular, + }, + { + name: "rollback to legacy", + rolloutFlag: "legacy-only", + expectKey: hubmodule.DefaultModuleKey(), + expectFlag: legacy.RolloutLegacyOnly, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cfg := &config.Config{ + Global: config.GlobalConfig{ + ListenPort: 6100, + CacheTTL: config.Duration(time.Minute), + }, + Hubs: []config.HubConfig{ + func() config.HubConfig { + h := baseHub + h.Rollout = tc.rolloutFlag + return h + }(), + }, + } + + registry, err := server.NewHubRegistry(cfg) + if err != nil { + t.Fatalf("failed to build registry: %v", err) + } + + recorder := &routeRecorder{} + app := mustNewApp(t, cfg.Global.ListenPort, logger, registry, 
recorder) + + req := httptest.NewRequest("GET", "http://dual.local/v2/", nil) + req.Host = "dual.local" + resp, err := app.Test(req) + if err != nil { + t.Fatalf("request failed: %v", err) + } + if resp.StatusCode != fiber.StatusNoContent { + t.Fatalf("unexpected status: %d", resp.StatusCode) + } + + if recorder.moduleKey != tc.expectKey { + t.Fatalf("expected module %s, got %s", tc.expectKey, recorder.moduleKey) + } + if recorder.rolloutFlag != tc.expectFlag { + t.Fatalf("expected rollout flag %s, got %s", tc.expectFlag, recorder.rolloutFlag) + } + }) + } +} + +type routeRecorder struct { + moduleKey string + rolloutFlag legacy.RolloutFlag +} + +func (r *routeRecorder) Handle(c fiber.Ctx, route *server.HubRoute) error { + r.moduleKey = route.ModuleKey + r.rolloutFlag = route.RolloutFlag + return c.SendStatus(fiber.StatusNoContent) +} diff --git a/tests/integration/module_diagnostics_test.go b/tests/integration/module_diagnostics_test.go new file mode 100644 index 0000000..89b9aa3 --- /dev/null +++ b/tests/integration/module_diagnostics_test.go @@ -0,0 +1,154 @@ +package integration + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/gofiber/fiber/v3" + "github.com/sirupsen/logrus" + + "github.com/any-hub/any-hub/internal/config" + "github.com/any-hub/any-hub/internal/hubmodule" + "github.com/any-hub/any-hub/internal/server" + "github.com/any-hub/any-hub/internal/server/routes" +) + +func TestModuleDiagnosticsEndpoints(t *testing.T) { + const moduleKey = "diagnostics-test" + _ = hubmodule.Register(hubmodule.ModuleMetadata{ + Key: moduleKey, + Description: "diagnostics test module", + MigrationState: hubmodule.MigrationStateBeta, + SupportedProtocols: []string{ + "npm", + }, + }) + + cfg := &config.Config{ + Global: config.GlobalConfig{ + ListenPort: 6200, + CacheTTL: config.Duration(30 * time.Minute), + }, + Hubs: []config.HubConfig{ + { + Name: "legacy-hub", + Domain: "legacy.local", + Type: "docker", + 
Upstream: "https://registry-1.docker.io", + }, + { + Name: "modern-hub", + Domain: "modern.local", + Type: "npm", + Upstream: "https://registry.npmjs.org", + Module: moduleKey, + Rollout: "dual", + }, + }, + } + + registry, err := server.NewHubRegistry(cfg) + if err != nil { + t.Fatalf("failed to build registry: %v", err) + } + + logger := logrus.New() + logger.SetOutput(io.Discard) + + app := mustNewApp(t, cfg.Global.ListenPort, logger, registry, server.ProxyHandlerFunc(func(c fiber.Ctx, _ *server.HubRoute) error { + return c.SendStatus(fiber.StatusNoContent) + })) + routes.RegisterModuleRoutes(app, registry) + + t.Run("list modules and hubs", func(t *testing.T) { + resp := doRequest(t, app, "GET", "/-/modules") + if resp.StatusCode != fiber.StatusOK { + t.Fatalf("expected 200, got %d", resp.StatusCode) + } + var payload struct { + Modules []map[string]any `json:"modules"` + Hubs []struct { + HubName string `json:"hub_name"` + ModuleKey string `json:"module_key"` + Rollout string `json:"rollout_flag"` + Domain string `json:"domain"` + Port int `json:"port"` + } `json:"hubs"` + } + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + if err := json.Unmarshal(body, &payload); err != nil { + t.Fatalf("failed to decode response: %v\nbody: %s", err, string(body)) + } + if len(payload.Modules) == 0 { + t.Fatalf("expected module metadata entries") + } + found := false + for _, module := range payload.Modules { + if module["key"] == moduleKey { + found = true + break + } + } + if !found { + t.Fatalf("expected module %s in diagnostics payload", moduleKey) + } + if len(payload.Hubs) != 2 { + t.Fatalf("expected 2 hubs, got %d", len(payload.Hubs)) + } + for _, hub := range payload.Hubs { + switch hub.HubName { + case "legacy-hub": + if hub.ModuleKey != hubmodule.DefaultModuleKey() { + t.Fatalf("legacy hub should expose legacy module, got %s", hub.ModuleKey) + } + case "modern-hub": + if hub.ModuleKey != moduleKey { + t.Fatalf("modern hub should expose %s, got %s", 
moduleKey, hub.ModuleKey) + } + if hub.Rollout != "dual" { + t.Fatalf("modern hub rollout flag should be dual, got %s", hub.Rollout) + } + default: + t.Fatalf("unexpected hub %s", hub.HubName) + } + } + }) + + t.Run("inspect module by key", func(t *testing.T) { + resp := doRequest(t, app, "GET", "/-/modules/"+moduleKey) + if resp.StatusCode != fiber.StatusOK { + t.Fatalf("expected 200, got %d", resp.StatusCode) + } + var module map[string]any + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + if err := json.Unmarshal(body, &module); err != nil { + t.Fatalf("module inspect decode failed: %v", err) + } + if module["key"] != moduleKey { + t.Fatalf("expected module key %s, got %v", moduleKey, module["key"]) + } + }) + + t.Run("unknown module returns 404", func(t *testing.T) { + resp := doRequest(t, app, "GET", "/-/modules/missing-module") + if resp.StatusCode != fiber.StatusNotFound { + t.Fatalf("expected 404, got %d", resp.StatusCode) + } + }) +} + +func doRequest(t *testing.T, app *fiber.App, method, url string) *http.Response { + t.Helper() + req := httptest.NewRequest(method, url, nil) + resp, err := app.Test(req) + if err != nil { + t.Fatalf("request %s %s failed: %v", method, url, err) + } + return resp +} diff --git a/tests/integration/module_routing_test.go b/tests/integration/module_routing_test.go index eaf834d..503394c 100644 --- a/tests/integration/module_routing_test.go +++ b/tests/integration/module_routing_test.go @@ -96,10 +96,12 @@ func mustNewApp(t *testing.T, port int, logger *logrus.Logger, registry *server. type moduleRecorder struct { routeName string moduleKey string + rollout string } func (p *moduleRecorder) Handle(c fiber.Ctx, route *server.HubRoute) error { p.routeName = route.Config.Name p.moduleKey = route.ModuleKey + p.rollout = string(route.RolloutFlag) return c.SendStatus(fiber.StatusNoContent) }