netty入门demo(一)

前言

最近做一个项目:

  1. 大概需求: 多个温度传感器不断向java服务发送温度数据,该传感器采用socket发送数据;该数据以$符号开头和结尾,最后将处理的数据存入数据库;
  2. 我想到的处理方式:采用netty来接收和处理数据,然后用mybatis将处理后的数据存入数据库;

我在这之前从来没使用过netty,在网上倒是看到不少关于netty的文章,如今就趁着这个项目写一下我所学到的东西和遇到的问题,又是怎么去解决的;

接下来的几篇文章都是围绕着这个项目来写的;本篇主要写netty的入门demo;

正文

代码部分

新建一个maven项目

首先在pom.xml中导入:

1
2
3
4
5
6
7
8

 <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>5.0.0.Alpha1</version>
        </dependency>
 

服务端

1. DiscardServer类,netty的服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

public class DiscardServer {
    public void run(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        System.out.println("准备运行端口:" + port);
        try {
            ServerBootstrap b = new ServerBootstrap();
            b = b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(new ChildChannelHandler());
            //绑定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();
            //等待服务监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            //退出,释放线程资源
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws Exception {
        new DiscardServer().run(8080);
    }
}

2. ChildChannelHandler类:

1
2
3
4
5
6
7
8

public class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast(new DiscardServerHandler());
    }
}

3. DiscardServerHandler类

在这里是继承的ChannelHandlerAdapter类,当然还可以继承其他的类,例如SimpleChannelInboundHandler,ChannelInboundHandlerAdapter都可以

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

public class DiscardServerHandler extends ChannelHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {

        try {
            ByteBuf in = (ByteBuf) msg;
            System.out.println("传输内容是");
            System.out.println(in.toString(CharsetUtil.UTF_8));
            ByteBuf resp= Unpooled.copiedBuffer("收到信息$".getBytes());
            ctx.writeAndFlush(resp);
        }  finally {
            ReferenceCountUtil.release(msg);
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 出现异常就关闭
        cause.printStackTrace();
        ctx.close();
    }

}

启动netty服务;

好了,到这里就能开始接收数据了;

客服端

1.TimeClient类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

public class TimeClient {
    public void connect(int port,String host)throws Exception{
        //配置客户端
        System.out.println(port+"--"+host);
        EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
        try {
            Bootstrap b=new Bootstrap();
            b.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY,true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new TimeClientHandler());
                        }
                    });
            //绑定端口,同步等待成功
            ChannelFuture f = b.connect(host,port).sync();
            //等待服务监听端口关闭
            f.channel().closeFuture().sync();
        }finally {
            //优雅退出,释放线程资源
            eventLoopGroup.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws Exception {
       new TimeClient().connect(8090,"localhost");
    }
}

2.TimeClientHandler 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

public class TimeClientHandler extends ChannelHandlerAdapter {
    private byte[] req;
    public TimeClientHandler(){
        req="$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$".getBytes();
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message=null;
        for(int i=0;i<100;i++){
            message=Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            ByteBuf in = (ByteBuf) msg;
            System.out.println(in.toString(CharsetUtil.UTF_8));
        }  finally {
            ReferenceCountUtil.release(msg);
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 出现异常就关闭
        cause.printStackTrace();
        ctx.close();
    }

}

在channelActive类中向服务端发送100次消息

先启动服务端,再启动客户端;

测试结果一:

服务端:

1
2
3
4
5
6

传输内容是
$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$$tmb00035ET3318/08/22 11:5704026.7
传输内容是
5,027.31,20.00,20.00$$tmb00035ET3318/08/22 

客户端:

1
2
3
4

8080--localhost
收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息收到信息

由于内容太多,就不都贴出来了j,直接写结果吧:

  1. 客户端发送100次数据,但是服务端只收到了28次,然后服务端向客户端返回28次数据,客户端却只收到一次;
  2. 可以发现服务端接收的数据不是完整接收的,这里出现了拆包,粘包的问题

这里就不讨论拆包,粘包了,百度一大堆,相信你也能看明白;

解决粘包,拆包的问题

解决拆包粘包的方法有很多:

  1. 消息定长,固定每个消息的固定长度
  2. 在消息末尾使用换行符对消息进行分割,或者使用其他特殊字符来对消息进行分割;
  3. 将消息分为消息头和消息体,消息头中包含标识消息总长度;
  4. 更复杂的,或者其他的协议。

由于我负责的这个项目户端发送是由$开始和结束的数据,返回的数据我也设置的$结束,所以我选择了第二种方法;

只需要在服务端的DiscardServerHandler中和客户端的ChannelInitializer中添加几行相同的代码就行了;

服务端:

1
2
3
4
5
6
7
8
9
10

public class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ByteBuf byteBuf= Unpooled.copiedBuffer("$".getBytes());
        socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,byteBuf));
        socketChannel.pipeline().addLast(new DiscardServerHandler());
    }
}

客户端:

在如下的位置添加如下的代码:

1
2
3
4
5
6
7
8
9

 .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ByteBuf byteBuf= Unpooled.copiedBuffer("$".getBytes());
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,byteBuf));
                            socketChannel.pipeline().addLast(new TimeClientHandler());
                        }
                    });

测试结果

这里我就不发送100次数据了,值发送10次:

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

传输内容是
tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00
传输内容是
tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00
传输内容是
tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00
传输内容是
tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00
传输内容是
tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00
传输内容是
tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00
传输内容是
tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00
传输内容是
tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00
传输内容是
tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00
传输内容是
tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00

客户端:

1
2
3
4
5
6
7
8
9
10
11
12

收到信息
收到信息
收到信息
收到信息
收到信息
收到信息
收到信息
收到信息
收到信息
收到信息

解决我所遇到的问题了;

总结

  1. 本来我只需要写服务端的代码的,但是为了更好的演示,所以我写了客户端
  2. 本篇文章主要就是使用netty发送和接收数据,还有就是拆包和粘包的问题,当然,netty还可以做其他很多的事情;
  3. netty针对对拆包粘包的问题有很多种解决办法:例如可以用LineBasedFrameDecoder和StringDecoder组合将信息已换行符来进行拆分;也可以用我上边的解决方法来解决以特殊字符结束的信息;
  4. 在解决拆包粘包信息的时候,注意信息是否符合定义的规则,不然会处理不了数据:例如我上边的例子,如果服务端在返回信息是不以$符结尾的话,客户端是打印不出来信息的,因为客户端会认为服务端还没有发送完信息,会一直等待,而且打印不出数据;
  5. 这篇文章只是我入门netty的一个小demo,对我还是很有帮助的,当然也希望对阅读者有那么一点点帮助;
  6. 有什么不对的地方还请指正,建议也是多多益善;
  7. 源码地址
坚持原创技术分享,您的支持将鼓励我继续创作!