千家信息网

gateway、webflux、reactor-netty请求日志输出的方式是什么

发表于:2025-01-20 作者:千家信息网编辑
千家信息网最后更新 2025年01月20日,本篇内容介绍了"gateway、webflux、reactor-netty请求日志输出的方式是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理
千家信息网最后更新 2025年01月20日gateway、webflux、reactor-netty请求日志输出的方式是什么

本篇内容介绍了"gateway、webflux、reactor-netty请求日志输出的方式是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

gateway、webflux、reactor-netty请求日志输出

场景

在使用spring cloud gateway时想要输出请求日志,考虑到两种实现方案

方案一

官网中使用Reactor Netty Access Logs方案,配置"-Dreactor.netty.http.server.accessLogEnabled=true"开启日志记录。

输出如下:

reactor.netty.http.server.AccessLog :
10.2.20.177 - - [02/Dec/2020:16:41:57 +0800] "GET /fapi/gw/hi/login HTTP/1.1" 200 319 8080 626 ms

  • 优点:简单方便

  • 缺点:格式固定,信息量少

方案二

创建一个logfilter,在logfilter中解析request,并输出请求信息

  • 优点:可以自定义日志格式和内容,可以获取body信息

  • 缺点:返回信息需要再写一个filter,没有匹配到路由时无法进入到logfilter中

思路

对方案一进行改造,使其满足需求。对reactor-netty源码分析,主要涉及

  • AccessLog:日志工具,日志结构体

  • AccessLogHandler:http1.1协议日志控制,我们主要使用这个。

  • AccessLogHandler2:http2协议日志控制

代码如下:

package reactor.netty.http.server; import reactor.util.Logger;import reactor.util.Loggers; import java.time.ZonedDateTime;import java.time.format.DateTimeFormatter;import java.util.Locale;import java.util.Objects; final class AccessLog {    static final Logger log = Loggers.getLogger("reactor.netty.http.server.AccessLog");    static final DateTimeFormatter DATE_TIME_FORMATTER =            DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.US);    static final String COMMON_LOG_FORMAT =            "{} - {} [{}] \"{} {} {}\" {} {} {} {} ms";    static final String MISSING = "-";     final String zonedDateTime;     String address;    CharSequence method;    CharSequence uri;    String protocol;    String user = MISSING;    CharSequence status;    long contentLength;    boolean chunked;    long startTime = System.currentTimeMillis();    int port;     AccessLog() {        this.zonedDateTime = ZonedDateTime.now().format(DATE_TIME_FORMATTER);    }     AccessLog address(String address) {        this.address = Objects.requireNonNull(address, "address");        return this;    }     AccessLog port(int port) {        this.port = port;        return this;    }     AccessLog method(CharSequence method) {        this.method = Objects.requireNonNull(method, "method");        return this;    }     AccessLog uri(CharSequence uri) {        this.uri = Objects.requireNonNull(uri, "uri");        return this;    }     AccessLog protocol(String protocol) {        this.protocol = Objects.requireNonNull(protocol, "protocol");        return this;    }     AccessLog status(CharSequence status) {        this.status = Objects.requireNonNull(status, "status");        return this;    }     AccessLog contentLength(long contentLength) {        this.contentLength = contentLength;        return this;    }     AccessLog increaseContentLength(long contentLength) {        if (chunked) {            this.contentLength += contentLength;        }        return this;    }     AccessLog chunked(boolean chunked) {        this.chunked = chunked;        return this;    }     long duration() {        return System.currentTimeMillis() - startTime;    }     void log() {        if (log.isInfoEnabled()) {            log.info(COMMON_LOG_FORMAT, address, user, zonedDateTime,                    method, uri, protocol, status, (contentLength > -1 ? contentLength : MISSING), port, duration());        }    }}
  • AccessLogHandler:日志控制

package reactor.netty.http.server; import io.netty.buffer.ByteBuf;import io.netty.buffer.ByteBufHolder;import io.netty.channel.ChannelDuplexHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelPromise;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.http.HttpRequest;import io.netty.handler.codec.http.HttpResponse;import io.netty.handler.codec.http.HttpResponseStatus;import io.netty.handler.codec.http.HttpUtil;import io.netty.handler.codec.http.LastHttpContent; /** * @author Violeta Georgieva */final class AccessLogHandler extends ChannelDuplexHandler {     AccessLog accessLog = new AccessLog();     @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) {        if (msg instanceof HttpRequest) {            final HttpRequest request = (HttpRequest) msg;            final SocketChannel channel = (SocketChannel) ctx.channel();             accessLog = new AccessLog()                    .address(channel.remoteAddress().getHostString())                    .port(channel.localAddress().getPort())                    .method(request.method().name())                    .uri(request.uri())                    .protocol(request.protocolVersion().text());        }        ctx.fireChannelRead(msg);    }     @Override    @SuppressWarnings("FutureReturnValueIgnored")    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {        if (msg instanceof HttpResponse) {            final HttpResponse response = (HttpResponse) msg;            final HttpResponseStatus status = response.status();             if (status.equals(HttpResponseStatus.CONTINUE)) {                //"FutureReturnValueIgnored" this is deliberate                ctx.write(msg, promise);                return;            }             final boolean chunked = HttpUtil.isTransferEncodingChunked(response);            accessLog.status(status.codeAsText())                     .chunked(chunked);            if (!chunked) {                accessLog.contentLength(HttpUtil.getContentLength(response, -1));            }        }        if (msg instanceof LastHttpContent) {            accessLog.increaseContentLength(((LastHttpContent) msg).content().readableBytes());            ctx.write(msg, promise.unvoid())               .addListener(future -> {                   if (future.isSuccess()) {                       accessLog.log();                   }               });            return;        }        if (msg instanceof ByteBuf) {            accessLog.increaseContentLength(((ByteBuf) msg).readableBytes());        }        if (msg instanceof ByteBufHolder) {            accessLog.increaseContentLength(((ByteBufHolder) msg).content().readableBytes());        }        //"FutureReturnValueIgnored" this is deliberate        ctx.write(msg, promise);    }}

执行顺序

AccessLogHandler.channelRead > GlobalFilter.filter > AbstractLoadBalance.choose >response.writeWith >AccessLogHandler.write

解决方案

对AccessLog和AccessLogHandler进行重写,输出自己想要的内容和样式。

AccessLogHandler中重写了ChannelDuplexHandler中的channelRead和write方法,还可以对ChannelInboundHandler和ChannelOutboundHandler中的方法进行重写,覆盖请求的整个生命周期。

spring-webflux、gateway、springboot-start-web问题

Spring-webflux

当两者一起时配置的并不是webflux web application, 仍然时一个spring mvc web application。

官方文档中有这么一段注解:

很多开发者添加spring-boot-start-webflux到他们的spring mvc web applicaiton去是为了使用reactive WebClient. 如果希望更改webApplication 类型需要显示的设置,如SpringApplication.setWebApplicationType(WebApplicationType.REACTIVE).

            org.springframework.boot        spring-boot-starter-webflux                org.springframework.boot        spring-boot-starter-web    

结论一:

当两者一起时配置的并不是webflux web application, 仍然时一个spring mvc web application。但是启动不会报错,可以正常使用,但是webflux功能失效

Spring-gateway

因为gateway和zuul不一样,gateway用的是长连接,netty-webflux,zuul1.0用的就是同步webmvc。

所以你的非gateway子项目启动用的是webmvc,你的gateway启动用的是webflux. spring-boot-start-web和spring-boot-start-webflux相见分外眼红。

不能配置在同一pom.xml,或者不能在同一项目中出现,不然就会启动报错

            org.springframework.cloud        spring-cloud-starter-gateway    

结论二:

当spring-cloud-gateway和spring-boot-starer-web两者一起时配置的时候, 启动直接报错,依赖包冲突不兼容

"gateway、webflux、reactor-netty请求日志输出的方式是什么"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0