SpringBoot使用Netty实现远程调2.0

SpringBoot使用Netty实现远程调2.0

前言

不久之前溪源我发布了一篇博客时关于使用如何使用Netty实现远程调用,之前做的只是一个简单的demo,最近在此基础上,进行了进一步的扩展:包括使用反射机制获取类、异常的统一处理等,虽然有一定程度上的改进,但还是有一定程度上的不足,我会持续更新继续改进的。

SpringBoot使用Netty实现远程调用可参考我的博客:SpringBoot使用Netty实现远程调用

正文

SpringBoot使用Netty实现远程调2.0

SpringBoot使用Netty实现远程调用中我们已经可以实现了通过Netty实现远程调用的功能了,但是之前做的只是点对点的调用,现在本篇博客是在之前的基础做出了一些改动:

客户端的部分优化

NettyClientUtil:Netty客户端工具类

/**
* Netty客户端
**/
@Slf4j
public class NettyClientUtil {

    /**
     * 用于RPC远程调用
     * @param commandPOJO
     * @return
     */
    public static ResponseResult rpcNetty(CommandPOJO commandPOJO) {
        String commandPOJOStr = FastJsonUtils.convertObjectToJSON(commandPOJO);
        NettyClientHandler nettyClientHandler = new NettyClientHandler();
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap()
                .group(group)
                //该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输
                .option(ChannelOption.TCP_NODELAY, true)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast("decoder", new StringDecoder());
                        socketChannel.pipeline().addLast("encoder", new StringEncoder());
                        socketChannel.pipeline().addLast(nettyClientHandler);
                    }
                });
        try {
            ChannelFuture future = bootstrap.connect("127.0.0.1", 8082).sync();
            log.info("客户端发送成功....");
            //发送消息
            future.channel().writeAndFlush(commandPOJOStr);
            // 等待连接被关闭
            future.channel().closeFuture().sync();
            return nettyClientHandler.getResponseResult();
        } catch (Exception e) {
            log.error("客户端Netty失败", e);
            throw new BusinessException(CouponTypeEnum.OPERATE_ERROR);
        } finally {
            //以一种优雅的方式进行线程退出
            group.shutdownGracefully();
        }
    }

}

NettyClientHandler:客户端处理类

/**
* 客户端处理器
**/
@Slf4j
@Setter
@Getter
public class NettyClientHandler extends ChannelInboundHandlerAdapter {


    private ResponseResult responseResult;


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端Active .....");
    }


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("客户端收到消息: {}", msg.toString());
        this.responseResult = (ResponseResult) FastJsonUtils.convertJsonToObject(msg.toString(),ResponseResult.class);
        ctx.close();
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

服务端部分优化

NettyServer:服务启动监听器

/**
* 服务启动监听器
**/
@Slf4j
public class NettyServer {

    public void start() {
        InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 8082);
        //new 一个主线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        //new 一个工作线程组
        EventLoopGroup workGroup = new NioEventLoopGroup(200);
        ServerBootstrap bootstrap = new ServerBootstrap()
                .group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ServerChannelInitializer())
                .localAddress(socketAddress)
                //设置队列大小
                .option(ChannelOption.SO_BACKLOG, 1024)
                // 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        //绑定端口,开始接收进来的连接
        try {
            ChannelFuture future = bootstrap.bind(socketAddress).sync();
            log.info("服务器启动开始监听端口: {}", socketAddress.getPort());
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("服务器开启失败", e);
        } finally {
            //关闭主线程组
            bossGroup.shutdownGracefully();
            //关闭工作线程组
            workGroup.shutdownGracefully();
        }
    }
}

NettyServerHandler:服务端处理器

/**
* netty服务端处理器
**/
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {


    /**
     * 用于获取SpringBean
     */
    private static ApplicationContext context  = SpringUtil.getApplicationContext();


    /**
     * 客户端连接会触发
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Channel active......");
    }


    /**
     * 客户端发消息会触发
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("服务器收到消息: {}", msg.toString());
        CommandPOJO commandPOJO = (CommandPOJO) FastJsonUtils.convertJsonToObject(msg.toString(),CommandPOJO.class);
        IInvokeService invokeService =context.getBean(IInvokeService.class);
        ResponseResult responseResult =invokeService.invokeMethod(commandPOJO);
        ctx.write(FastJsonUtils.convertObjectToJSON(responseResult));
        ctx.flush();
    }


    /**
     * 发生异常触发
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

InvokeServiceImpl:远程调用的服务类

@Service
@Slf4j
public class InvokeServiceImpl implements IInvokeService {

    /**
     * 调用方法(带参数)
     *
     * @param commandPOJO
     * @return
     */
    @Override
    public ResponseResult invokeMethod(CommandPOJO commandPOJO) {
        ResponseResult responseResult = ResponseResult.error();
        Class<?> tClass = null;
        Class<?> rClass = null;
        try {
            tClass = Class.forName(commandPOJO.getInvokeClass());
            try {
                rClass = Class.forName(commandPOJO.getRequestPojoPath());
            } catch (ClassNotFoundException e) {
                log.error("请求类{}不存在,异常{}", commandPOJO.getRequestPojoPath(), e);
                throw new BusinessException(CouponTypeEnum.OPERATE_ERROR, "请求类:" + commandPOJO.getRequestPojoPath() + "不存在");
            }
            Method method = tClass.getMethod(commandPOJO.getInvokeMethod(), rClass);
            return ResponseResult.success(method.invoke(tClass.newInstance(), FastJsonUtils.convertJsonToObject(commandPOJO.getRequestParamsJson(), rClass)), CouponTypeEnum.OPERATE_SUCCESS.getCouponTypeDesc());
        } catch (ClassNotFoundException classBiz) {
            log.error("服务端接口{}不存在,异常{}", commandPOJO.getInvokeClass(), classBiz);
            responseResult.setErrorMsg("服务端该接口:" + commandPOJO.getInvokeClass() + "不存在");
        } catch (NoSuchMethodException nbiz) {
            log.error("服务端接口{}没有方法{},异常{}", commandPOJO.getInvokeClass(), commandPOJO.getInvokeMethod(), nbiz);
            responseResult.setErrorMsg("服务端接口:" + commandPOJO.getInvokeClass() + "没有该方法:" + commandPOJO.getInvokeMethod());
        } catch (JSONException jsonException) {
            log.error("请求json字符串违法{},无法转换成对应的请求类{}", commandPOJO.getRequestParamsJson(), commandPOJO.getRequestPojoPath(), jsonException);
            responseResult.setErrorMsg("请求json字符串违法:" + commandPOJO.getRequestParamsJson() + ",无法转换成对应的请求类:" + commandPOJO.getRequestPojoPath());
        } catch (BusinessException biz) {
            log.error("调用方法业务层异常", biz);
            responseResult.setErrorMsg("调用方法业务层异常" + biz.getMessage());
        } catch (Exception e) {
            log.error("调用方法异常", e);
            responseResult.setErrorMsg("调用方法异常" + e.getMessage());
        }
        return responseResult;
    }
}

RpcInvokeServiceImpl:业务方法

@Slf4j
@Service
public class RpcInvokeServiceImpl implements IRpcInvokeService {


    /**
     * 用于测试远程调用
     * @param helloReq
     * @return
     */
    @Override
    public String sysHello(SysHelloReq helloReq) {
        return helloReq.getMsg()+",你也好哇";
    }
}

验证

验证接口

@RestController
@Slf4j
@Api(value = "远程调用模块")
@RequestMapping("/xiyuanrpc")
public class RPCController {
    @PostMapping("/rpcNettybyInvoke")
    @ApiOperation(value = "rpc远程调用")
    @InvokeParameterCheck
    @MethodLogPrint
    public ResponseResult rpcNettybyInvoke(@Valid @RequestBody CommandPOJO pojo) {
        return NettyClientUtil.rpcNetty(pojo);
    }
}

通过knife4j访问测试接口
-SpringBoot使用Netty实现远程调2.0-

源码

项目源码可从的我的github中获取:github源码地址

-SpringBoot使用Netty实现远程调2.0-

未分类>
匿名

发表评论

匿名网友