This commit is contained in:
2025-11-15 23:03:02 +08:00
parent efd7737765
commit 467cabe238
4 changed files with 320 additions and 47 deletions

View File

@@ -107,7 +107,13 @@ func (h *Handler) Handle(c fiber.Ctx, route *server.HubRoute) error {
return h.fetchAndStream(c, route, locator, policy, strategyWriter, requestID, started, ctx)
}
func (h *Handler) serveCache(c fiber.Ctx, route *server.HubRoute, result *cache.ReadResult, requestID string, started time.Time) error {
func (h *Handler) serveCache(
c fiber.Ctx,
route *server.HubRoute,
result *cache.ReadResult,
requestID string,
started time.Time,
) error {
if seeker, ok := result.Reader.(io.Seeker); ok {
_, _ = seeker.Seek(0, io.SeekStart)
}
@@ -152,7 +158,16 @@ func (h *Handler) serveCache(c fiber.Ctx, route *server.HubRoute, result *cache.
return nil
}
func (h *Handler) fetchAndStream(c fiber.Ctx, route *server.HubRoute, locator cache.Locator, policy cachePolicy, writer cache.StrategyWriter, requestID string, started time.Time, ctx context.Context) error {
func (h *Handler) fetchAndStream(
c fiber.Ctx,
route *server.HubRoute,
locator cache.Locator,
policy cachePolicy,
writer cache.StrategyWriter,
requestID string,
started time.Time,
ctx context.Context,
) error {
resp, upstreamURL, err := h.executeRequest(c, route)
if err != nil {
h.logResult(route, upstreamURL.String(), requestID, 0, false, started, err)
@@ -164,13 +179,34 @@ func (h *Handler) fetchAndStream(c fiber.Ctx, route *server.HubRoute, locator ca
h.logResult(route, upstreamURL.String(), requestID, 0, false, started, err)
return h.writeError(c, fiber.StatusBadGateway, "upstream_failed")
}
if route.Config.Type == "pypi" {
if rewritten, rewriteErr := h.rewritePyPIResponse(route, resp, requestPath(c)); rewriteErr == nil {
resp = rewritten
} else {
h.logger.WithError(rewriteErr).WithFields(logrus.Fields{
"action": "pypi_rewrite",
"hub": route.Config.Name,
}).Warn("pypi_rewrite_failed")
}
}
defer resp.Body.Close()
shouldStore := policy.allowStore && writer.Enabled() && isCacheableStatus(resp.StatusCode) && c.Method() == http.MethodGet
shouldStore := policy.allowStore && writer.Enabled() && isCacheableStatus(resp.StatusCode) &&
c.Method() == http.MethodGet
return h.consumeUpstream(c, route, locator, resp, shouldStore, writer, requestID, started, ctx)
}
func (h *Handler) consumeUpstream(c fiber.Ctx, route *server.HubRoute, locator cache.Locator, resp *http.Response, shouldStore bool, writer cache.StrategyWriter, requestID string, started time.Time, ctx context.Context) error {
func (h *Handler) consumeUpstream(
c fiber.Ctx,
route *server.HubRoute,
locator cache.Locator,
resp *http.Response,
shouldStore bool,
writer cache.StrategyWriter,
requestID string,
started time.Time,
ctx context.Context,
) error {
upstreamURL := resp.Request.URL.String()
method := c.Method()
authFailure := isAuthFailure(resp.StatusCode) && route.Config.HasCredentials()
@@ -204,7 +240,17 @@ func (h *Handler) consumeUpstream(c fiber.Ctx, route *server.HubRoute, locator c
return nil
}
func (h *Handler) cacheAndStream(c fiber.Ctx, route *server.HubRoute, locator cache.Locator, resp *http.Response, writer cache.StrategyWriter, requestID string, started time.Time, ctx context.Context, upstreamURL string) error {
func (h *Handler) cacheAndStream(
c fiber.Ctx,
route *server.HubRoute,
locator cache.Locator,
resp *http.Response,
writer cache.StrategyWriter,
requestID string,
started time.Time,
ctx context.Context,
upstreamURL string,
) error {
copyResponseHeaders(c, resp.Header)
c.Set("X-Any-Hub-Upstream", upstreamURL)
c.Set("X-Any-Hub-Cache-Hit", "false")
@@ -225,7 +271,14 @@ func (h *Handler) cacheAndStream(c fiber.Ctx, route *server.HubRoute, locator ca
return nil
}
func (h *Handler) retryOnAuthFailure(c fiber.Ctx, route *server.HubRoute, requestID string, started time.Time, resp *http.Response, upstreamURL *url.URL) (*http.Response, *url.URL, error) {
func (h *Handler) retryOnAuthFailure(
c fiber.Ctx,
route *server.HubRoute,
requestID string,
started time.Time,
resp *http.Response,
upstreamURL *url.URL,
) (*http.Response, *url.URL, error) {
if !shouldRetryAuth(route, resp.StatusCode) {
return resp, upstreamURL, nil
}
@@ -262,7 +315,11 @@ func (h *Handler) executeRequest(c fiber.Ctx, route *server.HubRoute) (*http.Res
return h.executeRequestWithAuth(c, route, "")
}
func (h *Handler) executeRequestWithAuth(c fiber.Ctx, route *server.HubRoute, authHeader string) (*http.Response, *url.URL, error) {
func (h *Handler) executeRequestWithAuth(
c fiber.Ctx,
route *server.HubRoute,
authHeader string,
) (*http.Response, *url.URL, error) {
upstreamURL := resolveUpstreamURL(route, route.UpstreamURL, c)
body := bytesReader(c.Body())
req, err := h.buildUpstreamRequest(c, upstreamURL, route, c.Method(), body, authHeader)
@@ -274,7 +331,14 @@ func (h *Handler) executeRequestWithAuth(c fiber.Ctx, route *server.HubRoute, au
return resp, upstreamURL, err
}
func (h *Handler) buildUpstreamRequest(c fiber.Ctx, upstream *url.URL, route *server.HubRoute, method string, body io.Reader, overrideAuth string) (*http.Request, error) {
func (h *Handler) buildUpstreamRequest(
c fiber.Ctx,
upstream *url.URL,
route *server.HubRoute,
method string,
body io.Reader,
overrideAuth string,
) (*http.Request, error) {
ctx := c.Context()
if ctx == nil {
ctx = context.Background()
@@ -331,8 +395,24 @@ func (h *Handler) writeError(c fiber.Ctx, status int, code string) error {
return c.Status(status).JSON(fiber.Map{"error": code})
}
func (h *Handler) logResult(route *server.HubRoute, upstream string, requestID string, status int, cacheHit bool, started time.Time, err error) {
fields := logging.RequestFields(route.Config.Name, route.Config.Domain, route.Config.Type, route.Config.AuthMode(), route.ModuleKey, string(route.RolloutFlag), cacheHit)
func (h *Handler) logResult(
route *server.HubRoute,
upstream string,
requestID string,
status int,
cacheHit bool,
started time.Time,
err error,
) {
fields := logging.RequestFields(
route.Config.Name,
route.Config.Domain,
route.Config.Type,
route.Config.AuthMode(),
route.ModuleKey,
string(route.RolloutFlag),
cacheHit,
)
fields["action"] = "proxy"
fields["upstream"] = upstream
fields["upstream_status"] = status
@@ -481,6 +561,24 @@ func resolveUpstreamURL(route *server.HubRoute, base *url.URL, c fiber.Ctx) *url
if newPath, ok := applyDockerHubNamespaceFallback(route, clean); ok {
clean = newPath
}
if route != nil && route.Config.Type == "pypi" && strings.HasPrefix(clean, "/files/") {
trimmed := strings.TrimPrefix(clean, "/files/")
parts := strings.SplitN(trimmed, "/", 3)
if len(parts) >= 3 {
scheme := parts[0]
host := parts[1]
rest := parts[2]
filesBase := &url.URL{Scheme: scheme, Host: host}
if !strings.HasPrefix(rest, "/") {
rest = "/" + rest
}
relative := &url.URL{Path: rest, RawPath: rest}
if query := string(uri.QueryString()); query != "" {
relative.RawQuery = query
}
return filesBase.ResolveReference(relative)
}
}
relative := &url.URL{Path: clean, RawPath: clean}
if query := string(uri.QueryString()); query != "" {
relative.RawQuery = query
@@ -515,8 +613,8 @@ func routePort(route *server.HubRoute) string {
}
type cachePolicy struct {
allowCache bool
allowStore bool
allowCache bool
allowStore bool
requireRevalidate bool
}
@@ -527,10 +625,10 @@ func determineCachePolicy(route *server.HubRoute, locator cache.Locator, method
policy := cachePolicy{allowCache: true, allowStore: true}
path := stripQueryMarker(locator.Path)
switch route.Config.Type {
case "docker":
if path == "/v2" || path == "v2" || path == "/v2/" {
return cachePolicy{}
}
case "docker":
if path == "/v2" || path == "v2" || path == "/v2/" {
return cachePolicy{}
}
if strings.Contains(path, "/_catalog") {
return cachePolicy{}
}
@@ -539,27 +637,28 @@ case "docker":
}
policy.requireRevalidate = true
return policy
case "go":
if strings.Contains(path, "/@v/") && (strings.HasSuffix(path, ".zip") || strings.HasSuffix(path, ".mod") || strings.HasSuffix(path, ".info")) {
case "go":
if strings.Contains(path, "/@v/") &&
(strings.HasSuffix(path, ".zip") || strings.HasSuffix(path, ".mod") || strings.HasSuffix(path, ".info")) {
return policy
}
policy.requireRevalidate = true
return policy
case "npm":
if strings.Contains(path, "/-/") && strings.HasSuffix(path, ".tgz") {
return policy
}
policy.requireRevalidate = true
return policy
case "pypi":
if isPyPIDistribution(path) {
return policy
}
policy.requireRevalidate = true
return policy
default:
return policy
}
policy.requireRevalidate = true
return policy
case "npm":
if strings.Contains(path, "/-/") && strings.HasSuffix(path, ".tgz") {
return policy
}
policy.requireRevalidate = true
return policy
case "pypi":
if isPyPIDistribution(path) {
return policy
}
policy.requireRevalidate = true
return policy
default:
return policy
}
}
func isDockerImmutablePath(path string) bool {
@@ -595,7 +694,12 @@ func isCacheableStatus(status int) bool {
return status == http.StatusOK
}
func (h *Handler) isCacheFresh(c fiber.Ctx, route *server.HubRoute, locator cache.Locator, entry cache.Entry) (bool, error) {
func (h *Handler) isCacheFresh(
c fiber.Ctx,
route *server.HubRoute,
locator cache.Locator,
entry cache.Entry,
) (bool, error) {
ctx := c.Context()
if ctx == nil {
ctx = context.Background()
@@ -700,10 +804,11 @@ func applyPyPISimpleFallback(route *server.HubRoute, path string) (string, bool)
if route == nil || route.Config.Type != "pypi" {
return path, false
}
if strings.HasPrefix(path, "/simple/") || strings.HasPrefix(path, "/packages/") {
if strings.HasPrefix(path, "/simple/") || strings.HasPrefix(path, "/files/") {
return path, false
}
if strings.HasSuffix(path, ".whl") || strings.HasSuffix(path, ".tar.gz") || strings.HasSuffix(path, ".tar.bz2") || strings.HasSuffix(path, ".zip") {
if strings.HasSuffix(path, ".whl") || strings.HasSuffix(path, ".tar.gz") || strings.HasSuffix(path, ".tar.bz2") ||
strings.HasSuffix(path, ".zip") {
return path, false
}
trimmed := strings.Trim(path, "/")
@@ -761,7 +866,11 @@ func parseAuthParams(input string) map[string]string {
return params
}
func (h *Handler) fetchBearerToken(ctx context.Context, challenge bearerChallenge, route *server.HubRoute) (string, error) {
func (h *Handler) fetchBearerToken(
ctx context.Context,
challenge bearerChallenge,
route *server.HubRoute,
) (string, error) {
if challenge.Realm == "" {
return "", errors.New("bearer realm missing")
}
@@ -794,7 +903,11 @@ func (h *Handler) fetchBearerToken(ctx context.Context, challenge bearerChalleng
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
return "", fmt.Errorf("token request failed: status=%d body=%s", resp.StatusCode, strings.TrimSpace(string(body)))
return "", fmt.Errorf(
"token request failed: status=%d body=%s",
resp.StatusCode,
strings.TrimSpace(string(body)),
)
}
var tokenResp struct {
@@ -832,7 +945,15 @@ func isAuthFailure(status int) bool {
}
func (h *Handler) logAuthRetry(route *server.HubRoute, upstream string, requestID string, status int) {
fields := logging.RequestFields(route.Config.Name, route.Config.Domain, route.Config.Type, route.Config.AuthMode(), route.ModuleKey, string(route.RolloutFlag), false)
fields := logging.RequestFields(
route.Config.Name,
route.Config.Domain,
route.Config.Type,
route.Config.AuthMode(),
route.ModuleKey,
string(route.RolloutFlag),
false,
)
fields["action"] = "proxy_retry"
fields["upstream"] = upstream
fields["upstream_status"] = status
@@ -844,7 +965,15 @@ func (h *Handler) logAuthRetry(route *server.HubRoute, upstream string, requestI
}
func (h *Handler) logAuthFailure(route *server.HubRoute, upstream string, requestID string, status int) {
fields := logging.RequestFields(route.Config.Name, route.Config.Domain, route.Config.Type, route.Config.AuthMode(), route.ModuleKey, string(route.RolloutFlag), false)
fields := logging.RequestFields(
route.Config.Name,
route.Config.Domain,
route.Config.Type,
route.Config.AuthMode(),
route.ModuleKey,
string(route.RolloutFlag),
false,
)
fields["action"] = "proxy"
fields["upstream"] = upstream
fields["upstream_status"] = status