From 442c27a26dc753790b6033ec892c287d31438364 Mon Sep 17 00:00:00 2001 From: Rogee Date: Mon, 23 Mar 2026 16:55:20 +0800 Subject: [PATCH] feat: add registry k8s manifest fallback --- internal/hubmodule/docker/hooks.go | 4 + internal/proxy/handler.go | 95 ++++++++- internal/proxy/handler_test.go | 55 ++++++ tests/integration/cache_flow_test.go | 281 +++++++++++++++++++++++++++ 4 files changed, 430 insertions(+), 5 deletions(-) create mode 100644 internal/proxy/handler_test.go diff --git a/internal/hubmodule/docker/hooks.go b/internal/hubmodule/docker/hooks.go index 277bfa0..8ef3eac 100644 --- a/internal/hubmodule/docker/hooks.go +++ b/internal/hubmodule/docker/hooks.go @@ -87,6 +87,10 @@ func manifestFallbackPath(ctx *hooks.RequestContext, clean string) (string, bool return "/v2/" + repo + "/" + repo + rest, true } +func RegistryK8sManifestFallbackPath(ctx *hooks.RequestContext, clean string) (string, bool) { + return manifestFallbackPath(ctx, clean) +} + func splitDockerRepoPath(path string) (string, string, bool) { if !strings.HasPrefix(path, "/v2/") { return "", "", false diff --git a/internal/proxy/handler.go b/internal/proxy/handler.go index c6ad0b2..64584cc 100644 --- a/internal/proxy/handler.go +++ b/internal/proxy/handler.go @@ -22,6 +22,7 @@ import ( "github.com/any-hub/any-hub/internal/cache" "github.com/any-hub/any-hub/internal/hubmodule" + dockermodule "github.com/any-hub/any-hub/internal/hubmodule/docker" "github.com/any-hub/any-hub/internal/logging" "github.com/any-hub/any-hub/internal/proxy/hooks" "github.com/any-hub/any-hub/internal/server" @@ -218,7 +219,7 @@ func (h *Handler) serveCache( } if shouldRevalidate { - if resp, err := h.revalidateRequest(c, route, resolveUpstreamURL(route, route.UpstreamURL, c, hook), result.Entry.Locator, ""); err == nil { + if resp, err := h.revalidateRequest(c, route, effectiveRevalidateURL(route, c, result.Entry, hook), result.Entry.Locator, ""); err == nil { resp.Body.Close() } } @@ -259,6 +260,17 @@ func (h 
*Handler) fetchAndStream( h.logResult(route, upstreamURL.String(), requestID, 0, false, started, err) return h.writeError(c, fiber.StatusBadGateway, "upstream_failed") } + effectiveUpstreamPath := "" + originalStatus := resp.StatusCode + originalPath := "" + if hook != nil { + originalPath = hook.clean + } + resp, upstreamURL, effectiveUpstreamPath, err = h.retryRegistryK8sManifestFallback(c, route, requestID, resp, upstreamURL, hook, originalStatus, originalPath) + if err != nil { + h.logResult(route, upstreamURL.String(), requestID, 0, false, started, err) + return h.writeError(c, fiber.StatusBadGateway, "upstream_failed") + } if hook != nil && hook.hasHooks && hook.def.RewriteResponse != nil { if rewritten, rewriteErr := applyHookRewrite(hook, resp, requestPath(c)); rewriteErr == nil { resp = rewritten @@ -273,7 +285,7 @@ func (h *Handler) fetchAndStream( shouldStore := policy.allowStore && writer.Enabled() && isCacheableStatus(resp.StatusCode) && c.Method() == http.MethodGet - return h.consumeUpstream(c, route, locator, resp, shouldStore, writer, requestID, started, ctx) + return h.consumeUpstream(c, route, locator, resp, shouldStore, writer, requestID, started, ctx, effectiveUpstreamPath) } func applyHookRewrite(hook *hookState, resp *http.Response, path string) (*http.Response, error) { @@ -325,13 +337,14 @@ func (h *Handler) consumeUpstream( requestID string, started time.Time, ctx context.Context, + effectiveUpstreamPath string, ) error { upstreamURL := resp.Request.URL.String() method := c.Method() authFailure := isAuthFailure(resp.StatusCode) && route.Config.HasCredentials() if shouldStore { - return h.cacheAndStream(c, route, locator, resp, writer, requestID, started, ctx, upstreamURL) + return h.cacheAndStream(c, route, locator, resp, writer, requestID, started, ctx, upstreamURL, effectiveUpstreamPath) } copyResponseHeaders(c, resp.Header) @@ -369,6 +382,7 @@ func (h *Handler) cacheAndStream( started time.Time, ctx context.Context, upstreamURL string, + 
effectiveUpstreamPath string, ) error { copyResponseHeaders(c, resp.Header) c.Set("X-Any-Hub-Upstream", upstreamURL) @@ -381,7 +395,7 @@ func (h *Handler) cacheAndStream( // 使用 TeeReader 边向客户端回写边落盘,避免大文件在内存中完整缓冲。 reader := io.TeeReader(resp.Body, c.Response().BodyWriter()) - opts := cache.PutOptions{ModTime: extractModTime(resp.Header)} + opts := cache.PutOptions{ModTime: extractModTime(resp.Header), EffectiveUpstreamPath: effectiveUpstreamPath} entry, err := writer.Put(ctx, locator, reader, opts) h.logResult(route, upstreamURL, requestID, resp.StatusCode, false, started, err) if err != nil { @@ -774,7 +788,7 @@ func (h *Handler) isCacheFresh( ctx = context.Background() } - upstreamURL := resolveUpstreamURL(route, route.UpstreamURL, c, hook) + upstreamURL := effectiveRevalidateURL(route, c, entry, hook) resp, err := h.revalidateRequest(c, route, upstreamURL, locator, "") if err != nil { return false, err @@ -825,6 +839,77 @@ func (h *Handler) isCacheFresh( } } +func effectiveRevalidateURL(route *server.HubRoute, c fiber.Ctx, entry cache.Entry, hook *hookState) *url.URL { + if route == nil || route.UpstreamURL == nil || entry.EffectiveUpstreamPath == "" { + return resolveUpstreamURL(route, route.UpstreamURL, c, hook) + } + clone := *route.UpstreamURL + clone.Path = entry.EffectiveUpstreamPath + clone.RawPath = entry.EffectiveUpstreamPath + return &clone +} + +func (h *Handler) retryRegistryK8sManifestFallback( + c fiber.Ctx, + route *server.HubRoute, + requestID string, + resp *http.Response, + upstreamURL *url.URL, + hook *hookState, + originalStatus int, + originalPath string, +) (*http.Response, *url.URL, string, error) { + if resp == nil || resp.StatusCode != http.StatusNotFound || hook == nil || hook.ctx == nil { + return resp, upstreamURL, "", nil + } + fallbackPath, ok := dockermodule.RegistryK8sManifestFallbackPath(hook.ctx, hook.clean) + if !ok { + return resp, upstreamURL, "", nil + } + resp.Body.Close() + fallbackHook := *hook + fallbackHook.clean = 
fallbackPath + fallbackResp, fallbackURL, err := h.executeRequest(c, route, &fallbackHook) + if err != nil { + return nil, upstreamURL, "", err + } + fallbackResp, fallbackURL, err = h.retryOnAuthFailure(c, route, requestID, time.Now(), fallbackResp, fallbackURL, &fallbackHook) + if err != nil { + return nil, upstreamURL, "", err + } + h.logRegistryK8sFallback(route, requestID, originalPath, fallbackPath, originalStatus, c.Method()) + if fallbackResp.StatusCode == http.StatusOK { + return fallbackResp, fallbackURL, fallbackPath, nil + } + return fallbackResp, fallbackURL, "", nil +} + +func (h *Handler) logRegistryK8sFallback(route *server.HubRoute, requestID string, originalPath string, fallbackPath string, originalStatus int, method string) { + if route == nil || h == nil || h.logger == nil { + return + } + fields := logging.RequestFields( + route.Config.Name, + route.Config.Domain, + route.Config.Type, + route.Config.AuthMode(), + route.Module.Key, + false, + ) + fields["action"] = "proxy_fallback" + if route.UpstreamURL != nil { + fields["upstream_host"] = route.UpstreamURL.Hostname() + } + fields["original_path"] = originalPath + fields["fallback_path"] = fallbackPath + fields["original_status"] = originalStatus + fields["method"] = method + if requestID != "" { + fields["request_id"] = requestID + } + h.logger.WithFields(fields).Info("proxy_registry_k8s_fallback") +} + func (h *Handler) revalidateRequest( c fiber.Ctx, route *server.HubRoute, diff --git a/internal/proxy/handler_test.go b/internal/proxy/handler_test.go new file mode 100644 index 0000000..543a51f --- /dev/null +++ b/internal/proxy/handler_test.go @@ -0,0 +1,55 @@ +package proxy + +import ( + "bytes" + "net/url" + "strings" + "testing" + + "github.com/sirupsen/logrus" + + "github.com/any-hub/any-hub/internal/config" + "github.com/any-hub/any-hub/internal/hubmodule" + "github.com/any-hub/any-hub/internal/server" +) + +func TestRegistryK8sManifestFallbackLogsStructuredEvent(t *testing.T) { + buf := 
new(bytes.Buffer) + logger := logrus.New() + logger.SetOutput(buf) + logger.SetFormatter(&logrus.JSONFormatter{}) + + upstreamURL, err := url.Parse("http://registry.k8s.io") + if err != nil { + t.Fatalf("parse upstream url: %v", err) + } + + h := NewHandler(nil, logger, nil) + route := &server.HubRoute{ + Config: config.HubConfig{ + Name: "docker", + Domain: "k8s.hub.local", + Type: "docker", + }, + Module: hubmodule.ModuleMetadata{Key: "docker"}, + UpstreamURL: upstreamURL, + } + + h.logRegistryK8sFallback(route, "req-1", "/v2/coredns/manifests/v1.13.1", "/v2/coredns/coredns/manifests/v1.13.1", 404, "GET") + + output := buf.String() + for _, want := range []string{ + "proxy_registry_k8s_fallback", + `"hub":"docker"`, + `"domain":"k8s.hub.local"`, + `"upstream_host":"registry.k8s.io"`, + `"original_path":"/v2/coredns/manifests/v1.13.1"`, + `"fallback_path":"/v2/coredns/coredns/manifests/v1.13.1"`, + `"original_status":404`, + `"method":"GET"`, + } { + if !strings.Contains(output, want) { + t.Fatalf("expected log output to contain %s, got %s", want, output) + } + } +} diff --git a/tests/integration/cache_flow_test.go b/tests/integration/cache_flow_test.go index a89a8f9..b114008 100644 --- a/tests/integration/cache_flow_test.go +++ b/tests/integration/cache_flow_test.go @@ -26,6 +26,8 @@ import ( const ( dockerManifestPath = "/v2/library/cache-flow/manifests/latest" dockerManifestNoNamespacePath = "/v2/cache-flow/manifests/latest" + registryK8sOriginalManifest = "/v2/coredns/manifests/v1.13.1" + registryK8sFallbackManifest = "/v2/coredns/coredns/manifests/v1.13.1" ) func TestCacheFlowWithConditionalRequest(t *testing.T) { @@ -286,6 +288,140 @@ func TestDockerNonDockerHubUpstreamKeepsOriginalPath(t *testing.T) { } } +func TestRegistryK8sManifestFallbackRetry(t *testing.T) { + stub := newRegistryK8sStub(t) + stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest")) + defer stub.Close() + + app := newDockerProxyTestApp(t, t.TempDir(), 
"k8s.hub.local", "http://registry.k8s.io", stub.URL) + + req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil) + req.Host = "k8s.hub.local" + resp, err := app.Test(req) + if err != nil { + t.Fatalf("app.Test failed: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != fiber.StatusOK { + body, _ := io.ReadAll(resp.Body) + t.Fatalf("expected 200, got %d (body=%s)", resp.StatusCode, string(body)) + } + if stub.pathHits(registryK8sOriginalManifest, http.MethodGet) != 1 { + t.Fatalf("expected single original manifest hit") + } + if stub.pathHits(registryK8sFallbackManifest, http.MethodGet) != 1 { + t.Fatalf("expected single fallback manifest hit") + } +} + +func TestRegistryK8sManifestFallbackNotAttemptedWhenOriginalSucceeds(t *testing.T) { + stub := newRegistryK8sStub(t) + stub.setResponse(registryK8sOriginalManifest, http.StatusOK, []byte("original manifest")) + defer stub.Close() + + app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL) + + req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil) + req.Host = "k8s.hub.local" + resp, err := app.Test(req) + if err != nil { + t.Fatalf("app.Test failed: %v", err) + } + resp.Body.Close() + + if resp.StatusCode != fiber.StatusOK { + t.Fatalf("expected 200, got %d", resp.StatusCode) + } + if stub.pathHits(registryK8sFallbackManifest, http.MethodGet) != 0 { + t.Fatalf("expected no fallback hit") + } +} + +func TestRegistryK8sManifestFallbackNotAttemptedForNonRegistryHost(t *testing.T) { + stub := newRegistryK8sStub(t) + stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest")) + defer stub.Close() + + app := newDockerProxyTestApp(t, t.TempDir(), "mirror.hub.local", stub.URL) + + req := httptest.NewRequest(http.MethodGet, "http://mirror.hub.local"+registryK8sOriginalManifest, nil) + req.Host = "mirror.hub.local" + resp, err := app.Test(req) + if err 
!= nil { + t.Fatalf("app.Test failed: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != fiber.StatusNotFound { + body, _ := io.ReadAll(resp.Body) + t.Fatalf("expected 404, got %d (body=%s)", resp.StatusCode, string(body)) + } + if stub.pathHits(registryK8sFallbackManifest, http.MethodGet) != 0 { + t.Fatalf("expected no fallback hit for non-registry host") + } +} + +func TestRegistryK8sManifestFallbackSecondRequestHitsCache(t *testing.T) { + stub := newRegistryK8sStub(t) + stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest")) + defer stub.Close() + + app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL) + + makeRequest := func() *http.Response { + req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil) + req.Host = "k8s.hub.local" + resp, err := app.Test(req) + if err != nil { + t.Fatalf("app.Test failed: %v", err) + } + return resp + } + + resp1 := makeRequest() + resp1.Body.Close() + resp2 := makeRequest() + defer resp2.Body.Close() + + if resp2.Header.Get("X-Any-Hub-Cache-Hit") != "true" { + t.Fatalf("expected second request to hit cache") + } + if stub.pathHits(registryK8sOriginalManifest, http.MethodGet) != 1 { + t.Fatalf("expected original 404 path to be requested once, got %d", stub.pathHits(registryK8sOriginalManifest, http.MethodGet)) + } +} + +func TestRegistryK8sFallbackCacheRevalidatesEffectivePath(t *testing.T) { + stub := newRegistryK8sStub(t) + stub.setResponse(registryK8sFallbackManifest, http.StatusOK, []byte("fallback manifest")) + defer stub.Close() + + app := newDockerProxyTestApp(t, t.TempDir(), "k8s.hub.local", "http://registry.k8s.io", stub.URL) + + makeRequest := func() *http.Response { + req := httptest.NewRequest(http.MethodGet, "http://k8s.hub.local"+registryK8sOriginalManifest, nil) + req.Host = "k8s.hub.local" + resp, err := app.Test(req) + if err != nil { + t.Fatalf("app.Test failed: %v", err) + } + 
return resp + } + + resp1 := makeRequest() + resp1.Body.Close() + resp2 := makeRequest() + resp2.Body.Close() + + if stub.pathHits(registryK8sOriginalManifest, http.MethodHead) != 0 { + t.Fatalf("expected no HEAD revalidation against original path, got %d", stub.pathHits(registryK8sOriginalManifest, http.MethodHead)) + } + if stub.pathHits(registryK8sFallbackManifest, http.MethodHead) == 0 { + t.Fatalf("expected HEAD revalidation against fallback path") + } +} + type cacheFlowStub struct { server *http.Server listener net.Listener @@ -300,6 +436,22 @@ type cacheFlowStub struct { lastMod string } +type registryK8sStub struct { + server *http.Server + listener net.Listener + URL string + + mu sync.Mutex + responses map[string]registryK8sResponse + hits map[string]int +} + +type registryK8sResponse struct { + status int + body []byte + etag string +} + func newCacheFlowStub(t *testing.T, paths ...string) *cacheFlowStub { t.Helper() stub := &cacheFlowStub{ @@ -335,6 +487,32 @@ func newCacheFlowStub(t *testing.T, paths ...string) *cacheFlowStub { return stub } +func newRegistryK8sStub(t *testing.T) *registryK8sStub { + t.Helper() + stub := &registryK8sStub{ + responses: make(map[string]registryK8sResponse), + hits: make(map[string]int), + } + mux := http.NewServeMux() + mux.HandleFunc("/", stub.handle) + + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Skipf("unable to start registry.k8s.io stub listener: %v", err) + } + + server := &http.Server{Handler: mux} + stub.server = server + stub.listener = listener + stub.URL = "http://" + listener.Addr().String() + + go func() { + _ = server.Serve(listener) + }() + + return stub +} + func (s *cacheFlowStub) Close() { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -346,6 +524,109 @@ func (s *cacheFlowStub) Close() { } } +func (s *registryK8sStub) Close() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if s.server != nil 
{ + _ = s.server.Shutdown(ctx) + } + if s.listener != nil { + _ = s.listener.Close() + } +} + +func (s *registryK8sStub) setResponse(path string, status int, body []byte) { + s.mu.Lock() + defer s.mu.Unlock() + s.responses[path] = registryK8sResponse{ + status: status, + body: append([]byte(nil), body...), + etag: fmt.Sprintf(`"%s-etag"`, strings.ReplaceAll(strings.Trim(path, "/"), "/", "-")), + } +} + +func (s *registryK8sStub) pathHits(path string, method string) int { + s.mu.Lock() + defer s.mu.Unlock() + return s.hits[method+" "+path] +} + +func (s *registryK8sStub) handle(w http.ResponseWriter, r *http.Request) { + s.mu.Lock() + s.hits[r.Method+" "+r.URL.Path]++ + resp, ok := s.responses[r.URL.Path] + s.mu.Unlock() + + if !ok { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusNotFound) + _, _ = w.Write([]byte(`{"errors":[{"code":"MANIFEST_UNKNOWN","message":"manifest unknown"}]}`)) + return + } + w.Header().Set("Content-Type", "application/vnd.docker.distribution.manifest.v2+json") + w.Header().Set("Etag", resp.etag) + w.Header().Set("Last-Modified", time.Now().UTC().Format(http.TimeFormat)) + if r.Method == http.MethodHead { + for _, candidate := range r.Header.Values("If-None-Match") { + if strings.Trim(candidate, `"`) == strings.Trim(resp.etag, `"`) { + w.WriteHeader(http.StatusNotModified) + return + } + } + w.WriteHeader(resp.status) + return + } + w.WriteHeader(resp.status) + _, _ = w.Write(resp.body) +} + +func newDockerProxyTestApp(t *testing.T, storageDir string, domain string, upstream string, proxyURLs ...string) *fiber.App { + t.Helper() + proxyURL := "" + if len(proxyURLs) > 0 { + proxyURL = proxyURLs[0] + } + cfg := &config.Config{ + Global: config.GlobalConfig{ + ListenPort: 5000, + CacheTTL: config.Duration(30 * time.Second), + StoragePath: storageDir, + }, + Hubs: []config.HubConfig{ + { + Name: "docker", + Domain: domain, + Type: "docker", + Upstream: upstream, + Proxy: proxyURL, + }, + }, + } + + registry, err 
:= server.NewHubRegistry(cfg) + if err != nil { + t.Fatalf("registry error: %v", err) + } + logger := logrus.New() + logger.SetOutput(io.Discard) + store, err := cache.NewStore(storageDir) + if err != nil { + t.Fatalf("store error: %v", err) + } + client := server.NewUpstreamClient(cfg) + handler := proxy.NewHandler(client, logger, store) + app, err := server.NewApp(server.AppOptions{ + Logger: logger, + Registry: registry, + Proxy: handler, + ListenPort: 5000, + }) + if err != nil { + t.Fatalf("app error: %v", err) + } + return app +} + func (s *cacheFlowStub) handle(w http.ResponseWriter, r *http.Request) { s.mu.Lock() etag := s.etag