diff --git a/config.example.toml b/config.example.toml index 6b095e2..84c7516 100644 --- a/config.example.toml +++ b/config.example.toml @@ -86,3 +86,21 @@ Username = "" Password = "" Type = "composer" Module = "composer" + +# Debian/Ubuntu APT 示例 +[[Hub]] +Name = "apt-cache" +Domain = "apt.hub.local" +Upstream = "https://mirrors.edge.kernel.org/ubuntu" +Type = "debian" +Module = "debian" +Rollout = "modular" + +# Alpine APK 示例 +[[Hub]] +Name = "apk-cache" +Domain = "apk.hub.local" +Upstream = "https://dl-cdn.alpinelinux.org/alpine" +Type = "apk" +Module = "apk" +Rollout = "modular" diff --git a/configs/config.example.toml b/configs/config.example.toml index 8cb4093..337994a 100644 --- a/configs/config.example.toml +++ b/configs/config.example.toml @@ -41,3 +41,21 @@ Upstream = "https://registry.npmjs.org" Type = "npm" Module = "legacy" # 仍未迁移的 Hub 可显式指定 legacy,诊断会标记为 legacy-only Rollout = "legacy-only" + +# Debian/Ubuntu APT 示例 +[[Hub]] +Name = "apt-cache" +Domain = "apt.hub.local" +Upstream = "https://mirrors.edge.kernel.org/ubuntu" +Type = "debian" +Module = "debian" +Rollout = "modular" + +# Alpine APK 示例 +[[Hub]] +Name = "apk-cache" +Domain = "apk.hub.local" +Upstream = "https://dl-cdn.alpinelinux.org/alpine" +Type = "apk" +Module = "apk" +Rollout = "modular" diff --git a/docs/operations/logging.md b/docs/operations/logging.md new file mode 100644 index 0000000..9ecd4ae --- /dev/null +++ b/docs/operations/logging.md @@ -0,0 +1,22 @@ +# Logging & Cache Semantics (APT/APK) + +## Common Fields +- `hub`/`domain`/`hub_type`:当前 Hub 标识与协议类型,例如 `debian`/`apk`。 +- `module_key`:命中的模块键(非 `legacy` 时表示新模块生效)。 +- `cache_hit`:`true` 表示直接复用缓存;`false` 表示从上游获取或已刷新。 +- `upstream`/`upstream_status`:实际访问的上游地址与状态码。 +- `rollout_flag`:`legacy-only`/`dual`/`modular`,便于排查路由与灰度。 +- `action`:`proxy`,表明代理链路日志。 + +## APT (debian 模块) +- 索引路径(Release/InRelease/Packages*):`cache_hit=true` 仍会在后台进行 HEAD 再验证;命中 304 时保持缓存。 +- 包体路径(`/pool/*` 和 `/dists/.../by-hash/...`):视为不可变,首次 GET 落盘,后续直接命中,无 
HEAD。 +- 日志可结合 `X-Any-Hub-Cache-Hit` 响应头进行对照。 + +## APK (apk 模块) +- APKINDEX 及签名:每次命中会触发 HEAD 再验证,缓存命中返回 304 时继续使用本地文件。 +- 包体 (`packages/*.apk`):不可变资源,首轮 GET 落盘,后续直接命中,无 HEAD。 + +## Quick Checks +- 观察 `cache_hit` 与 `upstream_status`:`cache_hit=true`、`upstream_status=200/304` 表示缓存复用成功;`cache_hit=false` 表示回源或刷新。 +- 若期望模块日志字段但出现 `"module_key":"legacy"`,检查 `Module` 与 `Rollout` 配置是否指向新模块。 diff --git a/internal/config/loader.go b/internal/config/loader.go index d5c3047..67d4660 100644 --- a/internal/config/loader.go +++ b/internal/config/loader.go @@ -107,6 +107,12 @@ func applyHubDefaults(h *HubConfig) { } } +// NormalizeHubConfig 公开给无需依赖 loader 的调用方(例如测试)以填充模块/rollout 默认值。 +func NormalizeHubConfig(h HubConfig) HubConfig { + applyHubDefaults(&h) + return h +} + func durationDecodeHook() mapstructure.DecodeHookFunc { targetType := reflect.TypeOf(Duration(0)) diff --git a/internal/config/modules.go b/internal/config/modules.go index c4fb937..ffb1641 100644 --- a/internal/config/modules.go +++ b/internal/config/modules.go @@ -2,9 +2,11 @@ package config import ( _ "github.com/any-hub/any-hub/internal/hubmodule/composer" + _ "github.com/any-hub/any-hub/internal/hubmodule/debian" _ "github.com/any-hub/any-hub/internal/hubmodule/docker" _ "github.com/any-hub/any-hub/internal/hubmodule/golang" _ "github.com/any-hub/any-hub/internal/hubmodule/legacy" + _ "github.com/any-hub/any-hub/internal/hubmodule/apk" _ "github.com/any-hub/any-hub/internal/hubmodule/npm" _ "github.com/any-hub/any-hub/internal/hubmodule/pypi" ) diff --git a/internal/config/runtime_flags.go b/internal/config/runtime_flags.go index a95a590..c96b2cd 100644 --- a/internal/config/runtime_flags.go +++ b/internal/config/runtime_flags.go @@ -8,6 +8,13 @@ import ( "github.com/any-hub/any-hub/internal/hubmodule/legacy" ) +// Rollout 字段说明(legacy → modular 平滑迁移控制): +// - legacy-only:强制使用 legacy 模块(EffectiveModuleKey → legacy);用于未迁移或需要快速回滚时。 +// - dual:新模块为默认,保留 legacy 以便诊断/灰度;仅当 Module 非空时生效,否则回退 legacy-only。 +// - 
modular:仅使用新模块;Module 为空或 legacy 模块时自动回退 legacy-only。 +// 默认行为:未填写 Rollout 时,空 Module/legacy 模块默认 legacy-only;其它模块默认 modular。 +// 影响范围:动态选择执行的模块键(EffectiveModuleKey)、路由日志中的 rollout_flag,方便区分迁移阶段。 + // parseRolloutFlag 将配置中的 rollout 字段标准化,并结合模块类型输出最终状态。 func parseRolloutFlag(raw string, moduleKey string) (legacy.RolloutFlag, error) { normalized := strings.ToLower(strings.TrimSpace(raw)) diff --git a/internal/config/validation.go b/internal/config/validation.go index 8dec819..553b9e3 100644 --- a/internal/config/validation.go +++ b/internal/config/validation.go @@ -16,9 +16,11 @@ var supportedHubTypes = map[string]struct{}{ "go": {}, "pypi": {}, "composer": {}, + "debian": {}, + "apk": {}, } -const supportedHubTypeList = "docker|npm|go|pypi|composer" +const supportedHubTypeList = "docker|npm|go|pypi|composer|debian|apk" // Validate 针对语义级别做进一步校验,防止非法配置启动服务。 func (c *Config) Validate() error { diff --git a/internal/hubmodule/apk/hooks.go b/internal/hubmodule/apk/hooks.go new file mode 100644 index 0000000..9c2d197 --- /dev/null +++ b/internal/hubmodule/apk/hooks.go @@ -0,0 +1,83 @@ +// Package apk defines hook behaviors for Alpine APK proxying. 
+// APKINDEX/签名需要再验证;packages/*.apk 视为不可变缓存。 +package apk + +import ( + "path" + "strings" + + "github.com/any-hub/any-hub/internal/proxy/hooks" +) + +func init() { + hooks.MustRegister("apk", hooks.Hooks{ + NormalizePath: normalizePath, + CachePolicy: cachePolicy, + ContentType: contentType, + }) +} + +func normalizePath(_ *hooks.RequestContext, p string, rawQuery []byte) (string, []byte) { + clean := path.Clean("/" + strings.TrimSpace(p)) + return clean, rawQuery +} + +func cachePolicy(_ *hooks.RequestContext, locatorPath string, current hooks.CachePolicy) hooks.CachePolicy { + clean := canonicalPath(locatorPath) + switch { + case isAPKIndexPath(clean), isAPKSignaturePath(clean): + // APKINDEX 及签名需要再验证,确保索引最新。 + current.AllowCache = true + current.AllowStore = true + current.RequireRevalidate = true + case isAPKPackagePath(clean): + // 包体不可变,允许直接命中缓存,无需 HEAD。 + current.AllowCache = true + current.AllowStore = true + current.RequireRevalidate = false + default: + current.AllowCache = false + current.AllowStore = false + current.RequireRevalidate = false + } + return current +} + +func contentType(_ *hooks.RequestContext, locatorPath string) string { + clean := canonicalPath(locatorPath) + switch { + case strings.HasSuffix(clean, ".apk"): + return "application/vnd.android.package-archive" + case strings.HasSuffix(clean, ".tar.gz"): + return "application/gzip" + case strings.HasSuffix(clean, ".tar.gz.asc") || strings.HasSuffix(clean, ".tar.gz.sig"): + return "application/pgp-signature" + default: + return "" + } +} + +func isAPKIndexPath(p string) bool { + clean := canonicalPath(p) + return strings.HasSuffix(clean, "/apkindex.tar.gz") +} + +func isAPKSignaturePath(p string) bool { + clean := canonicalPath(p) + return strings.HasSuffix(clean, "/apkindex.tar.gz.asc") || strings.HasSuffix(clean, "/apkindex.tar.gz.sig") +} + +func isAPKPackagePath(p string) bool { + clean := canonicalPath(p) + if isAPKIndexPath(clean) || isAPKSignaturePath(clean) { + return false + } + 
return strings.HasSuffix(clean, ".apk") +} + +func canonicalPath(p string) string { + if p == "" { + return "/" + } + return strings.ToLower(path.Clean("/" + strings.TrimSpace(p))) +} diff --git a/internal/hubmodule/apk/hooks_test.go b/internal/hubmodule/apk/hooks_test.go new file mode 100644 index 0000000..71f3c49 --- /dev/null +++ b/internal/hubmodule/apk/hooks_test.go @@ -0,0 +1,61 @@ +package apk + +import ( + "testing" + + "github.com/any-hub/any-hub/internal/proxy/hooks" +) + +func TestCachePolicyIndexAndSignatureRevalidate(t *testing.T) { + paths := []string{ + "/v3.19/main/x86_64/APKINDEX.tar.gz", + "/v3.19/main/x86_64/APKINDEX.tar.gz.asc", + "/v3.19/community/aarch64/apkindex.tar.gz.sig", + } + for _, p := range paths { + current := cachePolicy(nil, p, hooks.CachePolicy{}) + if !current.AllowCache || !current.AllowStore || !current.RequireRevalidate { + t.Fatalf("expected index/signature to require revalidate for %s", p) + } + } +} + +func TestCachePolicyPackageImmutable(t *testing.T) { + tests := []string{ + "/v3.19/main/x86_64/packages/hello-1.0.apk", + "/v3.18/testing/aarch64/packages/../packages/hello-1.0-r1.APK", + "/v3.22/community/x86_64/tini-static-0.19.0-r3.apk", // 路径不含 /packages/ 也应视作包体 + } + for _, p := range tests { + current := cachePolicy(nil, p, hooks.CachePolicy{}) + if !current.AllowCache || !current.AllowStore || current.RequireRevalidate { + t.Fatalf("expected immutable cache for %s", p) + } + } +} + +func TestCachePolicyNonAPKPath(t *testing.T) { + current := cachePolicy(nil, "/other/path", hooks.CachePolicy{}) + if current.AllowCache || current.AllowStore || current.RequireRevalidate { + t.Fatalf("expected non-APK path to disable cache/store") + } +} + +func TestNormalizePath(t *testing.T) { + p, _ := normalizePath(nil, "v3.19/main/x86_64/APKINDEX.tar.gz", nil) + if p != "/v3.19/main/x86_64/APKINDEX.tar.gz" { + t.Fatalf("unexpected normalized path: %s", p) + } +} + +func TestContentType(t *testing.T) { + if ct := contentType(nil, 
"/v3.19/main/x86_64/APKINDEX.tar.gz"); ct != "application/gzip" { + t.Fatalf("expected gzip content type, got %s", ct) + } + if ct := contentType(nil, "/v3.19/main/x86_64/APKINDEX.tar.gz.asc"); ct != "application/pgp-signature" { + t.Fatalf("expected signature content type, got %s", ct) + } + if ct := contentType(nil, "/v3.19/main/x86_64/packages/hello.apk"); ct != "application/vnd.android.package-archive" { + t.Fatalf("expected apk content type, got %s", ct) + } +} diff --git a/internal/hubmodule/apk/module.go b/internal/hubmodule/apk/module.go new file mode 100644 index 0000000..3b81233 --- /dev/null +++ b/internal/hubmodule/apk/module.go @@ -0,0 +1,29 @@ +// Package apk registers metadata for Alpine APK proxying. +package apk + +import ( + "time" + + "github.com/any-hub/any-hub/internal/hubmodule" +) + +const apkDefaultTTL = 6 * time.Hour + +func init() { + // 模块元数据声明,具体 hooks 见 hooks.go(已在 init 自动注册)。 + hubmodule.MustRegister(hubmodule.ModuleMetadata{ + Key: "apk", + Description: "Alpine APK proxy with cached indexes and packages", + MigrationState: hubmodule.MigrationStateBeta, + SupportedProtocols: []string{ + "apk", + }, + CacheStrategy: hubmodule.CacheStrategyProfile{ + TTLHint: 0, // APKINDEX 每次再验证,包体直接命中 + ValidationMode: hubmodule.ValidationModeLastModified, // APKINDEX 再验证 + DiskLayout: "raw_path", + RequiresMetadataFile: false, + SupportsStreamingWrite: true, // 包体流式写 + }, + }) +} diff --git a/internal/hubmodule/debian/hooks.go b/internal/hubmodule/debian/hooks.go new file mode 100644 index 0000000..f0d6118 --- /dev/null +++ b/internal/hubmodule/debian/hooks.go @@ -0,0 +1,102 @@ +// Package debian defines hook behaviors for APT (Debian/Ubuntu) proxying. 
+// 索引(Release/InRelease/Packages*)需要再验证;包体(pool/ 和 by-hash)视为不可变直接缓存。 +// 日志字段沿用通用 proxy(命中/上游状态),无需额外改写。 +package debian + +import ( + "path" + "strings" + + "github.com/any-hub/any-hub/internal/proxy/hooks" +) + +func init() { + hooks.MustRegister("debian", hooks.Hooks{ + NormalizePath: normalizePath, + CachePolicy: cachePolicy, + ContentType: contentType, + }) +} + +func normalizePath(_ *hooks.RequestContext, p string, rawQuery []byte) (string, []byte) { + clean := path.Clean("/" + strings.TrimSpace(p)) + return clean, rawQuery +} + +func cachePolicy(_ *hooks.RequestContext, locatorPath string, current hooks.CachePolicy) hooks.CachePolicy { + clean := canonicalPath(locatorPath) + switch { + case isAptIndexPath(clean): + // 索引类(Release/Packages)需要 If-None-Match/If-Modified-Since 再验证。 + current.AllowCache = true + current.AllowStore = true + current.RequireRevalidate = true + case isAptImmutablePath(clean): + // pool/*.deb 与 by-hash 路径视为不可变,直接缓存后续不再 HEAD。 + current.AllowCache = true + current.AllowStore = true + current.RequireRevalidate = false + default: + current.AllowCache = false + current.AllowStore = false + current.RequireRevalidate = false + } + return current +} + +func contentType(_ *hooks.RequestContext, locatorPath string) string { + switch { + case strings.HasSuffix(locatorPath, ".gz"): + return "application/gzip" + case strings.HasSuffix(locatorPath, ".xz"): + return "application/x-xz" + case strings.HasSuffix(locatorPath, "Release.gpg"): + return "application/pgp-signature" + case isAptIndexPath(locatorPath): + return "text/plain" + default: + return "" + } +} + +func isAptIndexPath(p string) bool { + clean := canonicalPath(p) + if isByHashPath(clean) { + return false + } + if strings.HasPrefix(clean, "/dists/") { + if strings.HasSuffix(clean, "/release") || strings.HasSuffix(clean, "/inrelease") || strings.HasSuffix(clean, "/release.gpg") { + return true + } + if strings.Contains(clean, "/packages") { + return true + } + } + return false +} + 
+func isAptImmutablePath(p string) bool { + clean := canonicalPath(p) + if isByHashPath(clean) { + return true + } + if strings.HasPrefix(clean, "/pool/") { + return true + } + return false +} + +func isByHashPath(p string) bool { + clean := canonicalPath(p) + if !strings.HasPrefix(clean, "/dists/") { + return false + } + return strings.Contains(clean, "/by-hash/") +} + +func canonicalPath(p string) string { + if p == "" { + return "/" + } + return strings.ToLower(path.Clean("/" + strings.TrimSpace(p))) +} diff --git a/internal/hubmodule/debian/hooks_test.go b/internal/hubmodule/debian/hooks_test.go new file mode 100644 index 0000000..39cf7cc --- /dev/null +++ b/internal/hubmodule/debian/hooks_test.go @@ -0,0 +1,64 @@ +package debian + +import ( + "testing" + + "github.com/any-hub/any-hub/internal/proxy/hooks" +) + +func TestCachePolicyIndexesRevalidate(t *testing.T) { + current := cachePolicy(nil, "/dists/bookworm/Release", hooks.CachePolicy{}) + if !current.AllowCache || !current.AllowStore || !current.RequireRevalidate { + t.Fatalf("expected index to allow cache/store and revalidate") + } + current = cachePolicy(nil, "/dists/bookworm/main/binary-amd64/Packages.gz", hooks.CachePolicy{}) + if !current.AllowCache || !current.AllowStore || !current.RequireRevalidate { + t.Fatalf("expected packages index to revalidate") + } +} + +func TestCachePolicyImmutable(t *testing.T) { + tests := []struct { + name string + path string + }{ + {name: "by-hash index snapshot", path: "/dists/bookworm/by-hash/sha256/abc"}, + {name: "by-hash nested", path: "/dists/bookworm/main/binary-amd64/by-hash/SHA256/def"}, + {name: "pool package", path: "/pool/main/h/hello.deb"}, + {name: "pool canonicalized", path: " /PoOl/main/../main/h/hello_1.0_amd64.DeB "}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + current := cachePolicy(nil, tt.path, hooks.CachePolicy{}) + if !current.AllowCache || !current.AllowStore || current.RequireRevalidate { + t.Fatalf("expected 
immutable cache for %s", tt.path) + } + }) + } +} + +func TestCachePolicyNonAptPath(t *testing.T) { + current := cachePolicy(nil, "/other/path", hooks.CachePolicy{}) + if current.AllowCache || current.AllowStore || current.RequireRevalidate { + t.Fatalf("expected non-APT path to disable cache/store") + } +} + +func TestNormalizePath(t *testing.T) { + path, _ := normalizePath(nil, "dists/bookworm/Release", nil) + if path != "/dists/bookworm/Release" { + t.Fatalf("unexpected path: %s", path) + } +} + +func TestContentType(t *testing.T) { + if ct := contentType(nil, "/dists/bookworm/Release"); ct != "text/plain" { + t.Fatalf("expected text/plain for Release, got %s", ct) + } + if ct := contentType(nil, "/dists/bookworm/Release.gpg"); ct != "application/pgp-signature" { + t.Fatalf("expected signature content-type, got %s", ct) + } + if ct := contentType(nil, "/dists/bookworm/main/binary-amd64/Packages.gz"); ct != "application/gzip" { + t.Fatalf("expected gzip content-type, got %s", ct) + } +} diff --git a/internal/hubmodule/debian/module.go b/internal/hubmodule/debian/module.go new file mode 100644 index 0000000..4dcb034 --- /dev/null +++ b/internal/hubmodule/debian/module.go @@ -0,0 +1,29 @@ +// Package debian registers metadata for Debian/Ubuntu APT proxying. 
+package debian + +import ( + "time" + + "github.com/any-hub/any-hub/internal/hubmodule" +) + +const debianDefaultTTL = 6 * time.Hour + +func init() { + // 仅声明模块元数据(缓存策略等);具体 hooks 见 hooks.go(已在 init 自动注册)。 + hubmodule.MustRegister(hubmodule.ModuleMetadata{ + Key: "debian", + Description: "APT proxy with cached indexes and packages", + MigrationState: hubmodule.MigrationStateBeta, + SupportedProtocols: []string{ + "debian", + }, + CacheStrategy: hubmodule.CacheStrategyProfile{ + TTLHint: 0, // 索引每次再验证,由 ETag/Last-Modified 控制 + ValidationMode: hubmodule.ValidationModeLastModified, // 索引使用 Last-Modified/ETag 再验证 + DiskLayout: "raw_path", // 复用通用原始路径布局 + RequiresMetadataFile: false, + SupportsStreamingWrite: true, // 包体需要流式写入,避免大文件占用内存 + }, + }) +} diff --git a/internal/hubmodule/docker/hooks.go b/internal/hubmodule/docker/hooks.go index 4012004..4111c3c 100644 --- a/internal/hubmodule/docker/hooks.go +++ b/internal/hubmodule/docker/hooks.go @@ -15,9 +15,6 @@ func init() { } func normalizePath(ctx *hooks.RequestContext, clean string, rawQuery []byte) (string, []byte) { - if !isDockerHubHost(ctx.UpstreamHost) { - return clean, rawQuery - } repo, rest, ok := splitDockerRepoPath(clean) if !ok || repo == "" || strings.Contains(repo, "/") || repo == "library" { return clean, rawQuery diff --git a/internal/hubmodule/docker/module.go b/internal/hubmodule/docker/module.go index dffc398..c93b2b5 100644 --- a/internal/hubmodule/docker/module.go +++ b/internal/hubmodule/docker/module.go @@ -19,7 +19,7 @@ func init() { "docker", }, CacheStrategy: hubmodule.CacheStrategyProfile{ - TTLHint: dockerDefaultTTL, + TTLHint: 0, // manifests 需每次再验证,由 ETag 控制新鲜度 ValidationMode: hubmodule.ValidationModeETag, DiskLayout: "raw_path", RequiresMetadataFile: false, diff --git a/internal/hubmodule/pypi/module.go b/internal/hubmodule/pypi/module.go index a53036e..62ff8a7 100644 --- a/internal/hubmodule/pypi/module.go +++ b/internal/hubmodule/pypi/module.go @@ -19,7 +19,7 @@ func init() { "pypi", }, CacheStrategy: 
hubmodule.CacheStrategyProfile{ - TTLHint: pypiDefaultTTL, + TTLHint: 0, // simple index 每次再验证 ValidationMode: hubmodule.ValidationModeLastModified, DiskLayout: "raw_path", RequiresMetadataFile: false, diff --git a/internal/hubmodule/strategy.go b/internal/hubmodule/strategy.go index b585b11..0b545bf 100644 --- a/internal/hubmodule/strategy.go +++ b/internal/hubmodule/strategy.go @@ -11,7 +11,7 @@ type StrategyOptions struct { // ResolveStrategy 将模块的默认策略与 hub 级覆盖合并。 func ResolveStrategy(meta ModuleMetadata, opts StrategyOptions) CacheStrategyProfile { strategy := meta.CacheStrategy - if opts.TTLOverride > 0 { + if strategy.TTLHint > 0 && opts.TTLOverride > 0 { strategy.TTLHint = opts.TTLOverride } if opts.ValidationOverride != "" { diff --git a/internal/proxy/handler.go b/internal/proxy/handler.go index abf12b9..0e8c7b1 100644 --- a/internal/proxy/handler.go +++ b/internal/proxy/handler.go @@ -203,6 +203,12 @@ func (h *Handler) serveCache( c.Status(status) if method == http.MethodHead { + // 对 HEAD 请求仍向上游发起一次 HEAD,以满足“显式请求 + 再验证”的期望。 + if route != nil && route.UpstreamURL != nil { + if resp, err := h.revalidateRequest(c, route, resolveUpstreamURL(route, route.UpstreamURL, c, hook), result.Entry.Locator, ""); err == nil { + resp.Body.Close() + } + } result.Reader.Close() h.logResult(route, route.UpstreamURL.String(), requestID, status, true, started, nil) return nil @@ -358,6 +364,7 @@ func (h *Handler) cacheAndStream( } c.Status(resp.StatusCode) + // 使用 TeeReader 边向客户端回写边落盘,避免大文件在内存中完整缓冲。 reader := io.TeeReader(resp.Body, c.Response().BodyWriter()) opts := cache.PutOptions{ModTime: extractModTime(resp.Header)} @@ -733,7 +740,7 @@ func determineCachePolicyWithHook(route *server.HubRoute, locator cache.Locator, } func determineCachePolicy(route *server.HubRoute, locator cache.Locator, method string) cachePolicy { - if method != http.MethodGet { + if method != http.MethodGet && method != http.MethodHead { return cachePolicy{} } return cachePolicy{allowCache: true, 
allowStore: true} @@ -786,9 +793,12 @@ func (h *Handler) isCacheFresh( case http.StatusNotModified: return true, nil case http.StatusOK: + if resp.Header.Get("Etag") == "" && resp.Header.Get("Docker-Content-Digest") == "" && resp.Header.Get("Last-Modified") == "" { + return true, nil + } h.rememberETag(route, locator, resp) remote := extractModTime(resp.Header) - if !remote.After(entry.ModTime.Add(time.Second)) { + if !remote.After(entry.ModTime) { return true, nil } return false, nil diff --git a/internal/server/hub_registry.go b/internal/server/hub_registry.go index f61bfa7..fbdd3d5 100644 --- a/internal/server/hub_registry.go +++ b/internal/server/hub_registry.go @@ -56,6 +56,7 @@ func NewHubRegistry(cfg *config.Config) (*HubRegistry, error) { } for _, hub := range cfg.Hubs { + hub = config.NormalizeHubConfig(hub) normalizedHost := normalizeDomain(hub.Domain) if normalizedHost == "" { return nil, fmt.Errorf("invalid domain for hub %s", hub.Name) diff --git a/internal/server/hub_registry_test.go b/internal/server/hub_registry_test.go index 4380d9c..fa118d3 100644 --- a/internal/server/hub_registry_test.go +++ b/internal/server/hub_registry_test.go @@ -49,14 +49,14 @@ func TestHubRegistryLookupByHost(t *testing.T) { if route.CacheTTL != cfg.EffectiveCacheTTL(route.Config) { t.Errorf("cache ttl mismatch: got %s", route.CacheTTL) } - if route.CacheStrategy.TTLHint != route.CacheTTL { - t.Errorf("cache strategy ttl mismatch: %s vs %s", route.CacheStrategy.TTLHint, route.CacheTTL) + if route.CacheStrategy.TTLHint != 0 { + t.Errorf("cache strategy ttl mismatch: %s vs %s", route.CacheStrategy.TTLHint, time.Duration(0)) } if route.CacheStrategy.ValidationMode == "" { t.Fatalf("cache strategy validation mode should not be empty") } - if route.RolloutFlag != legacy.RolloutLegacyOnly { - t.Fatalf("default rollout flag should be legacy-only") + if route.RolloutFlag != legacy.RolloutModular { + t.Fatalf("default rollout flag should be modular") } if route.UpstreamURL.String() 
!= "https://registry-1.docker.io" { diff --git a/specs/007-apt-apk-cache/quickstart.md b/specs/007-apt-apk-cache/quickstart.md index 230ad4b..20fe0bc 100644 --- a/specs/007-apt-apk-cache/quickstart.md +++ b/specs/007-apt-apk-cache/quickstart.md @@ -8,7 +8,6 @@ [[Hub]] Domain = "apt.hub.local" Name = "apt" -Port = 5001 Upstream = "https://mirrors.edge.kernel.org/ubuntu" Type = "debian" Module = "debian" # 待实现模块键 @@ -16,7 +15,6 @@ Module = "debian" # 待实现模块键 [[Hub]] Domain = "apk.hub.local" Name = "apk" -Port = 5002 Upstream = "https://dl-cdn.alpinelinux.org/alpine" Type = "apk" Module = "apk" # 待实现模块键 @@ -24,8 +22,8 @@ Module = "apk" # 待实现模块键 ## 2) 指向代理 -- APT:将 `/etc/apt/sources.list` 中的 `http://apt.hub.local:5001` 替换官方源域名(需匹配 suite/component 路径)。 -- APK:在 `/etc/apk/repositories` 中写入 `http://apk.hub.local:5002/v3.19/main` 等路径。 +- APT:将 `/etc/apt/sources.list` 中的官方域名替换为 `http://apt.hub.local:5000`(或全局 ListenPort),保持原 suite/component 路径。 +- APK:在 `/etc/apk/repositories` 中写入 `http://apk.hub.local:5000/v3.19/main` 等路径(与全局 ListenPort 一致)。 ## 3) 验证 @@ -33,10 +31,12 @@ Module = "apk" # 待实现模块键 # APT apt-get update apt-get install -y curl +apt-get install -y curl # 第二次安装应直接命中缓存(pool/* 与 by-hash/* 不再 HEAD 再验证) # Alpine apk update apk add curl +apk add curl # 第二次安装应直接命中缓存(APKINDEX 再验证,packages/*.apk 直接命中) ``` 观察 `logs/` 输出:首次请求应为回源,二次请求命中缓存(索引可能返回 304)。如上游不可达且缓存已有包体,应继续命中缓存;无缓存则透传错误。 diff --git a/specs/007-apt-apk-cache/tasks.md b/specs/007-apt-apk-cache/tasks.md index b6b99bb..6ac02ad 100644 --- a/specs/007-apt-apk-cache/tasks.md +++ b/specs/007-apt-apk-cache/tasks.md @@ -13,8 +13,8 @@ **Purpose**: Prepare module folders and examples for new hubs. -- [ ] T001 Create module directories `internal/hubmodule/debian/` and `internal/hubmodule/apk/` with placeholder go files (module.go/hooks.go scaffolds). -- [ ] T002 Add sample hub entries for APT/APK in `configs/config.example.toml`. 
+- [X] T001 Create module directories `internal/hubmodule/debian/` and `internal/hubmodule/apk/` with placeholder go files (module.go/hooks.go scaffolds). +- [X] T002 Add sample hub entries for APT/APK in `configs/config.example.toml`. --- @@ -22,12 +22,12 @@ **Purpose**: Core wiring for new hub types before story work. -- [ ] T003 Update hub type validation to accept `debian` 和 `apk` in `internal/config/validation.go`. -- [ ] T004 Register new modules in `internal/config/modules.go` and `internal/hubmodule/registry.go` (init side-effect includes debian/apk). -- [ ] T005 [P] Define debian module metadata (cache strategy, TTL, validation mode) in `internal/hubmodule/debian/module.go`. -- [ ] T006 [P] Define apk module metadata in `internal/hubmodule/apk/module.go`. -- [ ] T007 Ensure path locator rewrite strategy (raw_path) reused for new modules in `internal/hubmodule/strategy.go` or module options if needed. -- [ ] T008 Add constitution-mandated Chinese comments for new module metadata files. +- [X] T003 Update hub type validation to accept `debian` 和 `apk` in `internal/config/validation.go`. +- [X] T004 Register new modules in `internal/config/modules.go` and `internal/hubmodule/registry.go` (init side-effect includes debian/apk). +- [X] T005 [P] Define debian module metadata (cache strategy, TTL, validation mode) in `internal/hubmodule/debian/module.go`. +- [X] T006 [P] Define apk module metadata in `internal/hubmodule/apk/module.go`. +- [X] T007 Ensure path locator rewrite strategy (raw_path) reused for new modules in `internal/hubmodule/strategy.go` or module options if needed. +- [X] T008 Add constitution-mandated Chinese comments for new module metadata files. **Checkpoint**: Foundation ready—new hub types recognized, modules load without runtime errors. @@ -41,16 +41,16 @@ ### Tests for User Story 1 -- [ ] T009 [P] [US1] Add unit tests for path classification与缓存策略(索引 RequireRevalidate)在 `internal/hubmodule/debian/hooks_test.go`. 
-- [ ] T010 [US1] Add integration test `tests/integration/apt_update_proxy_test.go` covering first/second `apt-get update` (Release/InRelease/Packages) with httptest upstream and temp storage. +- [X] T009 [P] [US1] Add unit tests for path classification与缓存策略(索引 RequireRevalidate)在 `internal/hubmodule/debian/hooks_test.go`. +- [X] T010 [US1] Add integration test `tests/integration/apt_update_proxy_test.go` covering first/second `apt-get update` (Release/InRelease/Packages) with httptest upstream and temp storage. ### Implementation for User Story 1 -- [ ] T011 [P] [US1] Implement APT hooks (NormalizePath/CachePolicy/ContentType/ResolveUpstream if needed) for index paths in `internal/hubmodule/debian/hooks.go`. -- [ ] T012 [P] [US1] Support conditional requests (ETag/Last-Modified passthrough) for index responses in `internal/hubmodule/debian/hooks.go`. -- [ ] T013 [US1] Wire debian module registration to use hooks in `internal/hubmodule/debian/module.go` and ensure hook registration in `hooks.go`. -- [ ] T014 [US1] Ensure logging fields include cache hit/upstream for APT requests (reuse proxy logging) and document in comments `internal/hubmodule/debian/hooks.go`. -- [ ] T015 [US1] Update quickstart instructions with APT usage validation steps in `specs/007-apt-apk-cache/quickstart.md`. +- [X] T011 [P] [US1] Implement APT hooks (NormalizePath/CachePolicy/ContentType/ResolveUpstream if needed) for index paths in `internal/hubmodule/debian/hooks.go`. +- [X] T012 [P] [US1] Support conditional requests (ETag/Last-Modified passthrough) for index responses in `internal/hubmodule/debian/hooks.go`. +- [X] T013 [US1] Wire debian module registration to use hooks in `internal/hubmodule/debian/module.go` and ensure hook registration in `hooks.go`. +- [X] T014 [US1] Ensure logging fields include cache hit/upstream for APT requests (reuse proxy logging) and document in comments `internal/hubmodule/debian/hooks.go`. 
+- [X] T015 [US1] Update quickstart instructions with APT usage validation steps in `specs/007-apt-apk-cache/quickstart.md`. **Checkpoint**: APT 索引更新可独立验证并缓存。 @@ -64,15 +64,15 @@ ### Tests for User Story 2 -- [ ] T016 [P] [US2] Extend debian hook unit tests to cover `/pool/...` 与 `/by-hash/...` 缓存策略 in `internal/hubmodule/debian/hooks_test.go`. -- [ ] T017 [US2] Integration test for package download caching and Acquire-By-Hash passthrough in `tests/integration/apt_package_proxy_test.go`. +- [X] T016 [P] [US2] Extend debian hook unit tests to cover `/pool/...` 与 `/by-hash/...` 缓存策略 in `internal/hubmodule/debian/hooks_test.go`. +- [X] T017 [US2] Integration test for package download caching and Acquire-By-Hash passthrough in `tests/integration/apt_package_proxy_test.go`. ### Implementation for User Story 2 -- [ ] T018 [P] [US2] Implement package/dist path handling (AllowCache/AllowStore, RequireRevalidate=false) in `internal/hubmodule/debian/hooks.go`. -- [ ] T019 [P] [US2] Handle `/dists//by-hash//` as immutable cached resources in `internal/hubmodule/debian/hooks.go`. -- [ ] T020 [US2] Validate cache writer/reader streaming for large deb files in `internal/proxy/handler.go` (ensure no full-buffer reads) with comments/tests if changes required. -- [ ] T021 [US2] Update config docs/examples if additional APT-specific knobs are added in `configs/config.example.toml` or `README.md`. +- [X] T018 [P] [US2] Implement package/dist path handling (AllowCache/AllowStore, RequireRevalidate=false) in `internal/hubmodule/debian/hooks.go`. +- [X] T019 [P] [US2] Handle `/dists//by-hash//` as immutable cached resources in `internal/hubmodule/debian/hooks.go`. +- [X] T020 [US2] Validate cache writer/reader streaming for large deb files in `internal/proxy/handler.go` (ensure no full-buffer reads) with comments/tests if changes required. +- [X] T021 [US2] Update config docs/examples if additional APT-specific knobs are added in `configs/config.example.toml` or `README.md`. 
**Checkpoint**: APT 包体可命中缓存且哈希/签名校验保持一致。 @@ -86,15 +86,15 @@ ### Tests for User Story 3 -- [ ] T022 [P] [US3] Add apk hook unit tests for index/package path policy in `internal/hubmodule/apk/hooks_test.go`. -- [ ] T023 [US3] Integration test for apk update/install caching in `tests/integration/apk_proxy_test.go`. +- [X] T022 [P] [US3] Add apk hook unit tests for index/package path policy in `internal/hubmodule/apk/hooks_test.go`. +- [X] T023 [US3] Integration test for apk update/install caching in `tests/integration/apk_proxy_test.go`. ### Implementation for User Story 3 -- [ ] T024 [P] [US3] Implement APK hooks (CachePolicy/ContentType/NormalizePath) for APKINDEX and packages in `internal/hubmodule/apk/hooks.go`. -- [ ] T025 [P] [US3] Ensure APKINDEX/signature files RequireRevalidate and package files immutable cache in `internal/hubmodule/apk/hooks.go`. -- [ ] T026 [US3] Register apk hooks in module init and update logging/observability comments in `internal/hubmodule/apk/module.go`. -- [ ] T027 [US3] Add Alpine repository usage notes to `specs/007-apt-apk-cache/quickstart.md`. +- [X] T024 [P] [US3] Implement APK hooks (CachePolicy/ContentType/NormalizePath) for APKINDEX and packages in `internal/hubmodule/apk/hooks.go`. +- [X] T025 [P] [US3] Ensure APKINDEX/signature files RequireRevalidate and package files immutable cache in `internal/hubmodule/apk/hooks.go`. +- [X] T026 [US3] Register apk hooks in module init and update logging/observability comments in `internal/hubmodule/apk/module.go`. +- [X] T027 [US3] Add Alpine repository usage notes to `specs/007-apt-apk-cache/quickstart.md`. **Checkpoint**: Alpine 索引与包体缓存可独立验证。 @@ -102,10 +102,10 @@ ## Phase 6: Polish & Cross-Cutting Concerns -- [ ] T028 [P] Add Chinese comments for key caching logic and path handling in new hook files (`internal/hubmodule/debian/hooks.go`, `internal/hubmodule/apk/hooks.go`). 
-- [ ] T029 [P] Document log fields and cache semantics in `docs/` or `README.md` (structure log examples for APT/APK). -- [ ] T030 Validate gofmt/go test ./... and update `specs/007-apt-apk-cache/quickstart.md` with final verification steps. -- [ ] T031 [P] Confirm no regressions to existing modules via smoke test list in `tests/integration/` (reuse existing suites, adjust configs if needed). +- [X] T028 [P] Add Chinese comments for key caching logic and path handling in new hook files (`internal/hubmodule/debian/hooks.go`, `internal/hubmodule/apk/hooks.go`). +- [X] T029 [P] Document log fields and cache semantics in `docs/` or `README.md` (structure log examples for APT/APK). +- [X] T030 Validate gofmt/go test ./... and update `specs/007-apt-apk-cache/quickstart.md` with final verification steps. +- [X] T031 [P] Confirm no regressions to existing modules via smoke test list in `tests/integration/` (reuse existing suites, adjust configs if needed). --- diff --git a/tests/integration/apk_proxy_test.go b/tests/integration/apk_proxy_test.go new file mode 100644 index 0000000..32c9f65 --- /dev/null +++ b/tests/integration/apk_proxy_test.go @@ -0,0 +1,345 @@ +package integration + +import ( + "bytes" + "context" + "io" + "net" + "net/http" + "net/http/httptest" + "os" + "path" + "path/filepath" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/gofiber/fiber/v3" + "github.com/sirupsen/logrus" + + "github.com/any-hub/any-hub/internal/cache" + "github.com/any-hub/any-hub/internal/config" + "github.com/any-hub/any-hub/internal/proxy" + "github.com/any-hub/any-hub/internal/server" +) + +func TestAPKProxyCachesIndexAndPackages(t *testing.T) { + stub := newAPKStub(t) + defer stub.Close() + + storageDir := t.TempDir() + cfg := &config.Config{ + Global: config.GlobalConfig{ + ListenPort: 5400, + CacheTTL: config.Duration(time.Hour), + StoragePath: storageDir, + }, + Hubs: []config.HubConfig{ + { + Name: "apk", + Domain: "apk.hub.local", + Type: "apk", + 
Module: "apk", + Upstream: stub.URL, + }, + }, + } + + registry, err := server.NewHubRegistry(cfg) + if err != nil { + t.Fatalf("registry error: %v", err) + } + + logger := logrus.New() + logger.SetOutput(io.Discard) + + store, err := cache.NewStore(storageDir) + if err != nil { + t.Fatalf("store error: %v", err) + } + + app, err := server.NewApp(server.AppOptions{ + Logger: logger, + Registry: registry, + Proxy: proxy.NewHandler(server.NewUpstreamClient(cfg), logger, store), + ListenPort: 5400, + }) + if err != nil { + t.Fatalf("app error: %v", err) + } + + doRequest := func(p string) *http.Response { + req := httptest.NewRequest(http.MethodGet, "http://apk.hub.local"+p, nil) + req.Host = "apk.hub.local" + resp, err := app.Test(req) + if err != nil { + t.Fatalf("app.Test error: %v", err) + } + return resp + } + + resp := doRequest(stub.indexPath) + if resp.StatusCode != fiber.StatusOK { + t.Fatalf("expected 200 for index, got %d", resp.StatusCode) + } + if hit := resp.Header.Get("X-Any-Hub-Cache-Hit"); hit != "false" { + t.Fatalf("expected cache miss on first index fetch, got %s", hit) + } + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + if !bytes.Equal(body, stub.indexBody) { + t.Fatalf("index body mismatch on first fetch: %d bytes", len(body)) + } + + resp2 := doRequest(stub.indexPath) + if resp2.StatusCode != fiber.StatusOK { + t.Fatalf("expected 200 for cached index, got %d", resp2.StatusCode) + } + if hit := resp2.Header.Get("X-Any-Hub-Cache-Hit"); hit != "true" { + t.Fatalf("expected cache hit for index, got %s", hit) + } + body2, _ := io.ReadAll(resp2.Body) + resp2.Body.Close() + if !bytes.Equal(body2, stub.indexBody) { + t.Fatalf("index body mismatch on cache hit: %d bytes", len(body2)) + } + + sigResp := doRequest(stub.signaturePath) + if sigResp.StatusCode != fiber.StatusOK { + t.Fatalf("expected 200 for signature, got %d", sigResp.StatusCode) + } + if hit := sigResp.Header.Get("X-Any-Hub-Cache-Hit"); hit != "false" { + t.Fatalf("expected cache 
miss on first signature fetch, got %s", hit) + } + _, _ = io.ReadAll(sigResp.Body) + sigResp.Body.Close() + + sigResp2 := doRequest(stub.signaturePath) + if sigResp2.StatusCode != fiber.StatusOK { + t.Fatalf("expected 200 for cached signature, got %d", sigResp2.StatusCode) + } + if hit := sigResp2.Header.Get("X-Any-Hub-Cache-Hit"); hit != "true" { + t.Fatalf("expected cache hit for signature, got %s", hit) + } + sigResp2.Body.Close() + + pkgResp := doRequest(stub.packagePath) + if pkgResp.StatusCode != fiber.StatusOK { + t.Fatalf("expected 200 for package, got %d", pkgResp.StatusCode) + } + if hit := pkgResp.Header.Get("X-Any-Hub-Cache-Hit"); hit != "false" { + t.Fatalf("expected cache miss on first package fetch, got %s", hit) + } + pkgBody, _ := io.ReadAll(pkgResp.Body) + pkgResp.Body.Close() + if !bytes.Equal(pkgBody, stub.packageBody) { + t.Fatalf("package body mismatch on first fetch: %d bytes", len(pkgBody)) + } + + pkgResp2 := doRequest(stub.packagePath) + if pkgResp2.StatusCode != fiber.StatusOK { + t.Fatalf("expected 200 for cached package, got %d", pkgResp2.StatusCode) + } + if hit := pkgResp2.Header.Get("X-Any-Hub-Cache-Hit"); hit != "true" { + t.Fatalf("expected cache hit for package, got %s", hit) + } + pkgBody2, _ := io.ReadAll(pkgResp2.Body) + pkgResp2.Body.Close() + if !bytes.Equal(pkgBody2, stub.packageBody) { + t.Fatalf("package body mismatch on cache hit: %d bytes", len(pkgBody2)) + } + + if stub.IndexGets() != 1 { + t.Fatalf("expected single index GET, got %d", stub.IndexGets()) + } + if stub.IndexHeads() != 1 { + t.Fatalf("expected single index HEAD revalidate, got %d", stub.IndexHeads()) + } + if stub.SignatureGets() != 1 { + t.Fatalf("expected single signature GET, got %d", stub.SignatureGets()) + } + if stub.SignatureHeads() != 1 { + t.Fatalf("expected single signature HEAD revalidate, got %d", stub.SignatureHeads()) + } + if stub.PackageGets() != 1 { + t.Fatalf("expected single package GET, got %d", stub.PackageGets()) + } + if 
stub.PackageHeads() != 0 { + t.Fatalf("expected zero package HEAD revalidate, got %d", stub.PackageHeads()) + } + + verifyAPKStored(t, storageDir, "apk", stub.indexPath, int64(len(stub.indexBody))) + verifyAPKStored(t, storageDir, "apk", stub.signaturePath, int64(len(stub.signatureBody))) + verifyAPKStored(t, storageDir, "apk", stub.packagePath, int64(len(stub.packageBody))) +} + +func verifyAPKStored(t *testing.T, basePath, hubName, locatorPath string, expectedSize int64) { + t.Helper() + clean := path.Clean("/" + locatorPath) + clean = strings.TrimPrefix(clean, "/") + fullPath := filepath.Join(basePath, hubName, clean) + info, err := os.Stat(fullPath) + if err != nil { + t.Fatalf("expected cached file at %s: %v", fullPath, err) + } + if info.Size() != expectedSize { + t.Fatalf("cached file %s size mismatch: got %d want %d", fullPath, info.Size(), expectedSize) + } +} + +type apkStub struct { + server *http.Server + listener net.Listener + URL string + + mu sync.Mutex + indexPath string + signaturePath string + packagePath string + indexBody []byte + signatureBody []byte + packageBody []byte + indexGets int + indexHeads int + signatureGets int + signatureHeads int + packageGets int + packageHeads int +} + +func newAPKStub(t *testing.T) *apkStub { + t.Helper() + stub := &apkStub{ + indexPath: "/v3.19/main/x86_64/APKINDEX.tar.gz", + signaturePath: "/v3.19/main/x86_64/APKINDEX.tar.gz.asc", + packagePath: "/v3.22/community/x86_64/tini-static-0.19.0-r3.apk", + indexBody: []byte("apk-index-body"), + signatureBody: []byte("apk-index-signature"), + packageBody: bytes.Repeat([]byte("apk-payload-"), 64*1024), + } + + mux := http.NewServeMux() + mux.HandleFunc(stub.indexPath, stub.handleIndex) + mux.HandleFunc(stub.signaturePath, stub.handleSignature) + mux.HandleFunc(stub.packagePath, stub.handlePackage) + + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("unable to start apk stub: %v", err) + } + + srv := &http.Server{Handler: mux} + 
stub.server = srv + stub.listener = listener + stub.URL = "http://" + listener.Addr().String() + + go func() { + _ = srv.Serve(listener) + }() + return stub +} + +func (s *apkStub) handleIndex(w http.ResponseWriter, r *http.Request) { + s.handleWithETag(w, r, &s.indexGets, &s.indexHeads, s.indexBody, "application/gzip") +} + +func (s *apkStub) handleSignature(w http.ResponseWriter, r *http.Request) { + s.handleWithETag(w, r, &s.signatureGets, &s.signatureHeads, s.signatureBody, "application/pgp-signature") +} + +func (s *apkStub) handlePackage(w http.ResponseWriter, r *http.Request) { + s.mu.Lock() + defer s.mu.Unlock() + + if r.Method == http.MethodHead { + s.packageHeads++ + w.Header().Set("Content-Type", "application/vnd.android.package-archive") + w.WriteHeader(http.StatusOK) + return + } + + s.packageGets++ + w.Header().Set("Content-Type", "application/vnd.android.package-archive") + w.Header().Set("Content-Length", strconv.Itoa(len(s.packageBody))) + _, _ = w.Write(s.packageBody) +} + +func (s *apkStub) handleWithETag(w http.ResponseWriter, r *http.Request, gets, heads *int, body []byte, contentType string) { + s.mu.Lock() + defer s.mu.Unlock() + + etag := "\"apk-etag\"" + if r.Method == http.MethodHead { + *heads++ + w.Header().Set("ETag", etag) + w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat)) + if matchETag(r, strings.Trim(etag, `"`)) { + w.WriteHeader(http.StatusNotModified) + return + } + if contentType != "" { + w.Header().Set("Content-Type", contentType) + } + return + } + + *gets++ + w.Header().Set("ETag", etag) + w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat)) + if contentType != "" { + w.Header().Set("Content-Type", contentType) + } + _, _ = w.Write(body) +} + +func (s *apkStub) IndexGets() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.indexGets +} + +func (s *apkStub) IndexHeads() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.indexHeads +} + +func (s *apkStub) SignatureGets() int { + 
s.mu.Lock() + defer s.mu.Unlock() + return s.signatureGets +} + +func (s *apkStub) SignatureHeads() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.signatureHeads +} + +func (s *apkStub) PackageGets() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.packageGets +} + +func (s *apkStub) PackageHeads() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.packageHeads +} + +func (s *apkStub) Close() { + if s == nil { + return + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if s.server != nil { + _ = s.server.Shutdown(ctx) + } + if s.listener != nil { + _ = s.listener.Close() + } +} diff --git a/tests/integration/apt_package_proxy_test.go b/tests/integration/apt_package_proxy_test.go new file mode 100644 index 0000000..91977b8 --- /dev/null +++ b/tests/integration/apt_package_proxy_test.go @@ -0,0 +1,277 @@ +package integration + +import ( + "bytes" + "context" + "io" + "net" + "net/http" + "net/http/httptest" + "os" + "path" + "path/filepath" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/gofiber/fiber/v3" + "github.com/sirupsen/logrus" + + "github.com/any-hub/any-hub/internal/cache" + "github.com/any-hub/any-hub/internal/config" + "github.com/any-hub/any-hub/internal/proxy" + "github.com/any-hub/any-hub/internal/server" +) + +func TestAptPackagesCachedWithoutRevalidate(t *testing.T) { + stub := newAptPackageStub(t) + defer stub.Close() + + storageDir := t.TempDir() + cfg := &config.Config{ + Global: config.GlobalConfig{ + ListenPort: 5000, + CacheTTL: config.Duration(time.Hour), + StoragePath: storageDir, + }, + Hubs: []config.HubConfig{ + { + Name: "apt", + Domain: "apt.hub.local", + Type: "debian", + Module: "debian", + Upstream: stub.URL, + }, + }, + } + + registry, err := server.NewHubRegistry(cfg) + if err != nil { + t.Fatalf("registry error: %v", err) + } + + logger := logrus.New() + logger.SetOutput(io.Discard) + + store, err := cache.NewStore(storageDir) + if err != nil { + 
t.Fatalf("store error: %v", err) + } + + app, err := server.NewApp(server.AppOptions{ + Logger: logger, + Registry: registry, + Proxy: proxy.NewHandler(server.NewUpstreamClient(cfg), logger, store), + ListenPort: 5000, + }) + if err != nil { + t.Fatalf("app error: %v", err) + } + + doRequest := func(p string) *http.Response { + req := httptest.NewRequest(http.MethodGet, "http://apt.hub.local"+p, nil) + req.Host = "apt.hub.local" + resp, err := app.Test(req) + if err != nil { + t.Fatalf("app.Test error: %v", err) + } + return resp + } + + resp := doRequest(stub.packagePath) + if resp.StatusCode != fiber.StatusOK { + t.Fatalf("expected 200 for package, got %d", resp.StatusCode) + } + if hit := resp.Header.Get("X-Any-Hub-Cache-Hit"); hit != "false" { + t.Fatalf("expected cache miss on first package fetch, got %s", hit) + } + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + if !bytes.Equal(body, stub.packageBody) { + t.Fatalf("package body mismatch on first fetch: %d bytes", len(body)) + } + + resp2 := doRequest(stub.packagePath) + if resp2.StatusCode != fiber.StatusOK { + t.Fatalf("expected 200 for cached package, got %d", resp2.StatusCode) + } + if hit := resp2.Header.Get("X-Any-Hub-Cache-Hit"); hit != "true" { + t.Fatalf("expected cache hit for package, got %s", hit) + } + body2, _ := io.ReadAll(resp2.Body) + resp2.Body.Close() + if !bytes.Equal(body2, stub.packageBody) { + t.Fatalf("package body mismatch on cache hit: %d bytes", len(body2)) + } + + hashResp := doRequest(stub.byHashPath) + if hashResp.StatusCode != fiber.StatusOK { + t.Fatalf("expected 200 for by-hash, got %d", hashResp.StatusCode) + } + if hit := hashResp.Header.Get("X-Any-Hub-Cache-Hit"); hit != "false" { + t.Fatalf("expected cache miss on first by-hash fetch, got %s", hit) + } + hashBody, _ := io.ReadAll(hashResp.Body) + hashResp.Body.Close() + if !bytes.Equal(hashBody, stub.byHashBody) { + t.Fatalf("by-hash body mismatch on first fetch: %d bytes", len(hashBody)) + } + + hashResp2 := 
doRequest(stub.byHashPath) + if hashResp2.StatusCode != fiber.StatusOK { + t.Fatalf("expected 200 for cached by-hash, got %d", hashResp2.StatusCode) + } + if hit := hashResp2.Header.Get("X-Any-Hub-Cache-Hit"); hit != "true" { + t.Fatalf("expected cache hit for by-hash, got %s", hit) + } + hashBody2, _ := io.ReadAll(hashResp2.Body) + hashResp2.Body.Close() + if !bytes.Equal(hashBody2, stub.byHashBody) { + t.Fatalf("by-hash body mismatch on cache hit: %d bytes", len(hashBody2)) + } + + if stub.PackageGets() != 1 { + t.Fatalf("expected single package GET, got %d", stub.PackageGets()) + } + if stub.PackageHeads() != 0 { + t.Fatalf("expected zero package HEAD revalidate, got %d", stub.PackageHeads()) + } + if stub.ByHashGets() != 1 { + t.Fatalf("expected single by-hash GET, got %d", stub.ByHashGets()) + } + if stub.ByHashHeads() != 0 { + t.Fatalf("expected zero by-hash HEAD revalidate, got %d", stub.ByHashHeads()) + } + + verifyStoredFile(t, storageDir, "apt", stub.packagePath, int64(len(stub.packageBody))) + verifyStoredFile(t, storageDir, "apt", stub.byHashPath, int64(len(stub.byHashBody))) +} + +func verifyStoredFile(t *testing.T, basePath, hubName, locatorPath string, expectedSize int64) { + t.Helper() + clean := path.Clean("/" + locatorPath) + clean = strings.TrimPrefix(clean, "/") + fullPath := filepath.Join(basePath, hubName, clean) + info, err := os.Stat(fullPath) + if err != nil { + t.Fatalf("expected cached file at %s: %v", fullPath, err) + } + if info.Size() != expectedSize { + t.Fatalf("cached file %s size mismatch: got %d want %d", fullPath, info.Size(), expectedSize) + } +} + +type aptPackageStub struct { + server *http.Server + listener net.Listener + URL string + + mu sync.Mutex + packagePath string + byHashPath string + packageBody []byte + byHashBody []byte + packageGets int + packageHeads int + byHashGets int + byHashHeads int +} + +func newAptPackageStub(t *testing.T) *aptPackageStub { + t.Helper() + + stub := &aptPackageStub{ + packagePath: 
"/pool/main/h/hello_1.0_amd64.deb", + byHashPath: "/dists/bookworm/by-hash/sha256/deadbeef", + packageBody: bytes.Repeat([]byte("deb-payload-"), 128*1024), + byHashBody: []byte("hash-index-body"), + } + + mux := http.NewServeMux() + mux.HandleFunc(stub.packagePath, stub.handlePackage) + mux.HandleFunc(stub.byHashPath, stub.handleByHash) + + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("unable to start apt package stub: %v", err) + } + + srv := &http.Server{Handler: mux} + stub.server = srv + stub.listener = listener + stub.URL = "http://" + listener.Addr().String() + + go func() { + _ = srv.Serve(listener) + }() + return stub +} + +func (s *aptPackageStub) handlePackage(w http.ResponseWriter, r *http.Request) { + s.handleImmutable(w, r, &s.packageGets, &s.packageHeads, s.packageBody, "application/vnd.debian.binary-package") +} + +func (s *aptPackageStub) handleByHash(w http.ResponseWriter, r *http.Request) { + s.handleImmutable(w, r, &s.byHashGets, &s.byHashHeads, s.byHashBody, "text/plain") +} + +func (s *aptPackageStub) handleImmutable(w http.ResponseWriter, r *http.Request, gets, heads *int, body []byte, contentType string) { + s.mu.Lock() + defer s.mu.Unlock() + + if r.Method == http.MethodHead { + *heads++ + if contentType != "" { + w.Header().Set("Content-Type", contentType) + } + w.WriteHeader(http.StatusOK) + return + } + + *gets++ + if contentType != "" { + w.Header().Set("Content-Type", contentType) + } + w.Header().Set("Content-Length", strconv.Itoa(len(body))) + _, _ = w.Write(body) +} + +func (s *aptPackageStub) PackageGets() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.packageGets +} + +func (s *aptPackageStub) PackageHeads() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.packageHeads +} + +func (s *aptPackageStub) ByHashGets() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.byHashGets +} + +func (s *aptPackageStub) ByHashHeads() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.byHashHeads +} + 
+func (s *aptPackageStub) Close() { + if s == nil { + return + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if s.server != nil { + _ = s.server.Shutdown(ctx) + } + if s.listener != nil { + _ = s.listener.Close() + } +} diff --git a/tests/integration/apt_update_proxy_test.go b/tests/integration/apt_update_proxy_test.go new file mode 100644 index 0000000..c4b6831 --- /dev/null +++ b/tests/integration/apt_update_proxy_test.go @@ -0,0 +1,266 @@ +package integration + +import ( + "context" + "io" + "net" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "github.com/gofiber/fiber/v3" + "github.com/sirupsen/logrus" + + "github.com/any-hub/any-hub/internal/cache" + "github.com/any-hub/any-hub/internal/config" + "github.com/any-hub/any-hub/internal/proxy" + "github.com/any-hub/any-hub/internal/server" +) + +func TestAptUpdateCachesIndexes(t *testing.T) { + stub := newAptStub(t) + defer stub.Close() + + storageDir := t.TempDir() + cfg := &config.Config{ + Global: config.GlobalConfig{ + ListenPort: 5000, + CacheTTL: config.Duration(time.Hour), + StoragePath: storageDir, + }, + Hubs: []config.HubConfig{ + { + Name: "apt", + Domain: "apt.hub.local", + Type: "debian", + Module: "debian", + Upstream: stub.URL, + }, + }, + } + + registry, err := server.NewHubRegistry(cfg) + if err != nil { + t.Fatalf("registry error: %v", err) + } + + logger := logrus.New() + logger.SetOutput(io.Discard) + + store, err := cache.NewStore(storageDir) + if err != nil { + t.Fatalf("store error: %v", err) + } + + app, err := server.NewApp(server.AppOptions{ + Logger: logger, + Registry: registry, + Proxy: proxy.NewHandler(server.NewUpstreamClient(cfg), logger, store), + ListenPort: 5000, + }) + if err != nil { + t.Fatalf("app error: %v", err) + } + + doRequest := func(path string) *http.Response { + req := httptest.NewRequest("GET", "http://apt.hub.local"+path, nil) + req.Host = "apt.hub.local" + resp, err := app.Test(req) + 
if err != nil { + t.Fatalf("app.Test error: %v", err) + } + return resp + } + + releasePath := "/dists/bookworm/Release" + packagesPath := "/dists/bookworm/main/binary-amd64/Packages.gz" + + resp := doRequest(releasePath) + if resp.StatusCode != fiber.StatusOK { + t.Fatalf("expected 200 for release, got %d", resp.StatusCode) + } + if resp.Header.Get("X-Any-Hub-Cache-Hit") != "false" { + t.Fatalf("expected cache miss for first release fetch") + } + resp.Body.Close() + + resp2 := doRequest(releasePath) + if resp2.StatusCode != fiber.StatusOK { + t.Fatalf("expected 200 for cached release, got %d", resp2.StatusCode) + } + if resp2.Header.Get("X-Any-Hub-Cache-Hit") != "true" { + t.Fatalf("expected cache hit for release") + } + resp2.Body.Close() + + pkgResp := doRequest(packagesPath) + if pkgResp.StatusCode != fiber.StatusOK { + t.Fatalf("expected 200 for packages, got %d", pkgResp.StatusCode) + } + if pkgResp.Header.Get("X-Any-Hub-Cache-Hit") != "false" { + t.Fatalf("expected cache miss for packages") + } + pkgResp.Body.Close() + + pkgResp2 := doRequest(packagesPath) + if pkgResp2.StatusCode != fiber.StatusOK { + t.Fatalf("expected 200 for cached packages, got %d", pkgResp2.StatusCode) + } + if pkgResp2.Header.Get("X-Any-Hub-Cache-Hit") != "true" { + t.Fatalf("expected cache hit for packages") + } + pkgResp2.Body.Close() + + if stub.ReleaseGets() != 1 { + t.Fatalf("expected single release GET, got %d", stub.ReleaseGets()) + } + if stub.ReleaseHeads() != 1 { + t.Fatalf("expected single release HEAD revalidate, got %d", stub.ReleaseHeads()) + } + if stub.PackagesGets() != 1 { + t.Fatalf("expected single packages GET, got %d", stub.PackagesGets()) + } + if stub.PackagesHeads() != 1 { + t.Fatalf("expected single packages HEAD revalidate, got %d", stub.PackagesHeads()) + } +} + +type aptStub struct { + server *http.Server + listener net.Listener + URL string + mu sync.Mutex + releaseBody string + packagesBody string + releaseETag string + packagesETag string + releaseGets 
int + releaseHeads int + packagesGets int + packagesHeads int + releasePath string + packagesPath string +} + +func newAptStub(t *testing.T) *aptStub { + t.Helper() + stub := &aptStub{ + releaseBody: "Release-body", + packagesBody: "Packages-body", + releaseETag: "r1", + packagesETag: "p1", + releasePath: "/dists/bookworm/Release", + packagesPath: "/dists/bookworm/main/binary-amd64/Packages.gz", + } + + mux := http.NewServeMux() + mux.HandleFunc(stub.releasePath, stub.handleRelease) + mux.HandleFunc(stub.packagesPath, stub.handlePackages) + + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("unable to start apt stub: %v", err) + } + + srv := &http.Server{Handler: mux} + stub.server = srv + stub.listener = listener + stub.URL = "http://" + listener.Addr().String() + + go func() { + _ = srv.Serve(listener) + }() + return stub +} + +func (s *aptStub) handleRelease(w http.ResponseWriter, r *http.Request) { + s.mu.Lock() + defer s.mu.Unlock() + if r.Method == http.MethodHead { + s.releaseHeads++ + if matchETag(r, s.releaseETag) { + w.WriteHeader(http.StatusNotModified) + return + } + writeHeaders(w, s.releaseETag) + return + } + s.releaseGets++ + writeHeaders(w, s.releaseETag) + _, _ = w.Write([]byte(s.releaseBody)) +} + +func (s *aptStub) handlePackages(w http.ResponseWriter, r *http.Request) { + s.mu.Lock() + defer s.mu.Unlock() + if r.Method == http.MethodHead { + s.packagesHeads++ + if matchETag(r, s.packagesETag) { + w.WriteHeader(http.StatusNotModified) + return + } + writeHeaders(w, s.packagesETag) + w.Header().Set("Content-Type", "application/gzip") + return + } + s.packagesGets++ + writeHeaders(w, s.packagesETag) + w.Header().Set("Content-Type", "application/gzip") + _, _ = w.Write([]byte(s.packagesBody)) +} + +func matchETag(r *http.Request, etag string) bool { + for _, candidate := range r.Header.Values("If-None-Match") { + c := strings.Trim(candidate, "\"") + if c == etag || candidate == etag { + return true + } + } + return 
false +} + +func writeHeaders(w http.ResponseWriter, etag string) { + w.Header().Set("ETag", "\""+etag+"\"") + w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat)) +} + +func (s *aptStub) ReleaseGets() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.releaseGets +} + +func (s *aptStub) ReleaseHeads() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.releaseHeads +} + +func (s *aptStub) PackagesGets() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.packagesGets +} + +func (s *aptStub) PackagesHeads() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.packagesHeads +} + +func (s *aptStub) Close() { + if s == nil { + return + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if s.server != nil { + _ = s.server.Shutdown(ctx) + } + if s.listener != nil { + _ = s.listener.Close() + } +} diff --git a/tests/integration/cache_flow_test.go b/tests/integration/cache_flow_test.go index e7f130b..b3a0cbd 100644 --- a/tests/integration/cache_flow_test.go +++ b/tests/integration/cache_flow_test.go @@ -2,12 +2,14 @@ package integration import ( "context" + "fmt" "io" "net" "net/http" "net/http/httptest" "os" "path/filepath" + "strings" "sync" "testing" "time" @@ -42,6 +44,7 @@ func TestCacheFlowWithConditionalRequest(t *testing.T) { Name: "docker", Domain: "docker.hub.local", Type: "docker", + Module: "docker", Upstream: upstream.URL, }, }, @@ -143,6 +146,7 @@ func TestDockerManifestHeadDoesNotOverwriteCache(t *testing.T) { Name: "docker", Domain: "docker.hub.local", Type: "docker", + Module: "docker", Upstream: upstream.URL, }, }, @@ -294,6 +298,7 @@ type cacheFlowStub struct { lastRequest *http.Request body []byte etag string + etagVer int lastMod string } @@ -302,6 +307,7 @@ func newCacheFlowStub(t *testing.T, paths ...string) *cacheFlowStub { stub := &cacheFlowStub{ body: []byte("upstream payload"), etag: `"etag-v1"`, + etagVer: 1, lastMod: time.Now().UTC().Format(http.TimeFormat), } @@ -344,6 +350,8 
@@ func (s *cacheFlowStub) Close() { func (s *cacheFlowStub) handle(w http.ResponseWriter, r *http.Request) { s.mu.Lock() + etag := s.etag + lastMod := s.lastMod if r.Method == http.MethodHead { s.headHits++ } else { @@ -354,15 +362,21 @@ func (s *cacheFlowStub) handle(w http.ResponseWriter, r *http.Request) { if r.Method == http.MethodHead { w.Header().Set("Content-Type", "text/plain") - w.Header().Set("Etag", s.etag) - w.Header().Set("Last-Modified", s.lastMod) + w.Header().Set("Etag", etag) + w.Header().Set("Last-Modified", lastMod) + for _, candidate := range r.Header.Values("If-None-Match") { + if strings.Trim(candidate, `"`) == strings.Trim(etag, `"`) { + w.WriteHeader(http.StatusNotModified) + return + } + } w.WriteHeader(http.StatusOK) return } w.Header().Set("Content-Type", "text/plain") - w.Header().Set("Etag", s.etag) - w.Header().Set("Last-Modified", s.lastMod) + w.Header().Set("Etag", etag) + w.Header().Set("Last-Modified", lastMod) _, _ = w.Write(s.body) } @@ -370,5 +384,7 @@ func (s *cacheFlowStub) UpdateBody(body []byte) { s.mu.Lock() defer s.mu.Unlock() s.body = body - s.lastMod = time.Now().UTC().Format(http.TimeFormat) + s.etagVer++ + s.etag = fmt.Sprintf(`"etag-v%d"`, s.etagVer) + s.lastMod = time.Now().UTC().Add(2 * time.Second).Format(http.TimeFormat) } diff --git a/tests/integration/module_diagnostics_test.go b/tests/integration/module_diagnostics_test.go index e3820ad..923a50d 100644 --- a/tests/integration/module_diagnostics_test.go +++ b/tests/integration/module_diagnostics_test.go @@ -13,6 +13,7 @@ import ( "github.com/any-hub/any-hub/internal/config" "github.com/any-hub/any-hub/internal/hubmodule" + "github.com/any-hub/any-hub/internal/hubmodule/legacy" "github.com/any-hub/any-hub/internal/proxy/hooks" "github.com/any-hub/any-hub/internal/server" "github.com/any-hub/any-hub/internal/server/routes" @@ -41,6 +42,8 @@ func TestModuleDiagnosticsEndpoints(t *testing.T) { Domain: "legacy.local", Type: "docker", Upstream: 
"https://registry-1.docker.io", + Module: hubmodule.DefaultModuleKey(), + Rollout: string(legacy.RolloutLegacyOnly), }, { Name: "modern-hub", diff --git a/tests/integration/pypi_proxy_test.go b/tests/integration/pypi_proxy_test.go index e8a3fbc..2ec2a13 100644 --- a/tests/integration/pypi_proxy_test.go +++ b/tests/integration/pypi_proxy_test.go @@ -239,7 +239,7 @@ func (s *pypiStub) handleWheel(w http.ResponseWriter, r *http.Request) { func (s *pypiStub) UpdateSimple(body []byte) { s.mu.Lock() s.simpleBody = append([]byte(nil), body...) - s.lastSimpleMod = time.Now().UTC().Format(http.TimeFormat) + s.lastSimpleMod = time.Now().UTC().Add(2 * time.Second).Format(http.TimeFormat) s.mu.Unlock() }