Netty是什么
Netty是目前最流行的由JBOSS提供的一个Java开源框架NIO框架,Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
为什么选择Netty
- 事件驱动模型
- 避免多线程
- 单线程处理多任务
- 非阻塞I/O,I/O读写不再阻塞,而是返回0
- 基于block的传输,通常比基于流的传输更高效
- 更高级的IO函数,zero-copy
- IO多路复用大大提高了Java网络应用的可伸缩性和实用性
什么是NIO
IO操作有两个步骤:
1)等待数据准备 (Waiting for the data to be ready)
2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)
传统的BIO代码:
public class BIO {
ExecutorService executor = Excutors.newFixedThreadPollExecutor(100);//线程池
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(8088);
while(!Thread.currentThread.isInturrupted()){//主线程死循环等待新连接到来
Socket socket = serverSocket.accept();
executor.submit(new ConnectIOnHandler(socket));//为新的连接创建新的线程
}
class ConnectIOnHandler extends Thread{
private Socket socket;
public ConnectIOnHandler(Socket socket){
this.socket = socket;
}
public void run(){
while(!Thread.currentThread.isInturrupted()&&!socket.isClosed()){死循环处理读写事件
String someThing = socket.read()....//读取数据
if(someThing!=null){
......//处理数据
socket.write()....//写数据
}
}
}
}
这是一个经典的每连接每线程的模型,之所以使用多线程,主要原因在于socket.accept()、socket.read()、socket.write()三个主要函数都是同步阻塞的,当一个连接在处理I/O的时候,系统是阻塞的,如果是单线程的话必然就挂死在那里;但CPU是被释放出来的,开启多线程,就可以让CPU去处理更多的事情。其实这也是所有使用多线程的本质:
- 利用多核。
- 当I/O阻塞系统,但CPU空闲的时候,可以利用多线程使用CPU资源
现在的多线程一般都使用线程池,可以让线程的创建和回收成本相对较低。在活动连接数不是特别高(小于单机1000)的情况下,这种模型是比较不错的,可以让每一个连接专注于自己的I/O并且编程模型简单,也不用过多考虑系统的过载、限流等问题。线程池本身就是一个天然的漏斗,可以缓冲一些系统处理不了的连接或请求。
不过,这个模型最本质的问题在于,严重依赖于线程。但线程是很"贵"的资源,主要表现在:
- 线程的创建和销毁成本很高,在Linux这样的操作系统中,线程本质上就是一个进程。创建和销毁都是重量级的系统函数。
- 线程本身占用较大内存,像Java的线程栈,一般至少分配512K~1M的空间,如果系统中的线程数过千,恐怕整个JVM的内存都会被吃掉一半。
- 线程的切换成本是很高的。操作系统发生线程切换的时候,需要保留线程的上下文,然后执行系统调用。如果线程数过高,可能执行线程切换的时间甚至会大于线程执行的时间,这时候带来的表现往往是系统load偏高、CPU sy使用率特别高(超过20%以上),导致系统几乎陷入不可用的状态。
- 容易造成锯齿状的系统负载。因为系统负载是用活动线程数或CPU核心数,一旦线程数量高但外部网络环境不是很稳定,就很容易造成大量请求的结果同时返回,激活大量阻塞线程从而使系统负载压力过大。
所以,当面对十万甚至百万级连接的时候,传统的BIO模型是无能为力的。随着移动端应用的兴起和各种网络游戏的盛行,百万级长连接日趋普遍,此时,必然需要一种更高效的I/O处理模型。
Java中的NIO使用的是Selector模式,所有的Channel都会向Selector中注册,Selector使用了IO多路复用,可以同时监听处理多个SocketChannel。我们可以通过遍历Selector来检测Channel是否有数据读写操作。NIO模式在等待数据的过程是非阻塞的,而是通过死循环遍历Selector。
原生NIO代码
public class BasicNioServer implements Runnable{
//1 多路复用器(管理所有的通道)
private Selector seletor;
//2 建立缓冲区
private ByteBuffer readBuf = ByteBuffer.allocate(1024);
//3
// private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
public BasicNioServer(int port){
try {
//1 打开路复用器
this.seletor = Selector.open();
//2 打开服务器通道
ServerSocketChannel ssc = ServerSocketChannel.open();
//3 设置服务器通道为非阻塞模式
ssc.configureBlocking(false);
//4 绑定地址
ssc.bind(new InetSocketAddress(port));
//5 把服务器通道注册到多路复用器上,并且监听阻塞事件
ssc.register(this.seletor, SelectionKey.OP_ACCEPT);
System.out.println("Server start, port :" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while(true){
try {
//1 必须要让多路复用器开始监听
this.seletor.select();
//2 返回多路复用器已经选择的结果集
Iterator<SelectionKey> keys = this.seletor.selectedKeys().iterator();
//3 进行遍历
while(keys.hasNext()){
//4 获取一个选择的元素
SelectionKey key = keys.next();
//5 直接从容器中移除就可以了
keys.remove();
//6 如果是有效的
if(key.isValid()){
//7 如果为阻塞状态
if(key.isAcceptable()){
this.accept(key);
}
//8 如果为可读状态
if(key.isReadable()){
this.read(key);
}
//9 写数据
if(key.isWritable()){
//this.write(key); //ssc
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
// private void write(SelectionKey key){
// //ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
// //ssc.register(this.seletor, SelectionKey.OP_WRITE);
// }
private void read(SelectionKey key) {
try {
//1 清空缓冲区旧的数据
this.readBuf.clear();
//2 获取之前注册的socket通道对象
SocketChannel sc = (SocketChannel) key.channel();
//3 读取数据
int count = sc.read(this.readBuf);
//4 如果没有数据
if(count == -1){
key.channel().close();
key.cancel();
return;
}
//5 有数据则进行读取 读取之前需要进行复位方法(把position 和limit进行复位)
this.readBuf.flip();
//6 根据缓冲区的数据长度创建相应大小的byte数组,接收缓冲区的数据
byte[] bytes = new byte[this.readBuf.remaining()];
//7 接收缓冲区数据
this.readBuf.get(bytes);
//8 打印结果
String body = new String(bytes).trim();
System.out.println("Server : " + body);
// 9..可以写回给客户端数据
} catch (IOException e) {
e.printStackTrace();
}
}
private void accept(SelectionKey key) {
try {
//1 获取服务通道
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//2 执行阻塞方法
SocketChannel sc = ssc.accept();
//3 设置阻塞模式
sc.configureBlocking(false);
//4 注册到多路复用器上,并设置读取标识
sc.register(this.seletor, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Thread(new BasicNioServer(8765)).start();;
}
}
- 需要了解复杂的API:SocketChannel、ByteBuffer、Selector等
- ByteBuffer读写索引是同一个,容易操作不当
- 编写代码复杂
- 不支持心跳机制、连接监控
Netty的方式
public class NettyServer {
private ServerBootstrap serverBootstrap;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ChannelFuture future;
private Map<String, ChannelHandlerContext> ctxMap;
public NettyServer() {
ctxMap = new HashMap<>();
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(new ServerHandler(NettyServer.this));
}
});
}
public void doBind() {
try {
future = serverBootstrap.bind("127.0.0.1", 12345).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public Map<String, ChannelHandlerContext> getCtxMap() {
return this.ctxMap;
}
/**
*
* @param ip
* @param msg
*/
public void sendMsg(String ip, String msg) {
ChannelHandlerContext handlerContext = ctxMap.get(ip);
if (handlerContext == null) {
System.out.println("找不到客户端信息");
return;
}
handlerContext.writeAndFlush(msg);
}
public static void main(String[] args) {
NettyServer nettyServer = new NettyServer();
new Thread(() -> {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String input = scanner.nextLine();
String[] info = input.split(" ");
nettyServer.sendMsg(info[0], info[1]);
}
});
nettyServer.doBind();
}
}
Netty处理消息
public class ClientHandler extends ChannelInboundHandlerAdapter {
public NettyClient nettyClient;
public ClientHandler(NettyClient nettyClient) {
= nettyClient;
}
/**
* channel注册成功
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
/**
* channel被关闭
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("连接被关闭,准备进行重连");
nettyClient.doConnect();
}
/**
* channel读取消息
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到服务端消息:" + msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
/**
* 事件触发回调
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
switch (event.state()) {
//读超时
case READER_IDLE:
//写超时
case WRITER_IDLE:
//读写都超时
case ALL_IDLE:
doPing(ctx);
break;
}
}
}
private void doPing(ChannelHandlerContext ctx) {
ctx.writeAndFlush("ping");
}
}
Netty的核心组件:
- Bootstrap
- EventLoopGroup
- EventLoop
- SocketChannel
- ChannelInitializer
- ChannelPipeline
- ChannelHandler
Netty高性能的原因:
-
使用nio
-
Netty使用的是Reactor的线程模型
Netty线程模型.png -
ByteBuf极大地优化了IO操作
即所谓的 Zero-copy, 就是在操作数据时, 不需要将数据 buffer 从一个内存区域拷贝到另一个内存区域. 因为少了一次内存的拷贝, 因此 CPU 的效率就得到的提升.
在 OS 层面上的 Zero-copy 通常指避免在 用户态(User-space) 与 内核态(Kernel-space) 之间来回拷贝数据. 例如 Linux 提供的 mmap 系统调用, 它可以将一段用户空间内存映射到内核空间, 当映射成功后, 用户对这段内存区域的修改可以直接反映到内核空间; 同样地, 内核空间对这段区域的修改也直接反映用户空间. 正因为有这样的映射关系, 我们就不需要在 用户态(User-space) 与 内核态(Kernel-space) 之间拷贝数据, 提高了数据传输的效率.
而需要注意的是, Netty 中的 Zero-copy 与上面我们所提到到 OS 层面上的 Zero-copy 不太一样, Netty的 Zero-coyp 完全是在用户态(Java 层面)的, 它的 Zero-copy 的更多的是偏向于 优化数据操作 这样的概念.
Netty的事件处理模型:
Netty事件处理模型.png
Netty源码分析
MultithreadEventLoopGroup.java设置默认的线程数
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
{}", DEFAULT_EVENT_LOOP_THREADS);
}
}
EventLoopGroup和EventLoop
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
/*******************************NioEventLoopGroup.java*******************************/
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
NioEventLoop的代码
/**
* The NIO {@link Selector}.
*/
private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;
private final SelectorProvider provider;
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
Netty粘包拆包
TCP为确保传输的准确性,设置了ack应答机制;TCP为了减少ack的次数,优化网络传输,使用了一个传输缓冲区,默认机制是一定时间内当缓冲区的数据达到一定大小的时候,才将数据传输出去。当我们频繁传输短消息的时候,就会引发TCP的粘包现象。TCP拆包是相对于粘包的,拆包是指把TCP传输的数据,准确的拆成真实的传输数据。
Netty提供了解决粘包拆包的解码器:
- LineBasedFrameDecoder 换行符
- DelimiterBaseFrameDecoder 分隔符
- FixdLengthFrameDecoder 定长
- StringDecoder 将消息解析成字符串
###Client端:
new Thread(() -> {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("--------------run----------------------");
String str = "7月不下雪";
for (int i = 0; i < 100; i++) {
//换行符作为数据完整性的标志
client.sendMsg(str + i + System.lineSeparator());
}
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String input = scanner.next();
client.sendMsg(input);
}
}).start();
###服务端
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
//为了得到正确的拆包,必须把正确的解码器放在第一位
pipeline.addLast(new LineBasedFrameDecoder(1024))
.addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(new ServerHandler(NettyServer.this));
}
});
需要注意的是 group.shutdownGracefully();
会关闭EventLoopGroup
,Channel
依赖于EventLoopGroup
来进行事件分发,所以会导致Channel重连失败
。
public boolean doConnect() {
try {
future = bootstrap.connect("127.0.0.1", 12345).sync();
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("---------------------重连成功-----------------------");
}else {
System.out.println("---------------------重连失败-----------------------");
}
}
});
log.error("建立连接------------------------");
//这是一个阻塞的方法,监听服务端连接断开事件
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
return false;
} finally {
log.error("连接关闭------------------------");
// 这段代码会关闭EventLoopGroup,所以不能放在finally,不然服务端关闭连接的时候,会捕获到异常,最后执行这段代码,会导致重连失败
// group.shutdownGracefully();
}
return true;
}
Netty天然支持心跳机制:
public NettyClient() {
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new StringEncoder(CharsetUtil.UTF_8))
//监听Channel读写状态,在ChannelHandler中可以重写回调方法处理对应的事件
.addLast(new IdleStateHandler(5, 5,5))
.addLast(new ClientHandler(NettyClient.this));
}
});
}
/**
* ClientHandler.java
* 事件触发回调
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
switch (event.state()) {
//读超时
case READER_IDLE:
//写超时
case WRITER_IDLE:
//读写都超时
case ALL_IDLE:
//发送心跳
doPing(ctx);
break;
}
}
}