一款BIO网络通信框架

三大组件

  1. channel
  2. buffer
  3. selector

java.noi中的byteBuffer

读写都需要切换模式,因为byteBuffe内部有三个指针,当读写的时候postition指针都会从头开始,limit来限制能读多少和写多少,capacity能存储多少,对于数据未读完,需要调用compact()方法来压缩数据

1
2
3
4
5
6
7
8
9
FileChannel channel = new FileInputStream("data.txt").getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (channel.read(buffer) != -1) {
buffer.flip();
while (buffer.hasRemaining()) {
log.info("{}", (char) buffer.get());
}
buffer.clear();
}

Demo

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* @author boluo
* @date 2023/11/25
*/
public class Server {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server receive msg:" + msg);
}
});
}
}).bind(9527);
}
}

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* @author boluo
* @date 2023/11/25
*/
public class Client {
public static void main(String[] args) throws InterruptedException {
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
channel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("127.0.0.1", 9527))
.sync()
.channel()
.writeAndFlush("hello, world!");
}
}

关闭操作处理

如果直接将channel关闭后,去做一些操作,可能会导致关闭后的操作并不是关闭后去做处理,可能会在关闭前就执行了。因为不在一个线程中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
channel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("127.0.0.1", 9527));

Channel channel = channelFuture.sync().channel();
new Thread(() -> {
// >>>> 可能会导致操作顺序不确定, close方法并不是当前这个testThread在同一线程执行
channel.close();
System.out.println("关闭之后的一些操作");
// <<<<
}, "testThread");

解决方式1

1
2
3
4
new Thread(() -> {
channel.closeFuture();// 不用close
System.out.println("关闭之后的一些操作");
});

解决方式2

通过对channelFuture添加addListener中的 operationComplete 方法中执行后置操作

1
2
3
4
5
6
7
8
ChannelFuture channelFuture = new Bootstrap().connect();

channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("这里面来执行关闭之后的操作");
}
});

Future 和 Promise

JDK中的Future,表示异步执行任务,但是需要任务执行完之后才能获取结果

netty中的Future,与jdk类似也需要任务执行完成后才能获取结果

netty中的promise,不仅有netty的future的功能,并且能够将任务返回的结果与任务解耦,作为两个线程传递中间者

jdk中的future

1
2
3
4
5
6
7
8
9
FutureTask futureTask = new FutureTask<>(new Callable<Object>() {
@Override
public Object call() throws Exception {
Thread.sleep(1000);
return null;
}
});
futureTask.run();
System.out.println(futureTask.get());

netty中future

1
2
3
4
5
6
7
8
9
10
11
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
EventLoop eventLoop = eventExecutors.next();

Future<Object> submit = eventLoop.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
Thread.sleep(1000);
return "ok";
}
});
System.out.println(submit.get()); // 或者getNow, 但是可能会去不了结果

netty中的promise

1
2
3
4
5
6
7
8
9
10
11
12
13
14
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
EventLoop eventLoop = eventExecutors.next();
Promise<Integer> promise = eventLoop.newPromise(); // 作为中间者-传值

new Thread(() -> {
try {
Thread.sleep(2000);
promise.setSuccess(1);
} catch (InterruptedException e) {
e.printStackTrace();
}

}).start();
System.out.println(promise.get());

handler 和 pipeline

channelhandle用于处理channel中各种事件处理

入站一般使用 ChannelInboundHandlerAdapter

出站一般使用 ChannelOutboundHandlerAdapter

关于出站入站顺序,下图中,绿色代表入站,蓝色代表出站;入站按照入站实现进行调用,如果中间有出站的实现则会跳过,后序直接调用chnnel的writeandflush方法会从tail向前开始查找出站的实现,调用ctx的write的话是可能会略掉一部分出站实现,因为这样出站的顺序是按照当前ctx调用顺序向前找的。

image-20231128142417472

ByteBuf

  1. 堆内存(受jvm管理)
  2. 直接内存(jvm管理不到,由系统管理)
1
2
ByteBufAllocator.DEFAULT.heapBuffer();
ByteBufAllocator.DEFAULT.directBuffer();

组成

主要由容量和最大容量、写指针、读指针。

最大容量用于扩容,最大容量和容量之间是可扩容的容量

写指针和读指针分别表示读和写到那个位置了

image-20231128152953868

内存的释放

对于直接内存由于未被jvm所管理,所以接内存需要手动来释放,并且应该最后调用者来释放(谁最后使用谁来释放)

slice方法

采用零拷贝,效率高。不用再次拷贝到用户态,直接从内核态复制到socket或者网卡中。如果复制过后手动将内存释放掉了,会导致复制后的报错

duplicate方法

与slice一致,但是回收不会导致复制后的不会报错

copy方法

直接拷贝底层数据,与bytebuf无关系

粘包、半包问题

  1. 声明消息固定长度,解析时到达这个长度才会解析
  2. 使用分隔符
  3. 自定义协议