avatar
Bsx's tiny website
still alive...?
Invalid Date
if you like it...
Netty中常用的预定义handler的使用
2023-10-22
java, netty
2351需阅读12分钟

自定义handler

自定义入站的Handler可能会继承SimpleChannelInboundHandler<T>或者ChannelInboundHandlerAdapter,而SimpleChannelInboundHandler又是继承于ChannelInboundHandlerAdapter,最大的区别在于SimpleChannelInboundHandler会对没有外界引用的资源进行一定的清理,并且入站的消息可以通过泛型来规定

SimpleChannelInboundHandler覆盖了channelRead()方法,并在其中调用了channelRead0()处理逻辑,之后会释放消息资源

因此我们继承SimpleChannelInboundHandler后,处理入站的数据我们只需要重新实现channelRead0方法

IdleStateHandler

心跳机制主要是用来检测远端是否存活,如果不存活或活跃则对空闲Socket连接进行处理避免资源的浪费;

IdleStateHandler构造器

  • readerIdleTime读空闲超时时间设定,如果channelRead()方法超过readerIdleTime时间未被调用则会触发超时事件调用userEventTriggered()方法,传入事件为IdleStateEvent

  • writerIdleTime写空闲超时时间设定,如果write()方法超过writerIdleTime时间未被调用则会触发超时事件调用userEventTriggered()方法;

  • allIdleTime所有类型的空闲超时时间设定,包括读空闲和写空闲;

  • unit时间单位,包括时分秒等,例:TimeUnit.SECONDS;

服务端

  • 服务端添加IdleStateHandler心跳检测处理器,并添加自定义处理Handler类实现userEventTriggered()方法作为超时事件的逻辑处理;

  • 设定IdleStateHandler心跳检测每90秒进行一次读写检测,如果90秒内未读写则触发一次userEventTrigger()方法

JAVA
cp.addLast(new IdleStateHandler(0, 0, 90, TimeUnit.SECONDS));
JAVA
//90秒未读写则关闭连接 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { ctx.channel().close(); logger.warn("Channel idle in last {} seconds, close it", Beat.BEAT_TIMEOUT); } else { super.userEventTriggered(ctx, evt); } }

客户端

  • 客户端添加IdleStateHandler心跳检测处理器,并添加自定义处理Handler类实现userEventTriggered()方法作为超时事件的逻辑处理;

  • 设定IdleStateHandler心跳检测每30秒进行一次读写检测,如果30秒内未读写则触发一次userEventTrigger()方法,实现客户端每30秒向服务端发送一次消息;

JAVA
cp.addLast(new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS));
JAVA
//30s告诉服务端我还活着 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { //Send ping sendRequest(Beat.BEAT_PING); logger.debug("Client send beat-ping to " + remotePeer); } else { super.userEventTriggered(ctx, evt); } }

LengthFieldBasedFrameDecoder

基于长度字段的帧解码器,通过在消息头定义长度字段来标识消息总长度,自动屏蔽TCP底层的拆包和粘包问题

构造器:

JAVA
public LengthFieldBasedFrameDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) { }
  • byteOrder:表示字节流表示的数据是大端还是小端,用于长度域的读取;

  • maxFrameLength:表示的是包的最大长度,超出包的最大长度netty将会做一些特殊处理;

  • lengthFieldOffset:指的是长度域的偏移量,表示跳过指定长度个字节之后的才是长度域;

  • lengthFieldLength:长度字段本身的长度

  • lengthAdjustment:该字段加长度字段等于数据帧的长度,包体长度调整的大小,长度域的数值表示的长度加上这个修正值表示的就是带header的包;满足公式: 发送的字节数组bytes.length - lengthFieldLength = bytes[lengthFieldOffset, lengthFieldOffset+lengthFieldLength] + lengthFieldOffset + lengthAdjustment

  • initialBytesToStrip:从数据帧中跳过的字节数,表示获取完一个完整的数据包之后,忽略前面的指定的位数个字节,应用解码器拿到的就是不带长度域的数据包;

  • failFast:如果为true,则表示读取到长度域,TA的值的超过maxFrameLength,就抛出一个 TooLongFrameException,而为false表示只有当真正读取完长度域的值表示的字节之后,才会抛出 TooLongFrameException,默认情况下设置为true,建议不要修改,否则可能会造成内存溢出。

例:

JAVA
//最大帧为65536字节,长度字段为4字节,长度值不包括头部 cp.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 0));

在帧解码器后通常是转自定义java对象的解码器:

JAVA
public class YourDecoder extends ByteToMessageDecoder { @Override public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { //将in反序列化为java对象,把java对象add进out里 } }
JAVA
public class YourEncoder extends MessageToByteEncoder { @Override public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception { //将java序列化为字节数组放进out里(按照帧解码器定制的格式) } }

LineBasedFrameDecoder

以换行符为结束标志的帧解码器,依次遍历ByteBuf中的可读字节,判断是否有"\n"或者"\r\n",如果有就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行

支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度,如果连续读到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。

StringDecoder

将接收到的对象转换为字符串,然后继续调用后面的Handler,其后的handler接收到的不是ByteBuf而是String类型

LineBasedFrameDecoder+StringDecoder就是按行切换的文本解码器

StringEncoder

将写出的String类型转换成ByteBuf类型,写出时可以直接用String类型来写出

如果希望每次写出时自动加上换行符可以用LineEncoder来代替

使用例:

JAVA
ChannelPipeline pipeline = ...; // Decoders pipeline.addLast("frameDecoder", new LineBasedFrameDecoder(80)); pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8)); // Encoder pipeline.addLast("lineEncoder", new LineEncoder(LineSeparator.UNIX, CharsetUtil.UTF_8)); //LineSeparator.DEFAULT使用系统换行符 LineSeparator.WINDOWS使用windows

HttpServerCodec

该类其实是HttpRequestDecoderHttpResponseEncoder的封装,因此我们在ChannelPipeline中加入HttpServerCodec即可实现Http请求的解码和Http响应的编码;

HttpObjectAggregator

Http请求经过HttpServerCodec解码之后是HttpRequestHttpContents对象,HttpObjectAggregator会将多个HttpRequestHttpContents对象再拼装成一个FullHttpRequest,再将其传递到下个Handler

使用例:

JAVA
@Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast("codec", new HttpServerCodec()) .addLast("aggregator", new HttpObjectAggregator(512 * 1024))//聚合内容的最大长度(以字节为单位)。如果聚合内容的长度超过此值,将抛出TooLongFrameException .addLast(new HttpRequestHandler()); }

FullHttpRequest使用

FullHttpRequest api文档

JAVA
fullRequest.content().toString(CharsetUtil.UTF_8)//获取请求体body信息 private final HttpHeaders header = fullRequest.headers();//获取Netty内置的请求头对象 List<Map.Entry<String, String>> list = header.entries(); //将包含的请求信息赋值到list中 fullRequest.method().name(); //获取请求方法名 fullRequest.uri(); //获取请求URI fullRequest.protocolVersion().text() //获取HTTP协议版本,如"HTTP/1.0"

ChunkedWriteHandler

由于写操作是非阻塞的,所以即使没有写出所有的数据,写操作也会在完成时返回并通知ChannelFuture。当这种情况发生时,如果仍然不停地写入,就有内存耗尽的风险

该handler的作用是支持异步发送大的码流,例如大文件传输,但不占用过多的内存,防止JVM内存溢出

JAVA
channelPipeline.addLast("chunkedWrite",new ChunkedWriteHandler());

WebSocketServerProtocolHandler

当调用该handler的handlerAdded方法时,会在它之前添加WebSocketServerProtocolHandshakeHandler,其channelRead方法会尝试接收一个FullHttpRequest对象,表示来自客户端的HTTP请求,随后该handler会进行一些验证操作,比如url验证,GET请求验证等,成功后将会进行握手相关操作,根据请求的websocket版本创建WebSocketServerHandshakerFactory,然后创建WebSocketServerHandshaker类来调用handshake方法进行握手 (在创建Handshaker后WebSocketServerProtocolHandshakeHandler就会从pipeline中移除)

handshake 方法中首先会把 HttpObjectAggregator 和 HttpContentCompressor 移除(如果有),然后如果之前使用了 HttpServerCodec,就在 HttpServerCodec 前添加WebSocket编解码器,然后给客户端一个http响应表示握手,最后移除掉HttpServerCodec。如果之前使用了 HttpRequestDecoder 和 HttpResponseEncoder,就把 HttpRequestDecoder 替换为 WebsocketDecoder ,并加入 WebSocketEncoder,然后给客户端一个http响应表示握手,最后移除掉HttpResponseEncoder。

简单来说就是握手后就不再进行http通信了,于是移除掉http相关的编解码器并添加websocket的编解码器

handshake 返回后会调用 fireUserEventTriggered 来触发之后handler的 userEventTriggered 方法,传入的事件是WebSocketServerProtocolHandler.HandshakeComplete,表示握手完成,可做一些自定义操作

WebSocket帧

帧 类 型描 述
BinaryWebSocketFrame包含了二进制数据
TextWebSocketFrame包含了文本数据
ContinuationWebSocketFrame包含属于上一个BinaryWebSocketFrame或TextWebSocketFrame 的文本数据或者二进制数据
CloseWebSocketFrame表示一个CLOSE 请求,包含一个关闭的状态码和关闭的原因
PingWebSocketFrame请求传输一个PongWebSocketFrame
PongWebSocketFrame作为一个对于PingWebSocketFrame 的响应被发送

均继承自WebSocketFrame

使用例:

JAVA
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> { public static final String WEBSOCKET_PATH = "/ws"; @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //http解码器 pipeline.addLast(new HttpServerCodec()); //聚合器 pipeline.addLast(new HttpObjectAggregator(1024*1024*10)); //对写大数据流的支持 pipeline.addLast(new ChunkedWriteHandler()); //webscoket 服务器处理的协议,用于指定给客户端连接访问的路由 //第二个参数代表根据startsWith来做URI匹配,即以WEBSOCKET_PATH开始的url都可建立websocket通信,默认为false即全匹配 pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, true)); //自定义handler(作用类似controller,客户端和服务器端之间发消息都在这个自定义handler里面处理) pipeline.addLast(new webSocketHandler()); } }
JAVA
public class webSocketHandler extends SimpleChannelInboundHandler<Object> { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof TextWebSocketFrame){ System.out.println("TextWebSocketFrame"+msg); textdoMessage(ctx,(TextWebSocketFrame)msg); }else if (msg instanceof WebSocketFrame){ System.out.println("WebSocketFrame"+msg); webdoMessage(ctx,(WebSocketFrame)msg); }else if (msg instanceof FullHttpRequest){ System.out.println("FullHttpRequest"+msg); httpdoMessage(ctx,(FullHttpRequest)msg); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) { // 在此处获取URL、Headers等信息并做校验,可通过throw异常来中断链接。 } super.userEventTriggered(ctx, evt); } }

netty api文档

;
© 2024 BsxAbout meStill exploring...
GitHub·Twitter·Pixiv·Mail·Telegram
Powered by nextjs 14.