Java开发入门到精通--从零开始手写 RPC—如何实现客户端调用服务端?
小职 2021-10-27 来源 :老马啸西风 阅读 454 评论 0

摘要:本篇主要介绍了Java开发入门到精通--从零开始手写 RPC—如何实现客户端调用服务端?,通过具体的内容展现,希望对Java开发的学习有一定的帮助。

本篇主要介绍了Java开发入门到精通--从零开始手写 RPC—如何实现客户端调用服务端?,通过具体的内容展现,希望对Java开发的学习有一定的帮助。

Java开发入门到精通--从零开始手写 RPC—如何实现客户端调用服务端?

写完了客户端和服务端,那么如何实现客户端和服务端的调用呢?

 

下面就让我们一起来看一下。

 

接口定义

计算方法

package com.github.houbb.rpc.common.service;

 

 

import com.github.houbb.rpc.common.model.CalculateRequest;

import com.github.houbb.rpc.common.model.CalculateResponse;

 

 

/**

 * <p> 计算服务接口 </p>

 *

 * <pre> Created: 2018/8/24 下午4:47  </pre>

 * <pre> Project: fake  </pre>

 *

 * @author houbinbin

 * @since 0.0.1

 */

public interface Calculator {

 

 

    /**

     * 计算加法

     * @param request 请求入参

     * @return 返回结果

     */

    CalculateResponse sum(final CalculateRequest request);

 

 

}

pojo

对应的参数对象:

 

CalculateRequest

package com.github.houbb.rpc.common.model;

 

 

import java.io.Serializable;

 

 

/**

 * <p> 请求入参 </p>

 *

 * <pre> Created: 2018/8/24 下午5:05  </pre>

 * <pre> Project: fake  </pre>

 *

 * @author houbinbin

 * @since 0.0.3

 */

public class CalculateRequest implements Serializable {

 

 

    private static final long serialVersionUID = 6420751004355300996L;

 

 

    /**

     * 参数一

     */

    private int one;

 

 

    /**

     * 参数二

     */

    private int two;

 

 

    public CalculateRequest() {

    }

 

 

    public CalculateRequest(int one, int two) {

        this.one = one;

        this.two = two;

    }

 

 

    //getter setter toString

 

 

}

CalculateResponse

package com.github.houbb.rpc.common.model;

 

 

import java.io.Serializable;

 

 

/**

 * <p> 请求入参 </p>

 *

 * <pre> Created: 2018/8/24 下午5:05  </pre>

 * <pre> Project: fake  </pre>

 *

 * @author houbinbin

 * @since 0.0.3

 */

public class CalculateResponse implements Serializable {

 

 

    private static final long serialVersionUID = -1972014736222511341L;

 

 

    /**

     * 是否成功

     */

   private boolean success;

 

 

    /**

     * 二者的和

     */

   private int sum;

 

 

    public CalculateResponse() {

    }

 

 

    public CalculateResponse(boolean success, int sum) {

        this.success = success;

        this.sum = sum;

    }

 

 

    //getter setter toString

}

客户端

核心部分

RpcClient 需要添加对应的 Handler,调整如下:

 

Bootstrap bootstrap = new Bootstrap();

ChannelFuture channelFuture = bootstrap.group(workerGroup)

        .channel(NioSocketChannel.class)

        .option(ChannelOption.SO_KEEPALIVE, true)

        .handler(new ChannelInitializer<Channel>(){

            @Override

            protected void initChannel(Channel ch) throws Exception {

                ch.pipeline()

                        .addLast(new LoggingHandler(LogLevel.INFO))

                        .addLast(new CalculateRequestEncoder())

                        .addLast(new CalculateResponseDecoder())

                        .addLast(new RpcClientHandler());

            }

        })

        .connect(RpcConstant.ADDRESS, port)

        .syncUninterruptibly();

netty 中的 handler 泳道设计的非常优雅,让我们的代码可以非常灵活地进行拓展。

 

接下来我们看一下对应的实现。

 

RpcClientHandler

package com.github.houbb.rpc.client.handler;

 

 

import com.github.houbb.log.integration.core.Log;

import com.github.houbb.log.integration.core.LogFactory;

import com.github.houbb.rpc.client.core.RpcClient;

import com.github.houbb.rpc.common.model.CalculateRequest;

import com.github.houbb.rpc.common.model.CalculateResponse;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

 

 

/**

 * <p> 客户端处理类 </p>

 *

 * <pre> Created: 2019/10/16 11:30 下午  </pre>

 * <pre> Project: rpc  </pre>

 *

 * @author houbinbin

 * @since 0.0.2

 */

public class RpcClientHandler extends SimpleChannelInboundHandler {

 

 

    private static final Log log = LogFactory.getLog(RpcClient.class);

 

 

    @Override

    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        CalculateRequest request = new CalculateRequest(1, 2);

 

 

        ctx.writeAndFlush(request);

        log.info("[Client] request is :{}", request);

    }

 

 

    @Override

    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

        CalculateResponse response = (CalculateResponse)msg;

        log.info("[Client] response is :{}", response);

    }

 

 

}

这里比较简单,channelActive 中我们直接发起调用,入参的对象为了简单,此处固定写死。

 

channelRead0 中监听服务端的相应结果,并做日志输出。

 

CalculateRequestEncoder

请求参数是一个对象,netty 是无法直接传输的,我们将其转换为基本对象:

 

package com.github.houbb.rpc.client.encoder;

 

 

import com.github.houbb.rpc.common.model.CalculateRequest;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.MessageToByteEncoder;

 

 

/**

 * @author binbin.hou

 * @since 0.0.3

 */

public class CalculateRequestEncoder extends MessageToByteEncoder<CalculateRequest> {

 

 

    @Override

    protected void encode(ChannelHandlerContext ctx, CalculateRequest msg, ByteBuf out) throws Exception {

        int one = msg.getOne();

        int two = msg.getTwo();

 

 

        out.writeInt(one);

        out.writeInt(two);

    }

 

 

}

CalculateResponseDecoder

针对服务端的响应,也是同理。

 

我们需要把基本的类型,封装转换为我们需要的对象。

 

package com.github.houbb.rpc.client.decoder;

 

 

import com.github.houbb.rpc.common.model.CalculateResponse;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.ByteToMessageDecoder;

 

 

import java.util.List;

 

 

/**

 * 响应参数解码

 * @author binbin.hou

 * @since 0.0.3

 */

public class CalculateResponseDecoder extends ByteToMessageDecoder {

 

 

    @Override

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        boolean success = in.readBoolean();

        int sum = in.readInt();

 

 

        CalculateResponse response = new CalculateResponse(success, sum);

        out.add(response);

    }

 

 

}

服务端

设置处理类

RpcServer 中的处理类要稍微调整一下,其他的保持不变。

 

ServerBootstrap serverBootstrap = new ServerBootstrap();

serverBootstrap.group(workerGroup, bossGroup)

        .channel(NioServerSocketChannel.class)

        // 打印日志

        .handler(new LoggingHandler(LogLevel.INFO))

        .childHandler(new ChannelInitializer<Channel>() {

            @Override

            protected void initChannel(Channel ch) throws Exception {

                ch.pipeline()

                        .addLast(new CalculateRequestDecoder())

                        .addLast(new CalculateResponseEncoder())

                        .addLast(new RpcServerHandler());

            }

        })

        // 这个参数影响的是还没有被accept 取出的连接

        .option(ChannelOption.SO_BACKLOG, 128)

        // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。

        .childOption(ChannelOption.SO_KEEPALIVE, true);

RpcServerHandler

一开始这里是空实现,我们来添加一下对应的实现。

 

package com.github.houbb.rpc.server.handler;

 

 

import com.github.houbb.log.integration.core.Log;

import com.github.houbb.log.integration.core.LogFactory;

import com.github.houbb.rpc.common.model.CalculateRequest;

import com.github.houbb.rpc.common.model.CalculateResponse;

import com.github.houbb.rpc.common.service.Calculator;

import com.github.houbb.rpc.server.service.CalculatorService;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

 

 

/**

 * @author binbin.hou

 * @since 0.0.1

 */

public class RpcServerHandler extends SimpleChannelInboundHandler {

 

 

    private static final Log log = LogFactory.getLog(RpcServerHandler.class);

 

 

    @Override

    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        final String id = ctx.channel().id().asLongText();

        log.info("[Server] channel {} connected " + id);

    }

 

 

    @Override

    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

        final String id = ctx.channel().id().asLongText();

 

 

        CalculateRequest request = (CalculateRequest)msg;

        log.info("[Server] receive channel {} request: {} from ", id, request);

 

 

        Calculator calculator = new CalculatorService();

        CalculateResponse response = calculator.sum(request);

 

 

        // 回写到 client 端

        ctx.writeAndFlush(response);

        log.info("[Server] channel {} response {}", id, response);

    }

 

 

}

读取到客户端的访问之后,我们获取到计算的入参 CalculateRequest,然后调用 sum 方法,获取到对应的 CalculateResponse,将结果通知客户端。

 

CalculateRequestDecoder

这里和客户端是一一对应的,我们首先把 netty 传递的基本类型转换为 CalculateRequest 对象。

 

package com.github.houbb.rpc.server.decoder;

 

 

import com.github.houbb.rpc.common.model.CalculateRequest;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.ByteToMessageDecoder;

 

 

import java.util.List;

 

 

/**

 * 请求参数解码

 * @author binbin.hou

 * @since 0.0.3

 */

public class CalculateRequestDecoder extends ByteToMessageDecoder {

 

 

    @Override

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        int one = in.readInt();

        int two = in.readInt();

 

 

        CalculateRequest request = new CalculateRequest(one, two);

        out.add(request);

    }

 

 

}

CalculateResponseEncoder

这里和客户端类似,我们需要把 response 转换为基本类型进行网络传输。

 

package com.github.houbb.rpc.server.encoder;

 

 

import com.github.houbb.rpc.common.model.CalculateResponse;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.MessageToByteEncoder;

 

 

/**

 * @author binbin.hou

 * @since 0.0.3

 */

public class CalculateResponseEncoder extends MessageToByteEncoder<CalculateResponse> {

 

 

    @Override

    protected void encode(ChannelHandlerContext ctx, CalculateResponse msg, ByteBuf out) throws Exception {

        boolean success = msg.isSuccess();

        int result = msg.getSum();

        out.writeBoolean(success);

        out.writeInt(result);

    }

 

 

}

CalculatorService

服务端对应的实现类。

 

public class CalculatorService implements Calculator {

 

 

    @Override

    public CalculateResponse sum(CalculateRequest request) {

        int sum = request.getOne()+request.getTwo();

 

 

        return new CalculateResponse(true, sum);

    }

 

 

}

测试

服务端

启动服务端:

 

new RpcServer().start();

服务端启动日志:

 

[DEBUG] [2021-10-05 11:53:11.795] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.

[INFO] [2021-10-05 11:53:11.807] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服务开始启动服务端

十月 05, 2021 11:53:13 上午 io.netty.handler.logging.LoggingHandler channelRegistered

信息: [id: 0xd399474f] REGISTERED

十月 05, 2021 11:53:13 上午 io.netty.handler.logging.LoggingHandler bind

信息: [id: 0xd399474f] BIND: 0.0.0.0/0.0.0.0:9527

十月 05, 2021 11:53:13 上午 io.netty.handler.logging.LoggingHandler channelActive

信息: [id: 0xd399474f, L:/0:0:0:0:0:0:0:0:9527] ACTIVE

[INFO] [2021-10-05 11:53:13.101] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服务端启动完成,监听【9527】端口

客户端

启动客户端:

 

new RpcClient().start();

日志如下:

 

[DEBUG] [2021-10-05 11:54:12.158] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.

[INFO] [2021-10-05 11:54:12.164] [Thread-0] [c.g.h.r.c.c.RpcClient.run] - RPC 服务开始启动客户端

十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler channelRegistered

信息: [id: 0x4d75c580] REGISTERED

十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler connect

信息: [id: 0x4d75c580] CONNECT: /127.0.0.1:9527

十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler channelActive

信息: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] ACTIVE

[INFO] [2021-10-05 11:54:13.403] [Thread-0] [c.g.h.r.c.c.RpcClient.run] - RPC 服务启动客户端完成,监听端口:9527

十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler write

信息: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] WRITE: 8B

         +-------------------------------------------------+

         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |

+--------+-------------------------------------------------+----------------+

|00000000| 00 00 00 01 00 00 00 02                         |........        |

+--------+-------------------------------------------------+----------------+

十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler flush

信息: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] FLUSH

[INFO] [2021-10-05 11:54:13.450] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelActive] - [Client] request is :CalculateRequest{one=1, two=2}

十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler channelRead

信息: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] READ: 5B

         +-------------------------------------------------+

         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |

+--------+-------------------------------------------------+----------------+

|00000000| 01 00 00 00 03                                  |.....           |

+--------+-------------------------------------------------+----------------+

十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler channelReadComplete

信息: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] READ COMPLETE

[INFO] [2021-10-05 11:54:13.508] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelRead0] - [Client] response is :CalculateResponse{success=true, sum=3}

可以看到,输出了对应的请求参数和响应结果。

 

当然,此时服务端也有对应的新增日志:

 

十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler channelRead

信息: [id: 0xd399474f, L:/0:0:0:0:0:0:0:0:9527] READ: [id: 0xbc9f5927, L:/127.0.0.1:9527 - R:/127.0.0.1:54030]

十月 05, 2021 11:54:13 上午 io.netty.handler.logging.LoggingHandler channelReadComplete

信息: [id: 0xd399474f, L:/0:0:0:0:0:0:0:0:9527] READ COMPLETE

[INFO] [2021-10-05 11:54:13.432] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelActive] - [Server] channel {} connected 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927

[INFO] [2021-10-05 11:54:13.495] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927 request: CalculateRequest{one=1, two=2} from  

[INFO] [2021-10-05 11:54:13.505] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927 response CalculateResponse{success=true, sum=3}


我是小职,记得找我

✅ 解锁高薪工作

✅ 免费获取基础课程·答疑解惑·职业测评

Java开发入门到精通--从零开始手写 RPC—如何实现客户端调用服务端?

本文由 @小职 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程