千家信息网

What does Sentinel's SentinelGatewayFilter do?

Published: 2025-01-26 · Author: 千家信息网 editorial staff
This article looks at what Sentinel's SentinelGatewayFilter does by walking through its source code and the reactor adapter classes it relies on.

SentinelGatewayFilter

Sentinel-1.6.2/sentinel-adapter/sentinel-spring-cloud-gateway-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/gateway/sc/SentinelGatewayFilter.java

public class SentinelGatewayFilter implements GatewayFilter, GlobalFilter {

    private final GatewayParamParser<ServerWebExchange> paramParser = new GatewayParamParser<>(
        new ServerWebExchangeItemParser());

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);

        Mono<Void> asyncResult = chain.filter(exchange);
        if (route != null) {
            String routeId = route.getId();
            Object[] params = paramParser.parseParameterFor(routeId, exchange,
                r -> r.getResourceMode() == SentinelGatewayConstants.RESOURCE_MODE_ROUTE_ID);
            String origin = Optional.ofNullable(GatewayCallbackManager.getRequestOriginParser())
                .map(f -> f.apply(exchange))
                .orElse("");
            asyncResult = asyncResult.transform(
                new SentinelReactorTransformer<>(new EntryConfig(routeId, EntryType.IN,
                    1, params, new ContextConfig(contextName(routeId), origin)))
            );
        }

        Set<String> matchingApis = pickMatchingApiDefinitions(exchange);
        for (String apiName : matchingApis) {
            Object[] params = paramParser.parseParameterFor(apiName, exchange,
                r -> r.getResourceMode() == SentinelGatewayConstants.RESOURCE_MODE_CUSTOM_API_NAME);
            asyncResult = asyncResult.transform(
                new SentinelReactorTransformer<>(new EntryConfig(apiName, EntryType.IN, 1, params))
            );
        }

        return asyncResult;
    }

    private String contextName(String route) {
        return SentinelGatewayConstants.GATEWAY_CONTEXT_ROUTE_PREFIX + route;
    }

    Set<String> pickMatchingApiDefinitions(ServerWebExchange exchange) {
        return GatewayApiMatcherManager.getApiMatcherMap().values()
            .stream()
            .filter(m -> m.test(exchange))
            .map(WebExchangeApiMatcher::getApiName)
            .collect(Collectors.toSet());
    }
}

  • SentinelGatewayFilter implements both the GatewayFilter and GlobalFilter interfaces. Its filter method reads the route from the exchange and, when a route is present, applies transform to asyncResult with a SentinelReactorTransformer whose resource name is the route ID. It then applies one more SentinelReactorTransformer for every custom API definition that matches the exchange.
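The stream pipeline in pickMatchingApiDefinitions can be tried in isolation. The following is a minimal sketch using only the JDK, not Sentinel's own code: the class ApiMatchSketch, the registry API_MATCHERS, the API group names and the use of a plain path string with Predicate<String> (in place of ServerWebExchange and WebExchangeApiMatcher) are all assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class ApiMatchSketch {

    // Hypothetical registry: API group name -> predicate over the request path.
    // In Sentinel the values would be WebExchangeApiMatcher instances.
    static final Map<String, Predicate<String>> API_MATCHERS = new LinkedHashMap<>();

    static {
        API_MATCHERS.put("order_api", path -> path.startsWith("/order/"));
        API_MATCHERS.put("admin_api", path -> path.startsWith("/order/admin/"));
        API_MATCHERS.put("user_api", path -> path.startsWith("/user/"));
    }

    // Same shape as pickMatchingApiDefinitions: keep every matcher that
    // accepts the request and collect the matching names into a set.
    public static Set<String> pickMatchingApis(String path) {
        return API_MATCHERS.entrySet().stream()
            .filter(e -> e.getValue().test(path))
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // A single request may fall into several API groups at once.
        System.out.println(pickMatchingApis("/order/admin/1"));
        System.out.println(pickMatchingApis("/health"));
    }
}
```

Because the result is a set of names rather than a single match, one request can be counted against several API-level resources in addition to its route-level resource, which is why the filter loops over matchingApis.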

SentinelReactorTransformer

Sentinel-1.6.2/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorTransformer.java

public class SentinelReactorTransformer<T> implements Function<Publisher<T>, Publisher<T>> {

    private final EntryConfig entryConfig;

    public SentinelReactorTransformer(String resourceName) {
        this(new EntryConfig(resourceName));
    }

    public SentinelReactorTransformer(EntryConfig entryConfig) {
        AssertUtil.notNull(entryConfig, "entryConfig cannot be null");
        this.entryConfig = entryConfig;
    }

    @Override
    public Publisher<T> apply(Publisher<T> publisher) {
        if (publisher instanceof Mono) {
            return new MonoSentinelOperator<>((Mono<T>) publisher, entryConfig);
        }
        if (publisher instanceof Flux) {
            return new FluxSentinelOperator<>((Flux<T>) publisher, entryConfig);
        }

        throw new IllegalStateException("Publisher type is not supported: " + publisher.getClass().getCanonicalName());
    }
}

  • SentinelReactorTransformer uses the entryConfig to wrap the publisher in a MonoSentinelOperator (for a Mono) or a FluxSentinelOperator (for a Flux); any other Publisher type is rejected with an IllegalStateException

MonoSentinelOperator

Sentinel-1.6.2/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/MonoSentinelOperator.java

public class MonoSentinelOperator<T> extends MonoOperator<T, T> {

    private final EntryConfig entryConfig;

    public MonoSentinelOperator(Mono<? extends T> source, EntryConfig entryConfig) {
        super(source);
        AssertUtil.notNull(entryConfig, "entryConfig cannot be null");
        this.entryConfig = entryConfig;
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        source.subscribe(new SentinelReactorSubscriber<>(entryConfig, actual, true));
    }
}

  • When it is subscribed to, MonoSentinelOperator subscribes a SentinelReactorSubscriber to its source, passing unary = true
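The operator pattern used here (subscribing to the operator subscribes a wrapping subscriber to the source) can be shown without Reactor. The sketch below is an analogy built on the JDK's java.util.concurrent.Flow interfaces, not Sentinel or Reactor code; the classes OperatorSketch, Just and LoggingOperator and the "enter"/"exit" log entries are hypothetical stand-ins for the source Mono, the Sentinel operator and the entry lifecycle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class OperatorSketch {

    /** Hypothetical synchronous publisher: emits one value, then completes. */
    static final class Just<T> implements Flow.Publisher<T> {
        private final T value;

        Just(T value) {
            this.value = value;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override
                public void request(long n) {
                    if (!done && n > 0) {
                        done = true;
                        subscriber.onNext(value);
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Hypothetical operator: wraps the downstream subscriber before subscribing to the source. */
    static final class LoggingOperator<T> implements Flow.Publisher<T> {
        private final Flow.Publisher<T> source;
        private final List<String> log;

        LoggingOperator(Flow.Publisher<T> source, List<String> log) {
            this.source = source;
            this.log = log;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> actual) {
            // Same shape as source.subscribe(new SentinelReactorSubscriber<>(...)).
            source.subscribe(new Flow.Subscriber<T>() {
                @Override
                public void onSubscribe(Flow.Subscription s) {
                    log.add("enter");
                    actual.onSubscribe(s);
                }

                @Override
                public void onNext(T item) {
                    actual.onNext(item);
                }

                @Override
                public void onError(Throwable t) {
                    log.add("exit");
                    actual.onError(t);
                }

                @Override
                public void onComplete() {
                    log.add("exit");
                    actual.onComplete();
                }
            });
        }
    }

    /** Subscribes through the operator and returns the recorded signal order. */
    public static List<String> run() {
        List<String> log = new ArrayList<>();
        new LoggingOperator<>(new Just<>("ok"), log).subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                s.request(1);
            }

            @Override
            public void onNext(String item) {
                log.add("value:" + item);
            }

            @Override
            public void onError(Throwable t) {
                log.add("error");
            }

            @Override
            public void onComplete() {
                log.add("done");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Nothing happens until the operator itself is subscribed to, which matches the article's point that the Sentinel subscriber is only created inside subscribe.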

FluxSentinelOperator

Sentinel-1.6.2/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/FluxSentinelOperator.java

public class FluxSentinelOperator<T> extends FluxOperator<T, T> {

    private final EntryConfig entryConfig;

    public FluxSentinelOperator(Flux<? extends T> source, EntryConfig entryConfig) {
        super(source);
        AssertUtil.notNull(entryConfig, "entryConfig cannot be null");
        this.entryConfig = entryConfig;
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        source.subscribe(new SentinelReactorSubscriber<>(entryConfig, actual, false));
    }
}

  • When it is subscribed to, FluxSentinelOperator likewise subscribes a SentinelReactorSubscriber to its source, but passes unary = false

SentinelReactorSubscriber

Sentinel-1.6.2/sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorSubscriber.java

public class SentinelReactorSubscriber<T> extends InheritableBaseSubscriber<T> {

    private final EntryConfig entryConfig;

    private final CoreSubscriber<? super T> actual;
    private final boolean unary;

    private volatile AsyncEntry currentEntry;
    private final AtomicBoolean entryExited = new AtomicBoolean(false);

    public SentinelReactorSubscriber(EntryConfig entryConfig,
                                     CoreSubscriber<? super T> actual,
                                     boolean unary) {
        checkEntryConfig(entryConfig);
        this.entryConfig = entryConfig;
        this.actual = actual;
        this.unary = unary;
    }

    private void checkEntryConfig(EntryConfig config) {
        AssertUtil.notNull(config, "entryConfig cannot be null");
    }

    @Override
    public Context currentContext() {
        if (currentEntry == null || entryExited.get()) {
            return actual.currentContext();
        }
        com.alibaba.csp.sentinel.context.Context sentinelContext = currentEntry.getAsyncContext();
        if (sentinelContext == null) {
            return actual.currentContext();
        }
        return actual.currentContext()
            .put(SentinelReactorConstants.SENTINEL_CONTEXT_KEY, currentEntry.getAsyncContext());
    }

    private void doWithContextOrCurrent(Supplier<Optional<com.alibaba.csp.sentinel.context.Context>> contextSupplier,
                                        Runnable f) {
        Optional<com.alibaba.csp.sentinel.context.Context> contextOpt = contextSupplier.get();
        if (!contextOpt.isPresent()) {
            // Provided context is absent, use current context.
            f.run();
        } else {
            // Run on provided context.
            ContextUtil.runOnContext(contextOpt.get(), f);
        }
    }

    private void entryWhenSubscribed() {
        ContextConfig sentinelContextConfig = entryConfig.getContextConfig();
        if (sentinelContextConfig != null) {
            // If current we're already in a context, the context config won't work.
            ContextUtil.enter(sentinelContextConfig.getContextName(), sentinelContextConfig.getOrigin());
        }
        try {
            AsyncEntry entry = SphU.asyncEntry(entryConfig.getResourceName(), entryConfig.getEntryType(),
                entryConfig.getAcquireCount(), entryConfig.getArgs());
            this.currentEntry = entry;
            actual.onSubscribe(this);
        } catch (BlockException ex) {
            // Mark as completed (exited) explicitly.
            entryExited.set(true);
            // Signal cancel and propagate the {@code BlockException}.
            cancel();
            actual.onSubscribe(this);
            actual.onError(ex);
        } finally {
            if (sentinelContextConfig != null) {
                ContextUtil.exit();
            }
        }
    }

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        doWithContextOrCurrent(() -> currentContext().getOrEmpty(SentinelReactorConstants.SENTINEL_CONTEXT_KEY),
            this::entryWhenSubscribed);
    }

    @Override
    protected void hookOnNext(T value) {
        if (isDisposed()) {
            tryCompleteEntry();
            return;
        }
        doWithContextOrCurrent(() -> Optional.ofNullable(currentEntry).map(AsyncEntry::getAsyncContext),
            () -> actual.onNext(value));

        if (unary) {
            // For some cases of unary operator (Mono), we have to do this during onNext hook.
            // e.g. this kind of order: onSubscribe() -> onNext() -> cancel() -> onComplete()
            // the onComplete hook will not be executed so we'll need to complete the entry in advance.
            tryCompleteEntry();
        }
    }

    @Override
    protected void hookOnComplete() {
        tryCompleteEntry();
        actual.onComplete();
    }

    @Override
    protected boolean shouldCallErrorDropHook() {
        // When flow control triggered or stream terminated, the incoming
        // deprecated exceptions should be dropped implicitly, so we'll not call the `onErrorDropped` hook.
        return !entryExited.get();
    }

    @Override
    protected void hookOnError(Throwable t) {
        if (currentEntry != null && currentEntry.getAsyncContext() != null) {
            // Normal requests with non-BlockException will go through here.
            Tracer.traceContext(t, 1, currentEntry.getAsyncContext());
        }
        tryCompleteEntry();
        actual.onError(t);
    }

    @Override
    protected void hookOnCancel() {

    }

    private boolean tryCompleteEntry() {
        if (currentEntry != null && entryExited.compareAndSet(false, true)) {
            currentEntry.exit(1, entryConfig.getArgs());
            return true;
        }
        return false;
    }
}

  • SentinelReactorSubscriber extends InheritableBaseSubscriber (a copy of reactor.core.publisher.BaseSubscriber that lets subclasses override onSubscribe, onNext, onError and onComplete)

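  • hookOnSubscribe calls entryWhenSubscribed. When sentinelContextConfig is not null, that method first calls ContextUtil.enter; it then creates the AsyncEntry with SphU.asyncEntry, and in the finally block calls ContextUtil.exit(), again only when sentinelContextConfig is not null. If SphU.asyncEntry throws a BlockException, the subscriber marks the entry as exited, cancels the subscription and passes the exception to actual.onError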

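  • hookOnNext (when the subscriber is already disposed or the operator is unary), hookOnComplete and hookOnError all call tryCompleteEntry, which exits the AsyncEntry at most once by guarding the call with entryExited.compareAndSet(false, true)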
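The exit-once guard described in the last bullet can be reproduced with JDK classes alone. This is a minimal sketch, assuming a simplified model: ExitOnceSketch, FakeEntry and GuardedSubscriber are hypothetical names, FakeEntry merely counts exits in place of Sentinel's AsyncEntry, and java.util.concurrent.Flow stands in for the Reactor subscriber types.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ExitOnceSketch {

    /** Hypothetical stand-in for AsyncEntry: it only counts how often exit() ran. */
    static final class FakeEntry {
        final AtomicInteger exits = new AtomicInteger();

        void exit() {
            exits.incrementAndGet();
        }
    }

    /** Forwards every signal downstream and releases the entry at most once. */
    static final class GuardedSubscriber<T> implements Flow.Subscriber<T> {
        private final Flow.Subscriber<? super T> actual;
        private final boolean unary;
        private final FakeEntry entry;
        private final AtomicBoolean entryExited = new AtomicBoolean(false);

        GuardedSubscriber(Flow.Subscriber<? super T> actual, boolean unary, FakeEntry entry) {
            this.actual = actual;
            this.unary = unary;
            this.entry = entry;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            actual.onSubscribe(s);
        }

        @Override
        public void onNext(T item) {
            actual.onNext(item);
            if (unary) {
                // A single-value stream may never see onComplete, so exit early.
                tryCompleteEntry();
            }
        }

        @Override
        public void onError(Throwable t) {
            tryCompleteEntry();
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            tryCompleteEntry();
            actual.onComplete();
        }

        private boolean tryCompleteEntry() {
            // Only the first caller flips the flag, so exit() runs exactly once.
            if (entryExited.compareAndSet(false, true)) {
                entry.exit();
                return true;
            }
            return false;
        }
    }

    /** Drives onSubscribe -> onNext -> onComplete and returns the number of exits. */
    public static int exitsAfterNextAndComplete(boolean unary) {
        FakeEntry entry = new FakeEntry();
        Flow.Subscriber<String> downstream = new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(String item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        };
        GuardedSubscriber<String> guard = new GuardedSubscriber<>(downstream, unary, entry);
        guard.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { }
        });
        guard.onNext("response");
        guard.onComplete();
        return entry.exits.get();
    }

    public static void main(String[] args) {
        System.out.println(exitsAfterNextAndComplete(true));
        System.out.println(exitsAfterNextAndComplete(false));
    }
}
```

In the unary case the entry is released during onNext and the later onComplete is a no-op; in the non-unary case it is released in onComplete. Either way the counter ends at one, which is the property the compareAndSet guard exists to provide.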

Summary

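  • SentinelGatewayFilter implements the GatewayFilter and GlobalFilter interfaces; its filter method reads the route information and applies transform to asyncResult using SentinelReactorTransformer, once for the route and once for each matching API definition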

  • SentinelReactorTransformer uses the entryConfig to create a MonoSentinelOperator or a FluxSentinelOperator; both subscribe a SentinelReactorSubscriber to their source when they are subscribed to

  • SentinelReactorSubscriber creates the AsyncEntry by calling entryWhenSubscribed from hookOnSubscribe, and calls tryCompleteEntry from hookOnNext, hookOnComplete and hookOnError to exit that AsyncEntry
