问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

Netty 系列(2) — Netty 入门

发布网友 发布时间:2024-09-27 07:11

我来回答

1个回答

热心网友 时间:2024-10-30 18:07

Netty 基础Netty 简介

如果基于原生的 NIO 进行网络编程,有很多问题都需要自己进行大量的编程来实现,比如连接异常、网络闪断、粘包拆包、半包读写、网络拥塞、异常码流、请求排队等等。开发者需要深入掌握线程、IO、网络等相关概念,很容易导致代码复杂、晦涩,难以快速地写出高可靠性的实现。Java 自身的 NIO 设计更偏底层,其复杂性、扩展性等方面,存在一定的局限性。

在基础 NIO 之上,Netty 通过精巧设计的事件机制,将业务逻辑和无关技术逻辑进行隔离,封装了很多复杂的网络通信细节,提供了相对十分简单易用的 API,非常适合网络编程,开发者可以基于它开发出非常复杂的网络通信程序。

Netty 是一个异步的、基于事件驱动的 Client/Server 的网络应用程序框架,用以快速开发高吞吐量、低延时、高可靠性的网络服务器和客户端程序。Netty 是业界最流行的一个Java开源 NIO 框架 之一,广泛应用于各种分布式、即时通信和中间件中,例如 Dubbo、Elasticsearch、RocketMQ 等。

Netty 模块

从 Netty 官方的模块划分来看,主要包含三大模块:

Core:核心模块,包括零拷贝、API库、可扩展的事件模型等。

Transport Services:传输服务,包括 Socket、Datagram、HTTP Tunnel 等。

Protocol Support:协议支持,包括 HTTP、WebSocket、SSL、Google Protobuf、zlib/gzip 压缩与解压缩、Large File Transfer 大文件传输等等。

Netty 完全是 Java NIO 框架的一个超集,除了核心的事件机制等,Netty 还额外提供了很多功能,例如:

Netty 除了支持传输层的 UDP、TCP、SCTP协议,也支持 HTTP(s)、WebSocket 等多种应用层协议,它并不是单一协议的 API。

Netty 提供了一系列扩展的编解码框架,与应用开发场景无缝衔接,并且性能良好。

扩展了 Java NIO Buffer,提供了自己的 ByteBuf 实现,并且深度支持 Direct Buffer 等技术。

版本说明

本系列文章使用的 Netty 版本为 4.1.76.Final。

Maven 依赖如下:

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.76.Final</version></dependency>

Netty Github:https://github.com/netty/netty

参考书籍:《Netty 权威指南》

Netty 核心组件Demo 程序

下面先看下用 Netty 写的一个客户端/服务端网络通信程序,代码注释说明了每行代码的含义。

Netty 服务端

package com.lyyzoo.netty.netty;import java.nio.charset.StandardCharsets;import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {private final int port;public NettyServer(int port) {this.port = port;}/** * 启动 Netty Server */public void start() {// 负责和客户端建立网络连接的线程组EventLoopGroup bossGroup = new NioEventLoopGroup(1);// 负责处理网络IO请求读取和处理的线程组EventLoopGroup workerGroup = new NioEventLoopGroup(32);try {// Netty网络服务器(服务端启动类)ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap// 设置线程组.group(bossGroup, workerGroup)// 网络通信通道,负责监听指定的端口.channel(NioServerSocketChannel.class)// 网络通道处理器初始化.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// SocketChannel 的处理管道ChannelPipeline channelPipeline = socketChannel.pipeline();// 添加一些处理器channelPipeline// 自定义的服务端处理器.addLast(new NettyServerHandler());}})// 设置全连接队列大小.option(ChannelOption.SO_BACKLOG, 1024)// 保持网络连接.childOption(ChannelOption.SO_KEEPALIVE, true);// 绑定要监听的端口,sync() 同步等待完成ChannelFuture channelFuture = serverBootstrap.bind(port).sync();// 对关闭通道进行监听channelFuture.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {// 释放资源bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}static class NettyServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("channel active...");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("channel read...");// 读数据并处理请求ByteBuf reqBuf = (ByteBuf) msg;byte[] reqBytes = new byte[reqBuf.readableBytes()];reqBuf.readBytes(reqBytes);System.out.println("Request data: " + new String(reqBytes, StandardCharsets.UTF_8));// 响应客户端请求System.out.println("channel writing...");ctx.channel().write(Unpooled.copiedBuffer("Hello Netty Client!", StandardCharsets.UTF_8));ctx.channel().write(Unpooled.copiedBuffer("Hello World!!!", StandardCharsets.UTF_8));ctx.channel().flush();}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("channel read complete...");ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("channel exception...");cause.printStackTrace();ctx.close();}}public static void main(String[] args) {// 启动 Netty ServerNettyServer nettyServer = new NettyServer(9000);nettyServer.start();}}

Netty 客户端

package com.lyyzoo.netty.netty;import java.nio.charset.StandardCharsets;import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient {public static void main(String[] args) {// 线程组EventLoopGroup group = new NioEventLoopGroup(1);try {// 客户端启动类入口Bootstrap bootstrap = new Bootstrap();bootstrap.group(group)// 设置管道.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)// 管道初始化器.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// SocketChannel 的处理管道ChannelPipeline channelPipeline = socketChannel.pipeline();// 添加处理器channelPipeline// 自定义的客户端处理器.addLast(new NettyClientHandler());}});// connect() 发起异步连接,sync() 同步等待连接成功ChannelFuture channelFuture = bootstrap.connect("localhost", 9000).sync();// 等待客户端连接关闭channelFuture.channel().closeFuture().sync();System.out.println("Client close...");} catch (Exception e) {e.printStackTrace();} finally {// 释放资源group.shutdownGracefully();}}static class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("channel active...");ctx.channel().write(Unpooled.copiedBuffer("Hello Netty Server!", StandardCharsets.UTF_8));ctx.channel().write(Unpooled.copiedBuffer("Hello World!!!", StandardCharsets.UTF_8));ctx.channel().flush();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("channel read...");ByteBuf resBuf = (ByteBuf) msg;byte[] resBytes = new byte[resBuf.readableBytes()];resBuf.readBytes(resBytes);System.out.println("Response data: " + new String(resBytes, StandardCharsets.UTF_8));// 关闭通道ctx.channel().close();}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("channel read complete...");ctx.flush();}}}Netty 核心组件

下面来看下Demo程序中用到的一些组件和配置。

ServerBootstrap

ServerBootstrap 是 Netty 服务端的启动类入口,主要就是通过它来设置服务端启动相关的参数。底层通过门面模式对各种能力进行抽象封装,尽量不让用户跟过多的底层API打交道,降低开发难度。与之相对应, Bootstrap 则是客户端的启动辅助类。

最后通过 bind() 方法调用操作系统的 bind 和 listen 函数,开始监听客户端请求。

EventLoopGroup

EventLoop 是 Netty 处理事件的核心机制,EventLoopGroup 类似于一个线程组,内部包含多个 EventLoop。EventLoop 的职责是处理所有注册到本线程多路复用器 Selector 上的 Channel,Selector 的轮询操作由 EventLoop 绑定的线程执行。

ServerBootstrap 设置了两个 EventLoopGroup,一个为 bossGroup,一个为 workerGroup,bossGroup 负责轮询监听端口;workerGroup 负责处理 Socket 连接请求。

ServerSocketChannel

接着 channel(NioServerSocketChannel.class) 设置了监听端口的管道为 NioServerSocketChannel,负责和底层操作系统打交道,监听端口,创建 SocketChannel。

在 BIO 中对应的为 ServerSocket,在 NIO 中对应 ServerSocketChannel。

ChannelHandler

通过 childHandler() 设置管道初始化器,可以通过这个初始化器添加 SocketChannel 自定义处理器。Demo 中,我们在自定义的 NettyClientHandler 和 NettyServerHandler 中实现客户端与服务端间的网络数据交互和请求处理。

ChannelHandler 是 Netty 提供给用户用于扩展和定制的关键接口,利用 ChannelHandler 可以完成大多数的功能定制,例如业务逻辑、消息编码、心跳、安全认证、流量控制等。

ChannelPipeline

SocketChannel 在创建的时候就会初始化一个管道 ChannelPipeline,可以通过 SocketChannel 获取到这个管道,然后添加一系列自定义处理器。

网络事件以事件流的形式在 ChannelPipeline 中流转,ChannelPipeline 就是负责处理网络事件的责任链,负责管理和执行 ChannelHandler。

ChannelFuture

Netty 中所有的 I/O 操作都是异步的,即操作不会立即得到返回结果,ChannelFuture 对象就代表这个异步操作本身,这是 Netty 实现异步 IO 的基础之一。

Netty 的异步编程模型都是建立在 Future 与回调概念之上的,Netty 扩展了 Java 标准的 Future,提供了针对自己场景的特有 Future 定义。

SO_BACKLOG

option(ChannelOption.SO_BACKLOG, 128) 设置了一个 BACKLOG 参数,这个参数就对应了TCP三次握手的 ACCEPT(全连接) 队列的大小,这个参数就是在调用 listen 函数时所需的 backlog 参数 。

listen 函数的Linux头文件以及函数定义如下:

#include <sys/socket.h>int listen(int sockfd, int backlog);

listen 函数会根据传入的 backlog 参数与系统的 /proc/sys/net/core/somaxconn 取二者的较小值。如果 ACCEPT 队列满了,就会拒绝客户端连接。

SO_KEEPALIVE

childOption(ChannelOption.SO_KEEPALIVE, true) 设置了 KEEPALIVE 参数。

TCP网络连接通过三次握手建立后,默认在2个小时内没发送过任何网络包时,如果设置了 KEEPALIVE=true,此时就会向对方发送探测包,防止对方已经断开连接,而自己还占着资源。

维护TCP长连接时系统配置的参数有如下几个:

# 空闲多长时间发送探测包net.ipv4.tcp_keepalive_time = 7200# 探测失败的重试次数,如果多次探测对方都没有回应,则关闭自己这端的连接net.ipv4.tcp_keepalive_probes = 9# 发送探测包的周期net.ipv4.tcp_keepalive_intvl = 75

ByteBuf

当我们进行数据传输时,往往会用到缓冲区,常用的就是 NIO 中的 java.nio.Buffer。Netty 则扩展了 NIO Buffer,提供了自己的 ByteBuf 实现。

在 channelRead 中我们将 msg 转换为 ByteBuf,这是因为 SocketChannel 中接收的字节数据会放入 ByteBuf 缓冲区中。通过 ByteBuf 的 readableBytes 方法可以获取缓冲区可读的字节数,根据可读的字节数创建 byte 数组,通过 ByteBuf 的 readBytes 方法将缓冲区中的字节数组复制到新建的 byte 数组中,最后通过 new String 构造函数获取请求报文。在响应数据时,也需要将消息先写入缓冲区 ByteBuf 中,再写入 SocketChannel。

Netty 通信原理

我们通过下图结合Demo程序再来看下 Netty 的核心组件是如何工作的。

EventLoopGroup 维护了一组 EventLoop,EventLoop 用来处理 Channel 生命周期中发生的事件。每个 EventLoop 会绑定一个 Thread 和 Selector,Netty 中的 Channel 会注册到一个 EventLoop 中,最后其实就是注册到 Selector 上,由 Selector 来监听 Channel 的事件,监听到事件后就交由 Thread 来处理。

Netty 服务端通过 ServerBootstrap 来启动,ServerBootstrap 首先需要设置两个 EventLoopGroup,一个用于监听客户端的TCP连接请求;一个用于处理建立好连接的 Channel 的IO请求。

ServerBootstrap 调用 bind() 方法绑定地址时,底层会调用操作系统的 socket()、bind()、listen() 函数,然后得到一个监听通道 ServerSocketChannel。这个 ServerSocketChannel 会注册到 EventLoop 中,由其绑定的 Selector 来监听 ServerSocketChannel 中的事件。

Netty 客户端通过 Bootstrap 来启动,Bootstrap 需设置一个 EventLoopGroup。Bootstrap 调用 connect() 方法连接服务端,底层会调用操作系统的 socket()、connect() 函数。通过TCP三次握手建立连接后,双方都会得到一个 SocketChannel,然后注册到 EventLoop 中,由其绑定的 Selector 来监听 SocketChannel 中的事件。

通过TCP三次握手建立的连接 SocketChannel 初始化时就会创建一个 ChannelPipeline,然后由 ChannelInitializer 来初始化,添加各种输入/输出处理器,来处理IO请求。ChannelPipeline 会负责事件在职责链中的有序传播,职责链可以选择监听和处理自己关心的事件。

原文:https://juejin.cn/post/7100845891346497549
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
江西蓝天学院和赣江职业技术学院哪个好? 江西有什么好的中专学校 什么是单点登录(SSO),以及单点登录的实现流程 前端实现单点登录(SSO) 前端单点登录如何实现 面试官:来说说单点登录的三种实现方式 为什么很多人不买哈弗H9 座机如何开通国内长途 座机如何申请 辽宁省出租车手续怎么办? Netty源码篇8-Pipeline Handler HandlerContext创建和调度 handler源码... Reactor-Netty基本抽象类介绍 飞机channel是什么意思? 卧室照光时间多久可以 卧室照明需要多久时间? Golang channel 三大坑,你踩过了嘛? 猛虎为其兽皮而死是什么意思 微信红包挂可以控制别人发包抢包大小么 vivox9怎么关闭开发者模式 我最爱的家人最后一集怎么不播了 我最爱的家人大结局今晚为啥不播了 我最爱的家人大结局怎么不更新了 我最爱的家人最后一集怎么不更新了 我最爱的家人中央8怎么不播了 我最爱的家人最后两集今晚8频道为啥不播放? 我最爱的家人大结局怎么不播了呀 安化周边踏青好去处_益阳安化旅游景点推荐 益阳好玩的地方推荐 益阳好玩的地方有什么 雷峰湖地质公园属哪个省? ...音乐,绘画作品,民间风俗???急急急急啊啊啊啊啊啊 卤味中加白糖有什么作用? 协程与Channels (CSP: Kotlin, Golang) 世界知名品牌钻石项链有哪些?世界十大钻石项链品牌 周大福的钻戒和爱恋珠宝的钻戒哪个牌子的钻戒性价比好? 《VR女友》无设备游戏教程介绍_《VR女友》无设备游戏教程是什么_百度知 ... 等雨停写成成繁体字 万杰揉面机的质量怎么样? 万杰揉面机质量是否靠谱? 我是在淘宝出售自己家蜂蜜的,希望亲们能给一些比较大众化的蜂蜜检验方... 我想买部万杰揉面机,大家认为可以吗? 请问揉面机应该怎么选择呢 在赛尔号里除了已经绝版的稀有精灵有哪些? 建筑设计依据有哪些 整天上班比较忙,懒得去逛超市,能在网上买乐渔鱼豆腐吗? ...种乐渔牌的鱼豆腐,味道超赞的,想买点吃吃,网上能不能买到啊?_百度... 经常给儿子吃乐渔的鱿鱼头,现在乐渔还有什么更好吃的东西麽? 钢筋采用百分二十五节头不受限制 姐姐说想吃鱼豆腐,哪儿有得卖啊? 钢筋梁接头不超百分之二十五按什么计算 二十五毫米,钢管的横截面面积是多少 双层双向的筏板百分之二十五的接头率是指上下层的钢筋化分吗?_百度...