首页 热点资讯 义务教育 高等教育 出国留学 考研考公
您的当前位置:首页正文

Netty

2024-12-20 来源:化拓教育网
Netty是什么

Netty是目前最流行的由JBOSS提供的一个Java开源框架NIO框架,Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

为什么选择Netty
  • 事件驱动模型
  • 避免多线程
  • 单线程处理多任务
  • 非阻塞I/O,I/O读写不再阻塞,而是返回0
  • 基于block的传输,通常比基于流的传输更高效
  • 更高级的IO函数,zero-copy
  • IO多路复用大大提高了Java网络应用的可伸缩性和实用性

什么是NIO
IO操作有两个步骤:
1)等待数据准备 (Waiting for the data to be ready)
2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)

image.png

传统的BIO代码:

public class BIO {
     ExecutorService executor = Excutors.newFixedThreadPollExecutor(100);//线程池
     ServerSocket serverSocket = new ServerSocket();
     serverSocket.bind(8088);
     while(!Thread.currentThread.isInturrupted()){//主线程死循环等待新连接到来
     Socket socket = serverSocket.accept();
     executor.submit(new ConnectIOnHandler(socket));//为新的连接创建新的线程
}

class ConnectIOnHandler extends Thread{
    private Socket socket;
    public ConnectIOnHandler(Socket socket){
       this.socket = socket;
    }
    public void run(){
      while(!Thread.currentThread.isInturrupted()&&!socket.isClosed()){死循环处理读写事件
          String someThing = socket.read()....//读取数据
          if(someThing!=null){
             ......//处理数据
             socket.write()....//写数据
          }
      }
    }
}

这是一个经典的每连接每线程的模型,之所以使用多线程,主要原因在于socket.accept()、socket.read()、socket.write()三个主要函数都是同步阻塞的,当一个连接在处理I/O的时候,系统是阻塞的,如果是单线程的话必然就挂死在那里;但CPU是被释放出来的,开启多线程,就可以让CPU去处理更多的事情。其实这也是所有使用多线程的本质:

  • 利用多核。
  • 当I/O阻塞系统,但CPU空闲的时候,可以利用多线程使用CPU资源

现在的多线程一般都使用线程池,可以让线程的创建和回收成本相对较低。在活动连接数不是特别高(小于单机1000)的情况下,这种模型是比较不错的,可以让每一个连接专注于自己的I/O并且编程模型简单,也不用过多考虑系统的过载、限流等问题。线程池本身就是一个天然的漏斗,可以缓冲一些系统处理不了的连接或请求。

不过,这个模型最本质的问题在于,严重依赖于线程。但线程是很"贵"的资源,主要表现在:

  • 线程的创建和销毁成本很高,在Linux这样的操作系统中,线程本质上就是一个进程。创建和销毁都是重量级的系统函数。
  • 线程本身占用较大内存,像Java的线程栈,一般至少分配512K~1M的空间,如果系统中的线程数过千,恐怕整个JVM的内存都会被吃掉一半。
  • 线程的切换成本是很高的。操作系统发生线程切换的时候,需要保留线程的上下文,然后执行系统调用。如果线程数过高,可能执行线程切换的时间甚至会大于线程执行的时间,这时候带来的表现往往是系统load偏高、CPU sy使用率特别高(超过20%以上),导致系统几乎陷入不可用的状态。
  • 容易造成锯齿状的系统负载。因为系统负载是用活动线程数或CPU核心数,一旦线程数量高但外部网络环境不是很稳定,就很容易造成大量请求的结果同时返回,激活大量阻塞线程从而使系统负载压力过大。
    所以,当面对十万甚至百万级连接的时候,传统的BIO模型是无能为力的。随着移动端应用的兴起和各种网络游戏的盛行,百万级长连接日趋普遍,此时,必然需要一种更高效的I/O处理模型。

Java中的NIO使用的是Selector模式,所有的Channel都会向Selector中注册,Selector使用了IO多路复用,可以同时监听处理多个SocketChannel。我们可以通过遍历Selector来检测Channel是否有数据读写操作。NIO模式在等待数据的过程是非阻塞的,而是通过死循环遍历Selector。
原生NIO代码

public class BasicNioServer implements Runnable{

    //1 多路复用器(管理所有的通道)
    private Selector seletor;
    //2 建立缓冲区
    private ByteBuffer readBuf = ByteBuffer.allocate(1024);
    //3 
//  private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
    
    public BasicNioServer(int port){
        try {
            //1 打开路复用器
            this.seletor = Selector.open();
            //2 打开服务器通道
            ServerSocketChannel ssc = ServerSocketChannel.open();
            //3 设置服务器通道为非阻塞模式
            ssc.configureBlocking(false);   
            //4 绑定地址
            ssc.bind(new InetSocketAddress(port));
            //5 把服务器通道注册到多路复用器上,并且监听阻塞事件
            ssc.register(this.seletor, SelectionKey.OP_ACCEPT);
            
            System.out.println("Server start, port :" + port);
            
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        while(true){
            try {
                //1 必须要让多路复用器开始监听
                this.seletor.select();
                //2 返回多路复用器已经选择的结果集
                Iterator<SelectionKey> keys = this.seletor.selectedKeys().iterator();
                //3 进行遍历
                while(keys.hasNext()){
                    //4 获取一个选择的元素
                    SelectionKey key = keys.next();
                    //5 直接从容器中移除就可以了
                    keys.remove();
                    //6 如果是有效的
                    if(key.isValid()){
                        //7 如果为阻塞状态
                        if(key.isAcceptable()){
                            this.accept(key);
                        }
                        //8 如果为可读状态
                        if(key.isReadable()){
                            this.read(key);
                        }
                        //9 写数据
                        if(key.isWritable()){
                            //this.write(key); //ssc
                        }
                    }
                    
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
//  private void write(SelectionKey key){
//      //ServerSocketChannel ssc =  (ServerSocketChannel) key.channel();
//      //ssc.register(this.seletor, SelectionKey.OP_WRITE);
//  }

    private void read(SelectionKey key) {
        try {
            //1 清空缓冲区旧的数据
            this.readBuf.clear();
            //2 获取之前注册的socket通道对象
            SocketChannel sc = (SocketChannel) key.channel();
            //3 读取数据
            int count = sc.read(this.readBuf);
            //4 如果没有数据
            if(count == -1){
                key.channel().close();
                key.cancel();
                return;
            }
            //5 有数据则进行读取 读取之前需要进行复位方法(把position 和limit进行复位)
            this.readBuf.flip();
            //6 根据缓冲区的数据长度创建相应大小的byte数组,接收缓冲区的数据
            byte[] bytes = new byte[this.readBuf.remaining()];
            //7 接收缓冲区数据
            this.readBuf.get(bytes);
            //8 打印结果
            String body = new String(bytes).trim();
            System.out.println("Server : " + body);
            
            // 9..可以写回给客户端数据 
            
        } catch (IOException e) {
            e.printStackTrace();
        }
        
    }

    private void accept(SelectionKey key) {
        try {
            //1 获取服务通道
            ServerSocketChannel ssc =  (ServerSocketChannel) key.channel();
            //2 执行阻塞方法
            SocketChannel sc = ssc.accept();
            //3 设置阻塞模式
            sc.configureBlocking(false);
            //4 注册到多路复用器上,并设置读取标识
            sc.register(this.seletor, SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        
        new Thread(new BasicNioServer(8765)).start();;
    }
}
  • 需要了解复杂的API:SocketChannel、ByteBuffer、Selector等
  • ByteBuffer读写索引是同一个,容易操作不当
  • 编写代码复杂
  • 不支持心跳机制、连接监控
Netty的方式
public class NettyServer {

    private ServerBootstrap serverBootstrap;

    private EventLoopGroup bossGroup;

    private EventLoopGroup workerGroup;

    private ChannelFuture future;


    private Map<String, ChannelHandlerContext> ctxMap;

    public NettyServer() {
        ctxMap = new HashMap<>();
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8))
                                .addLast(new StringEncoder(CharsetUtil.UTF_8))
                                .addLast(new ServerHandler(NettyServer.this));
                    }
                });
    }


    public void doBind() {
        try {
            future = serverBootstrap.bind("127.0.0.1", 12345).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {

        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public Map<String, ChannelHandlerContext> getCtxMap() {
        return this.ctxMap;
    }

    /**
     *
     * @param ip
     * @param msg
     */
    public void sendMsg(String ip, String msg) {
        ChannelHandlerContext handlerContext = ctxMap.get(ip);
        if (handlerContext == null) {
            System.out.println("找不到客户端信息");
            return;
        }

        handlerContext.writeAndFlush(msg);
    }

    public static void main(String[] args) {
        NettyServer nettyServer = new NettyServer();

        new Thread(() -> {
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String input = scanner.nextLine();
                String[] info = input.split(" ");
                nettyServer.sendMsg(info[0], info[1]);
            }
        });

        nettyServer.doBind();
    }
}
Netty处理消息
public class ClientHandler extends ChannelInboundHandlerAdapter {

    public NettyClient nettyClient;

    public ClientHandler(NettyClient nettyClient) {
         = nettyClient;
    }

    /**
     * channel注册成功
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    /**
     * channel被关闭
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("连接被关闭,准备进行重连");
        nettyClient.doConnect();
    }

    /**
     * channel读取消息
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("收到服务端消息:" + msg);
    }


    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }

    /**
     * 事件触发回调
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            switch (event.state()) {
                //读超时
                case READER_IDLE:
                //写超时
                case WRITER_IDLE:
                //读写都超时
                case ALL_IDLE:
                    doPing(ctx);
                    break;
            }
        }
    }


    private void doPing(ChannelHandlerContext ctx) {
        ctx.writeAndFlush("ping");
    }
}

Netty的核心组件:

  • Bootstrap
  • EventLoopGroup
  • EventLoop
  • SocketChannel
  • ChannelInitializer
  • ChannelPipeline
  • ChannelHandler
Netty高性能的原因:
  1. 使用nio

  2. Netty使用的是Reactor的线程模型


    Netty线程模型.png
  3. ByteBuf极大地优化了IO操作

即所谓的 Zero-copy, 就是在操作数据时, 不需要将数据 buffer 从一个内存区域拷贝到另一个内存区域. 因为少了一次内存的拷贝, 因此 CPU 的效率就得到的提升.

在 OS 层面上的 Zero-copy 通常指避免在 用户态(User-space) 与 内核态(Kernel-space) 之间来回拷贝数据. 例如 Linux 提供的 mmap 系统调用, 它可以将一段用户空间内存映射到内核空间, 当映射成功后, 用户对这段内存区域的修改可以直接反映到内核空间; 同样地, 内核空间对这段区域的修改也直接反映用户空间. 正因为有这样的映射关系, 我们就不需要在 用户态(User-space) 与 内核态(Kernel-space) 之间拷贝数据, 提高了数据传输的效率.

而需要注意的是, Netty 中的 Zero-copy 与上面我们所提到到 OS 层面上的 Zero-copy 不太一样, Netty的 Zero-coyp 完全是在用户态(Java 层面)的, 它的 Zero-copy 的更多的是偏向于 优化数据操作 这样的概念.


Netty的事件处理模型:


Netty事件处理模型.png

Netty源码分析

MultithreadEventLoopGroup.java设置默认的线程数
 static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                 NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
             {}", DEFAULT_EVENT_LOOP_THREADS);
        }
 }
EventLoopGroup和EventLoop
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
}


/*******************************NioEventLoopGroup.java*******************************/
 @Override
 protected EventLoop newChild(Executor executor, Object... args) throws Exception {
      return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
 }

NioEventLoop的代码

    /**
     * The NIO {@link Selector}.
     */
    private Selector selector;
    private Selector unwrappedSelector;
    private SelectedSelectionKeySet selectedKeys;

    private final SelectorProvider provider;

    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
    
    
    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    
    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        // check if the set is empty and if so just return to not create garbage by
        // creating a new Iterator every time even if there is nothing to process.
        // See 
        if (selectedKeys.isEmpty()) {
            return;
        }

        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {
                break;
            }

            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }
Netty粘包拆包

TCP为确保传输的准确性,设置了ack应答机制;TCP为了减少ack的次数,优化网络传输,使用了一个传输缓冲区,默认机制是一定时间内当缓冲区的数据达到一定大小的时候,才将数据传输出去。当我们频繁传输短消息的时候,就会引发TCP的粘包现象。TCP拆包是相对于粘包的,拆包是指把TCP传输的数据,准确的拆成真实的传输数据。

Netty提供了解决粘包拆包的解码器:
  • LineBasedFrameDecoder 换行符
  • DelimiterBaseFrameDecoder 分隔符
  • FixdLengthFrameDecoder 定长
  • StringDecoder 将消息解析成字符串
###Client端:
 new Thread(() -> {
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("--------------run----------------------");
            String str = "7月不下雪";
            for (int i = 0; i < 100; i++) {
                //换行符作为数据完整性的标志
                client.sendMsg(str + i + System.lineSeparator());
            }

            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String input = scanner.next();
                client.sendMsg(input);
            }
        }).start();

###服务端
serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        //为了得到正确的拆包,必须把正确的解码器放在第一位
                        pipeline.addLast(new LineBasedFrameDecoder(1024))
                                .addLast(new StringDecoder(CharsetUtil.UTF_8))
                                .addLast(new StringEncoder(CharsetUtil.UTF_8))
                                .addLast(new ServerHandler(NettyServer.this));
                    }
                });

需要注意的是 group.shutdownGracefully();会关闭EventLoopGroup,Channel依赖于EventLoopGroup来进行事件分发,所以会导致Channel重连失败

    public boolean doConnect() {
        try {
            future = bootstrap.connect("127.0.0.1", 12345).sync();
            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        System.out.println("---------------------重连成功-----------------------");
                    }else {
                        System.out.println("---------------------重连失败-----------------------");
                    }
                }
            });
            log.error("建立连接------------------------");
            //这是一个阻塞的方法,监听服务端连接断开事件
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            return false;
        } finally {
            log.error("连接关闭------------------------");
//            这段代码会关闭EventLoopGroup,所以不能放在finally,不然服务端关闭连接的时候,会捕获到异常,最后执行这段代码,会导致重连失败
//            group.shutdownGracefully();
        }

        return true;
    }

Netty天然支持心跳机制:

public NettyClient() {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8))
                                .addLast(new StringEncoder(CharsetUtil.UTF_8))
                                //监听Channel读写状态,在ChannelHandler中可以重写回调方法处理对应的事件
                                .addLast(new IdleStateHandler(5, 5,5))
                                .addLast(new ClientHandler(NettyClient.this));
                    }
                });
}

    /**
     * ClientHandler.java
     * 事件触发回调
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            switch (event.state()) {
                //读超时
                case READER_IDLE:
                //写超时
                case WRITER_IDLE:
                //读写都超时
                case ALL_IDLE:
                    //发送心跳
                    doPing(ctx);
                    break;
            }
        }
    }
显示全文