一款BIO网络通信框架
三大组件
- channel
- buffer
- selector
java.noi中的byteBuffer
读写都需要切换模式,因为byteBuffe内部有三个指针,当读写的时候postition指针都会从头开始,limit来限制能读多少和写多少,capacity能存储多少,对于数据未读完,需要调用compact()方法来压缩数据
1 2 3 4 5 6 7 8 9
| FileChannel channel = new FileInputStream("data.txt").getChannel(); ByteBuffer buffer = ByteBuffer.allocate(1024); while (channel.read(buffer) != -1) { buffer.flip(); while (buffer.hasRemaining()) { log.info("{}", (char) buffer.get()); } buffer.clear(); }
|
Demo
server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
public class Server { public static void main(String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel channel) throws Exception { channel.pipeline().addLast(new StringDecoder()); channel.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("server receive msg:" + msg); } }); } }).bind(9527); } }
|
client
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
|
public class Client { public static void main(String[] args) throws InterruptedException { new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel channel) throws Exception { channel.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("127.0.0.1", 9527)) .sync() .channel() .writeAndFlush("hello, world!"); } }
|
关闭操作处理
如果直接将channel关闭后,去做一些操作,可能会导致关闭后的操作并不是关闭后去做处理,可能会在关闭前就执行了。因为不在一个线程中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| ChannelFuture channelFuture = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel channel) throws Exception { channel.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("127.0.0.1", 9527));
Channel channel = channelFuture.sync().channel(); new Thread(() -> { channel.close(); System.out.println("关闭之后的一些操作"); }, "testThread");
|
解决方式1
1 2 3 4
| new Thread(() -> { channel.closeFuture(); System.out.println("关闭之后的一些操作"); });
|
解决方式2
通过对channelFuture添加addListener中的 operationComplete 方法中执行后置操作
1 2 3 4 5 6 7 8
| ChannelFuture channelFuture = new Bootstrap().connect();
channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { System.out.println("这里面来执行关闭之后的操作"); } });
|
Future 和 Promise
JDK中的Future,表示异步执行任务,但是需要任务执行完之后才能获取结果
netty中的Future,与jdk类似也需要任务执行完成后才能获取结果
netty中的promise,不仅有netty的future的功能,并且能够将任务返回的结果与任务解耦,作为两个线程传递中间者
jdk中的future
1 2 3 4 5 6 7 8 9
| FutureTask futureTask = new FutureTask<>(new Callable<Object>() { @Override public Object call() throws Exception { Thread.sleep(1000); return null; } }); futureTask.run(); System.out.println(futureTask.get());
|
netty中future
1 2 3 4 5 6 7 8 9 10 11
| NioEventLoopGroup eventExecutors = new NioEventLoopGroup(); EventLoop eventLoop = eventExecutors.next();
Future<Object> submit = eventLoop.submit(new Callable<Object>() { @Override public Object call() throws Exception { Thread.sleep(1000); return "ok"; } }); System.out.println(submit.get());
|
netty中的promise
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| NioEventLoopGroup eventExecutors = new NioEventLoopGroup(); EventLoop eventLoop = eventExecutors.next(); Promise<Integer> promise = eventLoop.newPromise();
new Thread(() -> { try { Thread.sleep(2000); promise.setSuccess(1); } catch (InterruptedException e) { e.printStackTrace(); }
}).start(); System.out.println(promise.get());
|
handler 和 pipeline
channelhandle用于处理channel中各种事件处理
入站一般使用 ChannelInboundHandlerAdapter
出站一般使用 ChannelOutboundHandlerAdapter
关于出站入站顺序,下图中,绿色代表入站,蓝色代表出站;入站按照入站实现进行调用,如果中间有出站的实现则会跳过,后序直接调用chnnel的writeandflush方法会从tail向前开始查找出站的实现,调用ctx的write的话是可能会略掉一部分出站实现,因为这样出站的顺序是按照当前ctx调用顺序向前找的。

ByteBuf
- 堆内存(受jvm管理)
- 直接内存(jvm管理不到,由系统管理)
1 2
| ByteBufAllocator.DEFAULT.heapBuffer(); ByteBufAllocator.DEFAULT.directBuffer();
|
组成
主要由容量和最大容量、写指针、读指针。
最大容量用于扩容,最大容量和容量之间是可扩容的容量
写指针和读指针分别表示读和写到那个位置了

内存的释放
对于直接内存由于未被jvm所管理,所以接内存需要手动来释放,并且应该最后调用者来释放(谁最后使用谁来释放)
slice方法
采用零拷贝,效率高。不用再次拷贝到用户态,直接从内核态复制到socket或者网卡中。如果复制过后手动将内存释放掉了,会导致复制后的报错
duplicate方法
与slice一致,但是回收不会导致复制后的不会报错
copy方法
直接拷贝底层数据,与bytebuf无关系
粘包、半包问题
- 声明消息固定长度,解析时到达这个长度才会解析
- 使用分隔符
- 自定义协议