欢迎访问 生活随笔!

凯发k8官方网

当前位置: 凯发k8官方网 > 编程资源 > 编程问答 >内容正文

编程问答

手撕 rpc 2 -凯发k8官方网

发布时间:2024/9/30 编程问答 36 豆豆
凯发k8官方网 收集整理的这篇文章主要介绍了 手撕 rpc 2 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

把单个客户端改成多个呢?

在成功连接几个client后报错了,是拆解包的时候出错,为什么呢?看一下多个客户端连接时候的 netty 的模型:


因为服务程序什么时候从内核的buffer里读走数据跟客户端往buffer里写数据是两个独立的动作,所以在多客户端(多个socket)的场景下是不能整齐的读到正确的包,所以报错解码错误,io是双向的,服务端解码错误的问题在客户端也一样,怎么处理?

处理通信流的解码问题

出现这种问题底层的原因是

因为多个socket往一个buffer里写,所以要保证程序读取的时候每次都能跟包的头部(header和body的头部)对齐,而且每次readchannel()的之前程序把该次请求的包发完整。强大的 netty 给了我们 bytetomessagedecoder ,在 pipeline 的业务事件之前加上解码事件就可以了。

另外解决一些问题

1.服务器使用20个线程处理 listen socket 连接和 io socket。
2.多个 client 连接,维护一个线程池来管理这些连接。
3.使用 netty 的 bytetomessagedecoder 解决解码不正确问题。
4.使用 completablefuture 来获取客户端调用方法的返回。

package rpc;import io.netty.bootstrap.bootstrap; import io.netty.bootstrap.serverbootstrap; import io.netty.buffer.bytebuf; import io.netty.buffer.pooledbytebufallocator; import io.netty.channel.*; import io.netty.channel.nio.nioeventloopgroup; import io.netty.channel.socket.nio.nioserversocketchannel; import io.netty.channel.socket.nio.niosocketchannel; import io.netty.handler.codec.bytetomessagedecoder; import org.junit.test;import java.io.*; import java.lang.reflect.invocationhandler; import java.lang.reflect.method; import java.lang.reflect.proxy; import java.net.inetsocketaddress; import java.util.list; import java.util.random; import java.util.uuid; import java.util.concurrent.completablefuture; import java.util.concurrent.concurrenthashmap; import java.util.concurrent.atomic.atomicinteger;/*** 1.假设一个需求,写一个 rpc* 2.来回通信,连接数量,拆包* 3.动态代理,序列化,协议封装* 4.连接池*/ public class myrpctest {/*** 模拟 consumer*/@testpublic void startserver() { // 启动20个 selector,每个线程都维护列一个 io(注意不是 listen) 的 socket 的序列,如果有socket 进来了,就处理,这是 netty 内部自己实现的nioeventloopgroup eventexecutors = new nioeventloopgroup(20);nioeventloopgroup worker = eventexecutors;serverbootstrap serverbootstrap = new serverbootstrap();channelfuture localhost = serverbootstrap.group(eventexecutors, worker).channel(nioserversocketchannel.class).childhandler(new channelinitializer<niosocketchannel>() {@overrideprotected void initchannel(niosocketchannel niosocketchannel) {channelpipeline pipeline = niosocketchannel.pipeline();pipeline.addlast(new mydecoder());pipeline.addlast(new serverrequesthandler());}}).bind(new inetsocketaddress("localhost", 9090));try {localhost.sync().channel().closefuture().sync();} catch (interruptedexception e) {e.printstacktrace();}}@testpublic void get() {// 开启一个线程模拟服务器new thread(() -> {startserver();}).start();try {thread.sleep(2000);} catch (interruptedexception e) {e.printstacktrace();}system.out.println("server start ....");// 获取服务atomicinteger atomicinteger = new atomicinteger();int size = 30;thread[] threads = new thread[size];for (int i = 0; i < size; i) {threads[i] = new thread(() -> {car car = proxyget(car.class);int args = atomicinteger.incrementandget();string s = car.race("client: " args);// 打印传参看是不是服务能对应上system.out.println("client got args: " s " --- " args);});}for (thread thread : threads) {thread.start();}try {system.in.read();} catch (ioexception e) {e.printstacktrace();}}private static <t> t proxyget(class<t> interfaceinfo) {// 实现各个版本的动态代理classloader classloader = interfaceinfo.getclassloader();class<?>[] interfaces = {interfaceinfo};// 用 jdk 的动态代理实现return (t) proxy.newproxyinstance(classloader, interfaces, new invocationhandler() {@overridepublic object invoke(object proxy, method method, object[] args) throws throwable {// 1.调用,服务、方法、参数,封装成 contentstring name = interfaceinfo.getname(); // 服务名string methodname = method.getname(); // 方法名class<?>[] parametertypes = method.getparametertypes(); // 方法的返回类型// 2.把调用服务的信息封装成一个可以序列化的对象// 先封装 bodymycontent content = new mycontent();content.setname(name);content.setmethodname(methodname);content.setparametertypes(parametertypes);content.setargs(args);// 把 content 做成字节数组准备写出去bytearrayoutputstream bos = new bytearrayoutputstream();objectoutputstream oos = new objectoutputstream(bos);oos.writeobject(content);byte[] msgbody = bos.tobytearray();// 再封装 header,header 需要 body 信息,myheader myheader = createheader(msgbody);bos.reset();oos = new objectoutputstream(bos);oos.writeobject(myheader);byte[] msgheader = bos.tobytearray();// 3.服务准备好了,接下来准备连接,模拟一个 size 是1的连接池clientfactory factory = clientfactory.getfactory();niosocketchannel client = factory.getclient(new inetsocketaddress("localhost", 9090));bytebuf bytebuf = pooledbytebufallocator.default.directbuffer(msgheader.length msgbody.length);// 用这个控制结果返回,其中 complete() 方法异步计算结果,get()方法阻塞获取结果completablefuture<string> future = new completablefuture<>();responsemappinghandler.addcallback(myheader.getrequestid(), future);bytebuf.writebytes(msgheader);bytebuf.writebytes(msgbody);channelfuture channelfuture = client.writeandflush(bytebuf);channelfuture.sync();// 把结果返回给客户端啊return future.get();}});}static myheader createheader(byte[] msgbytes) {myheader header = new myheader();int size = msgbytes.length;// 用16进制的,32 位可以做很多事情int f = 0x14141414;long requestid = math.abs(uuid.randomuuid().getleastsignificantbits());header.setflag(f);header.setrequestid(requestid);header.setdatalen(size);return header;} }/*** 模拟服务*/ interface car {string race(string msg); }/*** 头部定义三个标志* 1.方法的标记,用32位的位标记* 2.请求的 id* 3.请求体的长度*/ class myheader implements serializable {int flag;long requestid;long datalen;public int getflag() {return flag;}public void setflag(int flag) {this.flag = flag;}public long getrequestid() {return requestid;}public void setrequestid(long requestid) {this.requestid = requestid;}public long getdatalen() {return datalen;}public void setdatalen(long datalen) {this.datalen = datalen;} }/*** 模拟请求体*/ class mycontent implements serializable {// 服务名string name;// 方法名string methodname;// 返回值类型class<?>[] parametertypes;// 参数object[] args;string res;public string getres() {return res;}public void setres(string res) {this.res = res;}public string getname() {return name;}public void setname(string name) {this.name = name;}public string getmethodname() {return methodname;}public void setmethodname(string methodname) {this.methodname = methodname;}public class<?>[] getparametertypes() {return parametertypes;}public void setparametertypes(class<?>[] parametertypes) {this.parametertypes = parametertypes;}public object[] getargs() {return args;}public void setargs(object[] args) {this.args = args;} }/*** 模拟客户端的创建,用单例*/ class clientfactory {int pollsize = 50; // 连接池里有50个连接nioeventloopgroup clientworker;random random = new random();private static final clientfactory factory;private clientfactory() {}static {factory = new clientfactory();}public static clientfactory getfactory() {return factory;}concurrenthashmap<inetsocketaddress, clientpool> outboxes = new concurrenthashmap<inetsocketaddress, clientpool>();public synchronized niosocketchannel getclient(inetsocketaddress address) {clientpool clientpool = outboxes.get(address);if (clientpool == null) {outboxes.putifabsent(address, new clientpool(pollsize));clientpool = outboxes.get(address);}int i = random.nextint(pollsize);// 如果有就返回if (clientpool.clients[i] != null && clientpool.clients[i].isactive()) {return clientpool.clients[i];}// 没有就创建synchronized (clientpool.locks[i]) {return clientpool.clients[i] = create(address);}}private niosocketchannel create(inetsocketaddress address) {// 基于 netty 的客户端创建方式clientworker = new nioeventloopgroup(1);bootstrap bs = new bootstrap();channelfuture connect = bs.group(clientworker).channel(niosocketchannel.class).handler(new channelinitializer<niosocketchannel>() {@overrideprotected void initchannel(niosocketchannel niosocketchannel) {channelpipeline pipeline = niosocketchannel.pipeline();pipeline.addlast(new mydecoder());pipeline.addlast(new clientresponses());}}).connect(address);try {niosocketchannel client = (niosocketchannel) connect.sync().channel();return client;} catch (interruptedexception e) {e.printstacktrace();}return null;} }/*** 模拟线连接池*/ class clientpool {niosocketchannel[] clients;object[] locks;clientpool(int size) {clients = new niosocketchannel[size];locks = new object[size];for (int i = 0; i < size; i) {在这里插入代码片locks[i] = new object();}} }class clientresponses extends channelinboundhandleradapter {@overridepublic void channelread(channelhandlercontext ctx, object msg) throws exception {mydatapackage data = (mydatapackage) msg;responsemappinghandler.runcallback(data);} }/*** 用于主线程的阻塞的控制*/ class responsemappinghandler {static concurrenthashmap<long, completablefuture> mapping = new concurrenthashmap<>();public static void addcallback(long requestid, completablefuture cb) {mapping.putifabsent(requestid, cb);}public static void runcallback(mydatapackage data) {mapping.get(data.getheader().getrequestid()).complete(data.getcontent().getres());removecallback(data.getheader().getrequestid());}public static void removecallback(long requestid) {mapping.remove(requestid);} }/*** 服务端注册的事件* 没有具体的业务逻辑,*/ class serverrequesthandler extends channelinboundhandleradapter {@overridepublic void channelread(channelhandlercontext ctx, object msg) throws exception {mydatapackage data = (mydatapackage) msg;string iothreadname = thread.currentthread().getname();ctx.executor().parent().next().execute(() -> { // ctx.executor().execute(() -> {string execthreadname = thread.currentthread().getname();mycontent content = new mycontent();string s = "io thread: " iothreadname " exec thread: " execthreadname " from args: " data.getcontent().getargs()[0];content.setres(s);byte[] contentbyte = myserializeutil.serialize(content);myheader header = new myheader();// 0x14141424 标记是服务端, 客户端是 0x14141414header.setflag(0x14141424);header.setrequestid(data.getheader().getrequestid());header.setdatalen(contentbyte.length);byte[] headerbyte = myserializeutil.serialize(header);bytebuf bytebuf = pooledbytebufallocator.default.directbuffer(headerbyte.length contentbyte.length);bytebuf.writebytes(headerbyte);bytebuf.writebytes(contentbyte);ctx.writeandflush(bytebuf);});} }/*** 利用 netty 的 bytetomessagedecoder 进行解码*/ class mydecoder extends bytetomessagedecoder {@overrideprotected void decode(channelhandlercontext channelhandlercontext, bytebuf bytebuf, list<object> list) throws exception {// 82 是打断点跟踪得出来的 header 的长度while (bytebuf.readablebytes() >= 82) {byte[] bytes = new byte[82];// 不让指针移动bytebuf.getbytes(bytebuf.readerindex(), bytes);bytearrayinputstream bis = new bytearrayinputstream(bytes);objectinputstream objectinputstream = new objectinputstream(bis);myheader myheader = (myheader) objectinputstream.readobject();if (bytebuf.readablebytes() >= myheader.getdatalen()) {bytebuf.readbytes(82); // 把指针移动到 content 的头部byte[] data = new byte[(int) myheader.getdatalen()];bytebuf.readbytes(data);bytearrayinputstream bytearrayinputstream = new bytearrayinputstream(data);objectinputstream ois = new objectinputstream(bytearrayinputstream);// 因为客户端和服务端都用同一个 decoder,区分一下if (myheader.getflag() == 0x14141414) {mycontent mycontent = (mycontent) ois.readobject();list.add(new mydatapackage(myheader, mycontent));} else if (myheader.getflag() == 0x14141424) {mycontent mycontent = (mycontent) ois.readobject();list.add(new mydatapackage(myheader, mycontent));}} else {break;}}} }class mydatapackage {private myheader header;private mycontent content;public mydatapackage(myheader myheader, mycontent mycontent) {this.header = myheader;this.content = mycontent;}public myheader getheader() {return header;}public void setheader(myheader header) {this.header = header;}public mycontent getcontent() {return content;}public void setcontent(mycontent content) {this.content = content;} }

总结

以上是凯发k8官方网为你收集整理的手撕 rpc 2的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得凯发k8官方网网站内容还不错,欢迎将凯发k8官方网推荐给好友。

网站地图