java-网络编程-大文件搬运

一切源于:

阿里搬砖头比赛说好是Client端线程级的同步阻塞请求,结果一帮人用了协程来完成这件事。其实吧,我想说就算用协程来完成,其实本质也和异步差不多(就网络通讯层),不过却激发了我的好奇心,因为比赛的结果是1G极限,只用了3秒!

3秒…如果我们将题目往对我有利的思考方向改变下,不再是Client端线程级的同步阻塞,只要求Server端请求应答同步即可。即:Server端在没收到一个请求之前,不能提前将下一个应答发出!如果我将题目修改为这样,我也不见得非常有信心能将时间控制在3秒以内!

所以这里立了个小项目来验证不同的实现方式下,对网络传输、CPU使用的开销。项目起名为镭射,取汇集小请求将能量集中爆发之意。

打算挑战下极限搬运的。

思路参考

穿越nat传输完整的文件,而且能自动重传丢失报文,采用UDP协议,在跨大洲之间进行完整的大文件(超过2GB)传输。
主要解决思路和技术点如下:
对每个报文进行文件任务标志。可以同时传输多个文件。
发送文件内容之前,先发送文件名。然后返回文件名收到后的确认,然后再开始传输。
对发送报文进行计数,每隔1000个报文(某个时间间隔),接收端发送一个计数报文,报告丢包率。采取降低发送速度等措施。
长期丢包率在99%以上,尝试提高发送速度一倍。如果丢包率超过10%就降速。
文件传输中断,有中断信息。接收端长期没有接受到文件报文,则终止任务。
发送完毕后,接收端发送完毕报文,结束发送。
10000个报文组成一个块的概念,接收端每收到10000个(某个数字)报文,发送接受块成功的消息。整个块不会重复发送。如果块出现某个报文发送失败,则重新发送报文。
每个UDP报文,大小是1464字节,其中64个字节用来标示任务。
采用java编写,在web端使用,在tomcat启动时,也就是servlet的contextlistener启动的时候,启动守护进程,监听udp的80端口。是否与servlet进行协同。为了支持子网,客户端只负责发送和上传数据,不能下载数据。所以计数和控制,可能需要tcp的参与。

首先是报文,为了降低开发工作量。
先使用udp报文的48个字节。剩下24个还没想好。
首先将48个字节分为6个long类型。
第一个long,标识版本和报文类型。
比如1表示是数据传输
2表示是重传通知等等。
第二个long,是uuid的high位
第三个long,是uuid的low位。
一起标识一个文件。
第四个long是包序号。
第五个long是包内报文序号
第六个long是下面的报文长度。
第七个long是在被传输文件里面的位移量
第八个long是在传输文件里面的写入长度

开始为了测试方便,将udp报文的数据长度设为1024,总长度是1024+64=1088
将缓冲窗口设为512KB,即在内存开辟一个512KB的缓冲器,当收满512个UDP报文时,写内存中的数据写入目标文件。如果512个报文在单位时间内,没有收满,说明有丢包,则选择丢失的报文重传。丢包判断待定。在UDT协议里面是采用等待4倍的RTT时间判断的,在4倍的RTT时间后未到达的报文都被判断为丢失。
目前完成了udp的服务启动。数据传输窗口的相关开发。其实只要保证窗口内的512KB数据传输成功即可。每512KB作为一个window,然后返回确认信息。客户端确认接到512KB完成的确认信息后,再去发送下一个512KB。确认信息里面可以加入crc校验或者MD5校验。客户端判断一下是否正确,决定是否重传。如果客户端决定,继续发送,则直接发送报文即可。服务器端,接到新的报文,如果是新报文ID,则将旧512KB数据写入实体文件,然后更新window,将收到的第一个报文写入window的相应位置。如果客户端没有收到服务器端的window结束的确认报文,则等待,而不发送新的报文。服务器端等待4个RTT时间,再发送确认报文,最多重试5次。如果5次客户端都没收到确认报文,不发送新数据,服务器没有接受新数据,则服务器端认为网络中断。停止任务。将任务写入临时文件。等待客户端下一次重传唤醒。刚才浏览下网页又有新的收获:http://developer.chrome.com/apps/socket.html
chrome的websocket已经实现了udp,估计这个程序的上传客户端可以直接拿javascript来写了。比较理想了。之前的思路有点问题,关于客户端停止发送这一点,会导致传输效率变低,应该是客户端继续发送,直到5个RTT时间后,没有收到上个window结束信号之后,再停止发送。
UDP之所以比TCP效率高,本质原因是TCP的一个bug,TCP将RTT时间和带宽联系在一起,片面认为RTT时间越长,带宽越低,而真实情况是,RTT和带宽没有直接关系。如果跟RTT建立联系了,我写UDP传输就没意思了。

//解析中靶情况

            //取出自己已经发送的包的数量

            //统计中靶的数量

            //判断丢包率

            //如果丢包率高于20%,则降低传输速度20%

            //如果丢包率等于99%以上,则提高120%的速度

            //如果箭已经射完,但是靶上还有箭没有,则重射,参数windowId,packetId,全部放入内存了

            //如果确认某个window的最后一箭的消息丢失,等5个rrt,重新放这个window的最后一箭

            break;

        case Data:

            //如果是某个windows最后一箭,则写入文件。发回确认最后一箭后的中靶情况。换一个window。

            //如果window不是当前window了。那么返回全部中靶信息。

            //重新设计头部

            //high

            //low

            //window long window 序号

            //type int 类别

            //id int 包序号

            //length int 包长度

            //count int window内包数
       DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                socket.receive(packet);
                //解析头部
                this.header.load(packet);

                //是否是数据的
                PacketType type = PacketType.values()[this.header.getType()];

                switch (type) {
                    //为了发送,发现丢包,调整速度
                    case Target:

                        Archer archer = inst.getArcher(header.getHigh(), header.getLow());
                        if (archer == null) {
                            return;
                        }
                        archer.update();//有回包,说明网络OK
                        int packetSend = archer.getPacket(header.getWindow());
                        lost = (packetSend - header.getScore()) / packetSend;
                        //archer.setBandwidth((long)(archer.getBandwidth()/lost));
                        if (lost > 0.10) {
                            archer.setBandwidth((long) (archer.getBandwidth() / lost));
                        } else {
                            if (lost < 0.01) {
                                archer.setBandwidth((long) (archer.getBandwidth() * 1.2));
                            } else {
                                //啥也不做保持原速
                            }
                        }
                        //补射
                        this.result.load(header, packet);
                        for (int i = 0; i < packetSend; i++) {
                            byte[] data = result.getBuffer();
                            if (data[i] == 0x0) {
                                Miss miss = new Miss();
                                miss.setPacket(i);
                                miss.setWindow(header.getWindow());
                                archer.miss(miss);
                            }
                        }

                        break;
                    //
                    case Data:
                        Target target = inst.getTarget(header.getHigh(), header.getLow());
                        target.update();
                        //新的文件传递
                        if (target == null) {
                            Bow initBow = new Bow(packet.getAddress(), packet.getPort());
                            target = new Target(initBow, header.getHigh(), header.getLow());
                            target.open();
                            inst.addTarget(target);//这里启动的线程,此线程只是用来测量心跳
                        }
                        //是否需要换弓,应对对称式的nat,必要的情况可以缓冲多把弓,目前只有一把
                        Bow bow = target.getBow();
                        if (bow != null) {
                            if ((!bow.getAddress().equals(packet.getAddress())) && bow.getPort() != packet.getPort()) {
                                bow.close();
                                Bow initBow = new Bow(packet.getAddress(), packet.getPort());
                                target.setBow(initBow);
                            }
                        }
                        Iterator<Window> iter = target.Windows(); //此处没有限制window的数量,有可能导致系统崩溃,不过测试应该问题不大
                        boolean isExist = false;
                        Window win = null;
                        while (iter.hasNext()) {
                            win = iter.next();
                            if (win.getId() == header.getWindow()) {

                                isExist = true;
                                break;
                            }
                        }
                        if (win == null || !isExist) {
                            //此报文对应的窗口不存在
                            if (header.getWindow() > target.getPosition() / FastConfig.WindowLength) {
                                win = new Window(header);
                                this.result.load(header, packet);
                                win.setData(header.getId(), this.result.getBuffer());
                                target.addWindow(win);
                            } else {
                                //如果是陈旧报文,则丢弃此报文
                            }
                        } else {
                            this.result.load(header, packet);
                            win.setData(header.getId(), this.result.getBuffer());
                            if (win.isFull()) {
                                target.write(header.getWindow(), buffer);
                                //删除此窗口
                                target.removeWindow(win);
                            }
                        }

                        break;
                }
时间: 2024-07-30 13:42:38

java-网络编程-大文件搬运的相关文章

遇到一个棘手的问题,需要java网络编程大神帮忙解答下~

问题描述 遇到一个棘手的问题,需要java网络编程大神帮忙解答下~ 问题是这样的: 我把MINA核心的非阻塞轮训方式的的代码用JDK7的AIO异步IO替换了,现在已经包装完成,测试的时候遇到两个问题: 测试的是这样的,服务端启动后20秒后释放所有资源关闭,客户端启动15秒后释放所有资源关闭,大部分情况下测试都是正常的,但是偶尔会出现客户端服务端都关闭后,再次启动服务的过程后,要么服务端抛出AsynchronousCloseException,客户端抛出远程主机强迫关闭一个现有连接:要么是客户端的

服务器-JAVA网络编程问题请大神指导

问题描述 JAVA网络编程问题请大神指导 面试被问到了,请教大神:同一服务器相同Server是否可以共用一个端口?同一服务器不同Server是否可以共用一个端口? 解决方案 一个TOMCAT 可以有多个项目 占一个端口, 多个TOMCAT 需要各自使用不同端口.一个端口只能被一个服务使用. 解决方案二: linux内核中有端口reuse技术,这样可以多个应用绑定到同一个端口,然后内核来调度把连接转发给某个应用.nginx中worker有采用这个

请大神推荐Java网络编程书籍,有介绍IGMPv3协议的

问题描述 最近在学校做有关SDN的项目,需要用JAVA编写有关IGMP协议的内容--旧版本的JAVA网络编程书籍很少有涉及到这方面的介绍,所以请各位推荐书籍.不胜感激! 解决方案

Java网络编程(一)

关于JAVA网络编程的技术非常繁多,如:SOCKET.RMI.EJB.WEBSERVICE.MQ.中间数据等等方法,但是万变都是源于基础中通信原理,有些是轻量级的.有重量级的:有实时调用.有异步调用:这么多的技术可以说什么都可以用,关键在什么场合用什么最适合你,这些技术主要用于多个子系统之间相互通信的方法,如一个大型的软件应用分多个子系统,它们可能由不同的厂商来完成,这些子系统最终需要整合为一个系统,那么整合的基础就是交互,要么是通过数据交互,要么是通过接口调用,要么通过中间数据等等.本文从最基

Java网络编程之简单的服务端客户端应用实例_java

本文实例讲述了Java网络编程之简单的服务端客户端应用.分享给大家供大家参考.具体如下: 在Java中,我们使用java.net.Socket及其相关类来完成有关网络的相关功能.Socket类非常简单易用,因为Java技术隐藏了建立网络连接和通过连接发送数据的复杂过程.下面所说的内容只适用于TCP协议. 一.连接到服务器 我们可以使用Socket类的构造函数来打开一个套接字,如 Socket sk = new Socket("210.0.235.14",13); 其中,210.0.23

Java网络编程从入门到精通(34)

Java网络编程从入门到精通(34):读写缓冲区中的数据---使用get和put方法按顺序读写单个数据 对于缓冲区来说,最重要的操作就是读写操作.缓冲区提供了两种方法来读写缓冲区中的数据:get.put方法和array方法.而get.put方法可以有三种读写数据的方式:按顺序读写单个数据.在指定位置读写单个数据和读写数据块.除了上述的几种读写数据的方法外,CharBuffer类还提供了用于专门写字符串的put和append方法.在本文及后面的文章中将分别介绍这些读写缓冲区的方法. 虽然使用all

JAVA网络编程服务器多线程接受套接字,如何能使服务器的静态常量,与客户端的数据进行同步?

问题描述 JAVA网络编程服务器多线程接受套接字,如何能使服务器的静态常量,与客户端的数据进行同步? 服务器Server客户端CLientServer静态常量num创建服务器,ServerSocket的端口号为8000,连接套接字.每创建一个客户端,客户端就创建一个Socket,端口号为8000,与服务器进行连接,与此同时,客户端新建ServerSocket,端口号为3000+Server.num,连接套接字.服务器与客户端连接后,服务器新建子线程Handler.子线程Handler,新建Ser

java网络编程错误java.net.ConnectException: Connection refused: connect求教

问题描述 java网络编程错误java.net.ConnectException: Connection refused: connect求教 刚开始学习java网络编程,在书上看到一段代码,就动手试一下,结果出现下面错误: java.net.ConnectException: Connection refused: connect 源代码如下: import java.io.*; import java.net.*; class lx01 { public static void main(S

Java网络编程从入门到精通(34):读写缓冲区中的数据---使用get和put方法按顺序读写单个数据

本文为原创,如需转载,请注明作者和出处,谢谢! 上一篇:Java网络编程从入门到精通(33):非阻塞I/O的缓冲区(Buffer)     对于缓冲区来说,最重要的操作就是读写操作.缓冲区提供了两种方法来读写缓冲区中的数据:get.put方法和array方法.而get.put方法可以有三种读写数据的方式:按顺序读写单个数据.在指定位置读写单个数据和读写数据块.除了上述的几种读写数据的方法外,CharBuffer类还提供了用于专门写字符串的put和append方法.在本文及后面的文章中将分别介绍这