This commit is contained in:
2025-11-17 15:39:44 +08:00
parent abfa51f12e
commit 1ddda89499
46 changed files with 2185 additions and 751 deletions

View File

@@ -36,6 +36,14 @@ type Handler struct {
etags sync.Map // key: hub+path, value: etag/digest string
}
// hookState bundles the per-request hook data that Handle assembles once and
// then hands by pointer to the cache-serving, revalidation and upstream-fetch
// helpers, so they all work from the same normalized path and query.
type hookState struct {
// ctx is the request context passed as first argument to every hook callback.
ctx *hooks.RequestContext
// def is the hook set returned by hooks.Fetch for the route's module key.
def hooks.Hooks
// hasHooks is true only when the lookup succeeded and at least one callback
// in def is non-nil; callers check it before invoking any hook.
hasHooks bool
// clean is the request path after normalizeRequestPath and, when present,
// the NormalizePath hook.
clean string
// rawQuery is a private copy of the request query string, replaced by the
// NormalizePath hook's result when that hook runs.
rawQuery []byte
}
// NewHandler constructs a proxy handler with shared HTTP client/logger/store.
func NewHandler(client *http.Client, logger *logrus.Logger, store cache.Store) *Handler {
return &Handler{
@@ -45,23 +53,58 @@ func NewHandler(client *http.Client, logger *logrus.Logger, store cache.Store) *
}
}
// buildHookContext assembles the hooks.RequestContext for the current request.
//
// The HTTP method is always filled in. Hub metadata (name, domain, type,
// module key, rollout flag) is copied from route when it is non-nil, and
// UpstreamHost is set only when the route has an upstream URL.
func buildHookContext(route *server.HubRoute, c fiber.Ctx) *hooks.RequestContext {
	reqCtx := &hooks.RequestContext{Method: c.Method()}
	if route == nil {
		// Without a route only the method is known.
		return reqCtx
	}

	reqCtx.HubName = route.Config.Name
	reqCtx.Domain = route.Config.Domain
	reqCtx.HubType = route.Config.Type
	reqCtx.ModuleKey = route.ModuleKey
	reqCtx.RolloutFlag = string(route.RolloutFlag)
	if upstream := route.UpstreamURL; upstream != nil {
		reqCtx.UpstreamHost = upstream.Host
	}
	return reqCtx
}
// hasHook reports whether def carries at least one non-nil callback.
func hasHook(def hooks.Hooks) bool {
	switch {
	case def.NormalizePath != nil,
		def.ResolveUpstream != nil,
		def.RewriteResponse != nil,
		def.CachePolicy != nil,
		def.ContentType != nil:
		return true
	}
	return false
}
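// Handle performs the cache lookup, the conditional upstream fetch and the final streaming logic; an error at any stage is emitted as a structured log entry.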
func (h *Handler) Handle(c fiber.Ctx, route *server.HubRoute) error {
started := time.Now()
requestID := server.RequestID(c)
reqCtx := newRequestContext(route, c.Method())
moduleHooks, _ := hooks.For(route.ModuleKey)
locator := buildLocator(route, c, reqCtx, moduleHooks)
policy := determineCachePolicy(route, locator, c.Method(), reqCtx, moduleHooks)
strategyWriter := cache.NewStrategyWriter(h.store, route.CacheStrategy)
if err := ensureProxyHubType(route); err != nil {
h.logger.WithFields(logrus.Fields{
"hub": route.Config.Name,
"module_key": route.ModuleKey,
}).WithError(err).Error("hub_type_unsupported")
return h.writeError(c, fiber.StatusNotImplemented, "hub_type_unsupported")
hooksDef, ok := hooks.Fetch(route.ModuleKey)
hookCtx := buildHookContext(route, c)
rawQuery := append([]byte(nil), c.Request().URI().QueryString()...)
cleanPath := normalizeRequestPath(route, string(c.Request().URI().Path()))
if hasHook(hooksDef) && hooksDef.NormalizePath != nil {
newPath, newQuery := hooksDef.NormalizePath(hookCtx, cleanPath, rawQuery)
if newPath != "" {
cleanPath = newPath
}
rawQuery = newQuery
}
locator := buildLocator(route, c, cleanPath, rawQuery)
policy := determineCachePolicyWithHook(route, locator, c.Method(), hooksDef, ok, hookCtx)
hookState := hookState{
ctx: hookCtx,
def: hooksDef,
hasHooks: ok && hasHook(hooksDef),
clean: cleanPath,
rawQuery: rawQuery,
}
strategyWriter := cache.NewStrategyWriter(h.store, route.CacheStrategy)
ctx := c.Context()
if ctx == nil {
@@ -89,7 +132,7 @@ func (h *Handler) Handle(c fiber.Ctx, route *server.HubRoute) error {
if strategyWriter.ShouldBypassValidation(cached.Entry) {
serve = true
} else if strategyWriter.SupportsValidation() {
fresh, err := h.isCacheFresh(c, route, locator, cached.Entry)
fresh, err := h.isCacheFresh(c, route, locator, cached.Entry, &hookState)
if err != nil {
h.logger.WithError(err).
WithFields(logrus.Fields{"hub": route.Config.Name, "module_key": route.ModuleKey}).
@@ -104,12 +147,12 @@ func (h *Handler) Handle(c fiber.Ctx, route *server.HubRoute) error {
}
if serve {
defer cached.Reader.Close()
return h.serveCache(c, route, cached, requestID, started)
return h.serveCache(c, route, cached, requestID, started, &hookState)
}
cached.Reader.Close()
}
return h.fetchAndStream(c, route, locator, policy, strategyWriter, requestID, started, ctx, reqCtx, moduleHooks)
return h.fetchAndStream(c, route, locator, policy, strategyWriter, requestID, started, ctx, &hookState)
}
func (h *Handler) serveCache(
@@ -118,6 +161,7 @@ func (h *Handler) serveCache(
result *cache.ReadResult,
requestID string,
started time.Time,
hook *hookState,
) error {
var readSeeker io.ReadSeeker
switch reader := result.Reader.(type) {
@@ -130,44 +174,12 @@ func (h *Handler) serveCache(
method := c.Method()
contentType := inferCachedContentType(route, result.Entry.Locator)
if contentType == "" && shouldSniffDockerManifest(route, result.Entry.Locator) {
contentType := resolveContentType(route, result.Entry.Locator, hook)
if contentType == "" && shouldSniffDockerManifest(result.Entry.Locator) {
if sniffed := sniffDockerManifestContentType(readSeeker); sniffed != "" {
contentType = sniffed
}
}
if route != nil && route.Config.Type == "composer" && isComposerMetadataPath(stripQueryMarker(result.Entry.Locator.Path)) {
body, err := io.ReadAll(result.Reader)
result.Reader.Close()
if err != nil {
return fiber.NewError(fiber.StatusBadGateway, fmt.Sprintf("read cache failed: %v", err))
}
rewritten := body
if stripQueryMarker(result.Entry.Locator.Path) == "/packages.json" {
if data, changed, err := rewriteComposerRootBody(body, route.Config.Domain); err == nil && changed {
rewritten = data
}
} else {
if data, changed, err := rewriteComposerMetadata(body, route.Config.Domain); err == nil && changed {
rewritten = data
}
}
c.Set("Content-Type", "application/json")
c.Set("X-Any-Hub-Upstream", route.UpstreamURL.String())
c.Set("X-Any-Hub-Cache-Hit", "true")
if requestID != "" {
c.Set("X-Request-ID", requestID)
}
c.Status(fiber.StatusOK)
c.Response().Header.SetContentLength(len(rewritten))
_, err = c.Response().BodyWriter().Write(rewritten)
h.logResult(route, route.UpstreamURL.String(), requestID, fiber.StatusOK, true, started, err)
if err != nil {
return fiber.NewError(fiber.StatusBadGateway, fmt.Sprintf("read cache failed: %v", err))
}
return nil
}
if contentType != "" {
c.Set("Content-Type", contentType)
} else {
@@ -214,35 +226,27 @@ func (h *Handler) fetchAndStream(
requestID string,
started time.Time,
ctx context.Context,
hook *hookState,
) error {
resp, upstreamURL, err := h.executeRequest(c, route)
resp, upstreamURL, err := h.executeRequest(c, route, hook)
if err != nil {
h.logResult(route, upstreamURL.String(), requestID, 0, false, started, err)
return h.writeError(c, fiber.StatusBadGateway, "upstream_failed")
}
resp, upstreamURL, err = h.retryOnAuthFailure(c, route, requestID, started, resp, upstreamURL)
resp, upstreamURL, err = h.retryOnAuthFailure(c, route, requestID, started, resp, upstreamURL, hook)
if err != nil {
h.logResult(route, upstreamURL.String(), requestID, 0, false, started, err)
return h.writeError(c, fiber.StatusBadGateway, "upstream_failed")
}
if route.Config.Type == "pypi" {
if rewritten, rewriteErr := h.rewritePyPIResponse(route, resp, requestPath(c)); rewriteErr == nil {
if hook != nil && hook.hasHooks && hook.def.RewriteResponse != nil {
if rewritten, rewriteErr := applyHookRewrite(hook, resp, requestPath(c)); rewriteErr == nil {
resp = rewritten
} else {
h.logger.WithError(rewriteErr).WithFields(logrus.Fields{
"action": "pypi_rewrite",
"action": "hook_rewrite",
"hub": route.Config.Name,
}).Warn("pypi_rewrite_failed")
}
} else if route.Config.Type == "composer" {
if rewritten, rewriteErr := h.rewriteComposerResponse(route, resp, requestPath(c)); rewriteErr == nil {
resp = rewritten
} else {
h.logger.WithError(rewriteErr).WithFields(logrus.Fields{
"action": "composer_rewrite",
"hub": route.Config.Name,
}).Warn("composer_rewrite_failed")
}).Warn("hook_rewrite_failed")
}
}
defer resp.Body.Close()
@@ -252,6 +256,42 @@ func (h *Handler) fetchAndStream(
return h.consumeUpstream(c, route, locator, resp, shouldStore, writer, requestID, started, ctx)
}
// applyHookRewrite buffers the upstream response body, runs it through the
// module's RewriteResponse hook and returns a shallow copy of resp carrying
// the status, headers and body chosen by the hook.
//
// The hook receives a flattened header map that holds only the first value of
// each header. A nil header map or a nil body returned by the hook means
// "keep what was passed in". When hook is nil or no RewriteResponse callback
// is configured, resp is returned untouched.
//
// resp.Body is always consumed and closed. If the hook itself fails, the
// buffered payload is put back on resp so that a caller which logs the error
// and keeps using resp still streams the original body instead of reading
// from a closed reader. If reading the body fails there is nothing to restore
// and the read error is returned as-is.
func applyHookRewrite(hook *hookState, resp *http.Response, path string) (*http.Response, error) {
	if hook == nil || hook.def.RewriteResponse == nil {
		return resp, nil
	}

	body, err := io.ReadAll(resp.Body)
	resp.Body.Close()
	if err != nil {
		return nil, err
	}

	// The hook API is single-valued, so only the first value of each header is exposed.
	headers := make(map[string]string, len(resp.Header))
	for key, values := range resp.Header {
		if len(values) > 0 {
			headers[key] = values[0]
		}
	}

	status, newHeaders, newBody, rewriteErr := hook.def.RewriteResponse(hook.ctx, resp.StatusCode, headers, body, path)
	if rewriteErr != nil {
		// Re-arm the original response: its body was drained above and the
		// caller falls back to resp when the rewrite fails.
		resp.Body = io.NopCloser(bytes.NewReader(body))
		return nil, rewriteErr
	}
	if newHeaders == nil {
		newHeaders = headers
	}
	if newBody == nil {
		newBody = body
	}

	cloned := *resp
	cloned.StatusCode = status
	cloned.Header = make(http.Header, len(newHeaders))
	for key, value := range newHeaders {
		cloned.Header.Set(key, value)
	}
	// Keep an inherited Content-Length header in step with the (possibly
	// rewritten) body; a stale value would disagree with ContentLength below.
	if cloned.Header.Get("Content-Length") != "" {
		cloned.Header.Set("Content-Length", fmt.Sprintf("%d", len(newBody)))
	}
	cloned.Body = io.NopCloser(bytes.NewReader(newBody))
	cloned.ContentLength = int64(len(newBody))
	return &cloned, nil
}
func (h *Handler) consumeUpstream(
c fiber.Ctx,
route *server.HubRoute,
@@ -335,6 +375,7 @@ func (h *Handler) retryOnAuthFailure(
started time.Time,
resp *http.Response,
upstreamURL *url.URL,
hook *hookState,
) (*http.Response, *url.URL, error) {
if !shouldRetryAuth(route, resp.StatusCode) {
return resp, upstreamURL, nil
@@ -354,30 +395,31 @@ func (h *Handler) retryOnAuthFailure(
return nil, upstreamURL, err
}
authHeader := "Bearer " + token
retryResp, retryURL, err := h.executeRequestWithAuth(c, route, authHeader)
retryResp, retryURL, err := h.executeRequestWithAuth(c, route, hook, authHeader)
if err != nil {
return nil, upstreamURL, err
}
return retryResp, retryURL, nil
}
retryResp, retryURL, err := h.executeRequest(c, route)
retryResp, retryURL, err := h.executeRequest(c, route, hook)
if err != nil {
return nil, upstreamURL, err
}
return retryResp, retryURL, nil
}
func (h *Handler) executeRequest(c fiber.Ctx, route *server.HubRoute) (*http.Response, *url.URL, error) {
return h.executeRequestWithAuth(c, route, "")
func (h *Handler) executeRequest(c fiber.Ctx, route *server.HubRoute, hook *hookState) (*http.Response, *url.URL, error) {
return h.executeRequestWithAuth(c, route, hook, "")
}
func (h *Handler) executeRequestWithAuth(
c fiber.Ctx,
route *server.HubRoute,
hook *hookState,
authHeader string,
) (*http.Response, *url.URL, error) {
upstreamURL := resolveUpstreamURL(route, route.UpstreamURL, c)
upstreamURL := resolveUpstreamURL(route, route.UpstreamURL, c, hook)
body := bytesReader(c.Body())
req, err := h.buildUpstreamRequest(c, upstreamURL, route, c.Method(), body, authHeader)
if err != nil {
@@ -469,6 +511,7 @@ func (h *Handler) logResult(
route.ModuleKey,
string(route.RolloutFlag),
cacheHit,
route.ModuleKey == hubmodule.DefaultModuleKey(),
)
fields["action"] = "proxy"
fields["upstream"] = upstream
@@ -506,47 +549,20 @@ func inferCachedContentType(route *server.HubRoute, locator cache.Locator) strin
return "application/x-tar"
}
if route != nil {
switch route.Config.Type {
case "docker":
if strings.Contains(clean, "/manifests/") {
return ""
}
if strings.Contains(clean, "/tags/list") {
return "application/json"
}
if strings.Contains(clean, "/blobs/") {
return "application/octet-stream"
}
case "npm":
if strings.HasSuffix(clean, ".json") {
return "application/json"
}
case "pypi":
if strings.Contains(clean, "/simple/") {
return "text/html"
}
}
}
return ""
}
func buildLocator(route *server.HubRoute, c fiber.Ctx) cache.Locator {
uri := c.Request().URI()
pathVal := string(uri.Path())
clean := normalizeRequestPath(route, pathVal)
if newPath, ok := applyPyPISimpleFallback(route, clean); ok {
clean = newPath
}
if newPath, ok := applyDockerHubNamespaceFallback(route, clean); ok {
clean = newPath
}
query := uri.QueryString()
if route != nil && route.Config.Type == "composer" && isComposerDistPath(clean) {
// composer dist URLs often embed per-request tokens; ignore query for cache key
query = nil
// resolveContentType picks the Content-Type for a cached entry.
//
// A ContentType hook, when configured and enabled, is consulted first with the
// locator path stripped of its query marker; an empty answer (or no hook)
// falls back to inferCachedContentType.
func resolveContentType(route *server.HubRoute, locator cache.Locator, hook *hookState) string {
	if hook == nil || !hook.hasHooks || hook.def.ContentType == nil {
		return inferCachedContentType(route, locator)
	}
	hooked := hook.def.ContentType(hook.ctx, stripQueryMarker(locator.Path))
	if hooked == "" {
		return inferCachedContentType(route, locator)
	}
	return hooked
}
func buildLocator(route *server.HubRoute, c fiber.Ctx, clean string, rawQuery []byte) cache.Locator {
query := rawQuery
if len(query) > 0 {
sum := sha1.Sum(query)
clean = fmt.Sprintf("%s/__qs/%s", clean, hex.EncodeToString(sum[:]))
@@ -580,10 +596,7 @@ func stripQueryMarker(p string) string {
return p
}
func shouldSniffDockerManifest(route *server.HubRoute, locator cache.Locator) bool {
if route == nil || route.Config.Type != "docker" {
return false
}
func shouldSniffDockerManifest(locator cache.Locator) bool {
clean := stripQueryMarker(locator.Path)
return strings.Contains(clean, "/manifests/")
}
@@ -631,11 +644,7 @@ func normalizeRequestPath(route *server.HubRoute, raw string) string {
if raw == "" {
raw = "/"
}
hasSlash := strings.HasSuffix(raw, "/")
clean := path.Clean("/" + raw)
if route != nil && route.Config.Type == "pypi" && hasSlash && clean != "/" && !strings.HasSuffix(clean, "/") {
clean += "/"
}
return clean
}
@@ -646,42 +655,28 @@ func bytesReader(b []byte) io.Reader {
return bytes.NewReader(b)
}
func resolveUpstreamURL(route *server.HubRoute, base *url.URL, c fiber.Ctx) *url.URL {
func resolveUpstreamURL(route *server.HubRoute, base *url.URL, c fiber.Ctx, hook *hookState) *url.URL {
uri := c.Request().URI()
pathVal := string(uri.Path())
clean := normalizeRequestPath(route, pathVal)
if newPath, ok := applyPyPISimpleFallback(route, clean); ok {
clean = newPath
}
if newPath, ok := applyDockerHubNamespaceFallback(route, clean); ok {
clean = newPath
}
if route != nil && route.Config.Type == "pypi" && strings.HasPrefix(clean, "/files/") {
trimmed := strings.TrimPrefix(clean, "/files/")
parts := strings.SplitN(trimmed, "/", 3)
if len(parts) >= 3 {
scheme := parts[0]
host := parts[1]
rest := parts[2]
filesBase := &url.URL{Scheme: scheme, Host: host}
if !strings.HasPrefix(rest, "/") {
rest = "/" + rest
}
relative := &url.URL{Path: rest, RawPath: rest}
if query := string(uri.QueryString()); query != "" {
relative.RawQuery = query
}
return filesBase.ResolveReference(relative)
rawQuery := append([]byte(nil), uri.QueryString()...)
clean := normalizeRequestPath(route, string(uri.Path()))
if hook != nil {
if hook.clean != "" {
clean = hook.clean
}
}
if route != nil && route.Config.Type == "composer" && strings.HasPrefix(clean, "/dist/") {
if distTarget, ok := parseComposerDistURL(clean, string(uri.QueryString())); ok {
return distTarget
if hook.rawQuery != nil {
rawQuery = hook.rawQuery
}
if hook.hasHooks && hook.def.ResolveUpstream != nil {
if u := hook.def.ResolveUpstream(hook.ctx, base.String(), clean, rawQuery); u != "" {
if parsed, err := url.Parse(u); err == nil {
return parsed
}
}
}
}
relative := &url.URL{Path: clean, RawPath: clean}
if query := string(uri.QueryString()); query != "" {
relative.RawQuery = query
if len(rawQuery) > 0 {
relative.RawQuery = string(rawQuery)
}
return base.ResolveReference(relative)
}
@@ -718,85 +713,27 @@ type cachePolicy struct {
requireRevalidate bool
}
// determineCachePolicyWithHook computes the default cache policy and, when
// hooks are enabled and a CachePolicy callback exists, lets that callback
// override the allowCache / allowStore / requireRevalidate decisions.
//
// NOTE(review): the callback receives locator.Path verbatim, while
// resolveContentType strips the query marker before calling its hook —
// confirm that CachePolicy hooks expect the unstripped path.
func determineCachePolicyWithHook(route *server.HubRoute, locator cache.Locator, method string, def hooks.Hooks, enabled bool, ctx *hooks.RequestContext) cachePolicy {
	policy := determineCachePolicy(route, locator, method)
	if enabled && def.CachePolicy != nil {
		defaults := hooks.CachePolicy{
			AllowCache:        policy.allowCache,
			AllowStore:        policy.allowStore,
			RequireRevalidate: policy.requireRevalidate,
		}
		decided := def.CachePolicy(ctx, locator.Path, defaults)
		policy.allowCache = decided.AllowCache
		policy.allowStore = decided.AllowStore
		policy.requireRevalidate = decided.RequireRevalidate
	}
	return policy
}
func determineCachePolicy(route *server.HubRoute, locator cache.Locator, method string) cachePolicy {
if route == nil || method != http.MethodGet {
if method != http.MethodGet {
return cachePolicy{}
}
policy := cachePolicy{allowCache: true, allowStore: true}
path := stripQueryMarker(locator.Path)
switch route.Config.Type {
case "docker":
if path == "/v2" || path == "v2" || path == "/v2/" {
return cachePolicy{}
}
if strings.Contains(path, "/_catalog") {
return cachePolicy{}
}
if isDockerImmutablePath(path) {
return policy
}
policy.requireRevalidate = true
return policy
case "go":
if strings.Contains(path, "/@v/") &&
(strings.HasSuffix(path, ".zip") || strings.HasSuffix(path, ".mod") || strings.HasSuffix(path, ".info")) {
return policy
}
policy.requireRevalidate = true
return policy
case "npm":
if strings.Contains(path, "/-/") && strings.HasSuffix(path, ".tgz") {
return policy
}
policy.requireRevalidate = true
return policy
case "pypi":
if isPyPIDistribution(path) {
return policy
}
policy.requireRevalidate = true
return policy
case "composer":
if isComposerDistPath(path) {
return policy
}
if isComposerMetadataPath(path) {
policy.requireRevalidate = true
return policy
}
return cachePolicy{}
default:
return policy
}
}
// isDockerImmutablePath reports whether path addresses a blob or manifest by
// sha256 digest.
func isDockerImmutablePath(path string) bool {
	for _, marker := range [...]string{"/blobs/sha256:", "/manifests/sha256:"} {
		if strings.Contains(path, marker) {
			return true
		}
	}
	return false
}
func isPyPIDistribution(path string) bool {
switch {
case strings.HasSuffix(path, ".whl"):
return true
case strings.HasSuffix(path, ".tar.gz"):
return true
case strings.HasSuffix(path, ".tar.bz2"):
return true
case strings.HasSuffix(path, ".tgz"):
return true
case strings.HasSuffix(path, ".zip"):
return true
case strings.HasSuffix(path, ".egg"):
return true
default:
return false
}
return cachePolicy{allowCache: true, allowStore: true}
}
func isCacheableStatus(status int) bool {
@@ -808,13 +745,14 @@ func (h *Handler) isCacheFresh(
route *server.HubRoute,
locator cache.Locator,
entry cache.Entry,
hook *hookState,
) (bool, error) {
ctx := c.Context()
if ctx == nil {
ctx = context.Background()
}
upstreamURL := resolveUpstreamURL(route, route.UpstreamURL, c)
upstreamURL := resolveUpstreamURL(route, route.UpstreamURL, c, hook)
resp, err := h.revalidateRequest(c, route, upstreamURL, locator, "")
if err != nil {
return false, err
@@ -888,113 +826,6 @@ func extractModTime(header http.Header) time.Time {
return time.Now().UTC()
}
// applyDockerHubNamespaceFallback rewrites single-segment Docker Hub
// repository paths to the "library/" namespace, e.g. /v2/nginx/... becomes
// /v2/library/nginx/....
//
// It returns the original path and false when the route is not a Docker Hub
// route, the path cannot be split into repository and remainder, or the
// repository is already "library" or already contains a namespace separator.
func applyDockerHubNamespaceFallback(route *server.HubRoute, path string) (string, bool) {
	if !isDockerHubRoute(route) {
		return path, false
	}
	repo, rest, ok := splitDockerRepoPath(path)
	switch {
	case !ok, repo == "":
		// Not a recognizable repository path.
	case repo == "library", strings.Contains(repo, "/"):
		// Already namespaced.
	default:
		return "/v2/library/" + repo + rest, true
	}
	return path, false
}
// isDockerHubRoute reports whether route is a docker-type hub whose upstream
// hostname (compared in lower case) is one of the Docker Hub registry hosts.
func isDockerHubRoute(route *server.HubRoute) bool {
	if route == nil || route.Config.Type != "docker" || route.UpstreamURL == nil {
		return false
	}
	upstreamHost := strings.ToLower(route.UpstreamURL.Hostname())
	return upstreamHost == "registry-1.docker.io" ||
		upstreamHost == "docker.io" ||
		upstreamHost == "index.docker.io"
}
// splitDockerRepoPath splits a registry path of the form
// /v2/<repo...>/<manifests|blobs|tags|referrers>/... into the repository name
// and the remainder (starting with a slash at the first of those keywords).
//
// ok is false when the path lacks the /v2/ prefix, contains an empty segment
// or a "_catalog" segment before the keyword, has no repository segment in
// front of the keyword, or contains no keyword at all.
func splitDockerRepoPath(path string) (string, string, bool) {
	const apiPrefix = "/v2/"
	if !strings.HasPrefix(path, apiPrefix) {
		return "", "", false
	}
	tail := path[len(apiPrefix):]
	if tail == "" || tail == "/" {
		return "", "", false
	}

	parts := strings.Split(tail, "/")
	for idx, part := range parts {
		switch part {
		case "", "_catalog":
			return "", "", false
		case "manifests", "blobs", "tags", "referrers":
			if idx == 0 {
				// A keyword with no repository in front of it.
				return "", "", false
			}
			repo := strings.Join(parts[:idx], "/")
			remainder := "/" + strings.Join(parts[idx:], "/")
			return repo, remainder, true
		}
	}
	return "", "", false
}
// applyPyPISimpleFallback maps a bare project path on a pypi route to its
// /simple/<name>/ index path.
//
// The path is returned unchanged (with false) when the route is not pypi, the
// path already lives under /simple/ or /files/, it ends in one of the listed
// distribution extensions, or the slash-trimmed name is empty or starts with
// an underscore.
func applyPyPISimpleFallback(route *server.HubRoute, path string) (string, bool) {
	if route == nil || route.Config.Type != "pypi" {
		return path, false
	}
	for _, prefix := range [...]string{"/simple/", "/files/"} {
		if strings.HasPrefix(path, prefix) {
			return path, false
		}
	}
	for _, ext := range [...]string{".whl", ".tar.gz", ".tar.bz2", ".zip"} {
		if strings.HasSuffix(path, ext) {
			return path, false
		}
	}
	name := strings.Trim(path, "/")
	if name == "" || strings.HasPrefix(name, "_") {
		return path, false
	}
	return "/simple/" + name + "/", true
}
// parseComposerDistURL decodes a proxied path of the form
// /dist/<scheme>/<host>/<rest> into the URL it encodes, carrying rawQuery
// over unchanged.
//
// ok is false when the /dist/ prefix is missing, fewer than three segments
// follow it, or the scheme or host segment is empty. An empty <rest> yields
// the path "/".
func parseComposerDistURL(path string, rawQuery string) (*url.URL, bool) {
	const distPrefix = "/dist/"
	if !strings.HasPrefix(path, distPrefix) {
		return nil, false
	}
	segments := strings.SplitN(path[len(distPrefix):], "/", 3)
	if len(segments) < 3 || segments[0] == "" || segments[1] == "" {
		return nil, false
	}
	// An empty remainder collapses to "/", anything else gains a leading slash.
	targetPath := "/" + segments[2]
	return &url.URL{
		Scheme:   segments[0],
		Host:     segments[1],
		Path:     targetPath,
		RawPath:  targetPath,
		RawQuery: rawQuery,
	}, true
}
type bearerChallenge struct {
Realm string
Service string
@@ -1130,6 +961,7 @@ func (h *Handler) logAuthRetry(route *server.HubRoute, upstream string, requestI
route.ModuleKey,
string(route.RolloutFlag),
false,
route.ModuleKey == hubmodule.DefaultModuleKey(),
)
fields["action"] = "proxy_retry"
fields["upstream"] = upstream
@@ -1150,6 +982,7 @@ func (h *Handler) logAuthFailure(route *server.HubRoute, upstream string, reques
route.ModuleKey,
string(route.RolloutFlag),
false,
route.ModuleKey == hubmodule.DefaultModuleKey(),
)
fields["action"] = "proxy"
fields["upstream"] = upstream
@@ -1204,20 +1037,3 @@ func normalizeETag(value string) string {
}
return strings.Trim(value, "\"")
}
// ensureProxyHubType returns nil for the hub types the proxy handles
// (docker, npm, go, pypi, composer) and an error naming the type otherwise.
func ensureProxyHubType(route *server.HubRoute) error {
	switch hubType := route.Config.Type; hubType {
	case "docker", "npm", "go", "pypi", "composer":
		return nil
	default:
		return fmt.Errorf("unsupported hub type: %s", hubType)
	}
}