python断点续传的原理及实现例子(支持多线程)

一、 简单原理

说到断点续传,就不得不说一些和断点续传相关的HTTP头部字段。

① Content-Length

Content-Length用于指示HTTP响应包中实体的大小。除非使用了分块编码,否则Content-Length首部就是带有实体主体的报文必须使用的。使用Content-Length首部是为了能够检测出服务器崩溃而导致的报文截尾,并对共享持久连接的多个报文进行正确分段。

检测结尾
 HTTP的早期版本采用关闭连接的办法来划定报文的结束。但是,没有Content-Length的话,客户端无法区分到底是报文结束时正常的关闭连接还是报文传输中由于服务器崩溃而导致的连接关闭。客户端需要通过Content-Length来检测报文截尾。
 报文截尾的问题对缓存代理服务器来说尤为重要。如果缓存服务器收到被截尾的报文却没有识别出截尾的话,它可能会存储不完整的内容并多次使用他来提供服务。缓存代理服务器通常不会为没有显式Content-Length首部的HTTP主体做缓存,以此来减小缓存已截尾报文的风险。
Content-Length与持久连接
Content-Length首部对于持久链接是必不可少的。如果响应通过持久连接传送,就可能有另一条HTTP响应紧随其后。客户端通过Content-Length首部就可以知道报文在何处结束,下一条报文从何处开始。因为连接是持久的,客户端无法依赖连接关闭来判断报文的结束。
有一种情况,使用持久连接可以没有Content-Length首部,即采用分块编码(chunked encoding)时。在分块编码的情况下,数据是分为一系列的块来发送的,没块都有大小说明。哪怕服务器在生成首部的时候不知道整个实体的大小(通常是因为实体是动态生成的),仍然可以使用分块编码传输若干已知大小的块。
② Transfer-Encoding

HTTP协议中只定义了一种Transger-Encoding,也就是chunked。举个例子,如果服务端的主体是动态生成的。而客户端不希望服务端将主体全部生成后再获取,因为中间的时延会特别大。chunked的格式如下:

HTTP/1.1 200 OK

Transfer-Encoding: chunked

2 ab

a 0123456789

0
1
2
3
4
5
6
7
8
9
HTTP/1.1 200 OK
 
Transfer-Encoding: chunked
 
2 ab
 
a 0123456789
 
0
③ Content-Enconding

常见有如下三种:gzip,deflate,compress。它用来指示实体是以什么算法进行编码的。通常,Content-Encoding与Transfer-Encoding结合使用。

④ Content-Range

用于响应头,指定整个实体中的一部分的插入位置,他也指示了整个实体的长度。在服务器向客户返回一个部分响应,它必须描述响应覆盖的范围和整个实体长度。一般格式:

 Content-Range: bytes start-end/total

⑤ Range

用于请求头中,指定第一个字节的位置和最后一个字节的位置,一般格式:

 Range:bytes=start-end

 

二、单线程实现

① 是否支持断点续传

采用head获取部分实体,看是否返回头中含有Content-Range
采用head获取部分实体,看返回状态码是否为206。
② 具体实现步骤

使用head方法获取文件大小
获取本地文件大小
设置请求头Range信息
利用requqests.response.iter_content以及开启stream模式
文件下载到一定大小就写入

 代码如下 复制代码

# usr/bin/env python
# coding: utf-8

"""
Copyright (c) 2015-2016 cain
author: cain <singforcain@gmail.com>
"""

import os
import time
import logging
import datetime
import requests
import argparse

class fileDownload(object):
    def __init__(self, url, file_name):
        """
        :param url:文件的下载地址
        :param file_name:重命名文件的名字
        :return:
        """
        self.url = url
        self.file_name = file_name
        self.stat_time = time.time()
        self.file_size = self.getSize()
        self.offset = self.getOffset()
        self.downloaded_size = self.offset
        self.headers = self.setHeaders()
        self.tmpfile = ""
        self.info()

    def info(self):
        logging.info("Downloaded    [%s] bytes" % (self.offset))

    def setHeaders(self):
        """
        根据已下载文件的大小设置Range头部范围并返回
        :return:
        """
        start = self.offset
        end = self.file_size - 1
        range = "bytes={0}-{1}".format(start, end)
        return {"Range": range}

    def getOffset(self):
        if os.path.exists(self.file_name):
            if self.file_size == os.path.getsize(self.file_name):
                exit()
            else:
                return os.path.getsize(self.file_name)
        else:
            return 0

    def getSize(self):
        """
        :return:返回文件的大小,采用head的方式
        """
        response = requests.head(self.url)
        return int(response.headers["content-length"])

    def download(self):
        """
        断点续传的核心部分
        :return:
        """
        with open(self.file_name, "ab") as f:
            try:
                r = requests.get(self.url, stream=True, headers=self.headers)
                for chunk in r.iter_content(chunk_size=1024):
                    if not chunk:
                        break
                    self.tmpfile += chunk
                    if len(self.tmpfile) == 1024*50:
                        f.write(self.tmpfile)
                        self.downloaded_size += len(self.tmpfile)
                        logging.info("Downloaded ---[%.2f%%] [%s/%s] bytes" % (float(self.downloaded_size)
                                                                           /self.file_size*100,
                                                                           self.downloaded_size, self.file_size))
                        self.tmpfile = ""
            except KeyboardInterrupt:
                logging.warning("Interruped by user")
                logging.info("Ending the thread,please do not exit")
            finally:
                f.write(self.tmpfile)
                self.downloaded_size += len(self.tmpfile)
                logging.info("Downloaded ---[%.2f%%] %s/%s bytes" % (float(self.downloaded_size)
                                                                   /self.file_size*100,
                                                                   self.downloaded_size, self.file_size))
                consume = int(time.time()) - self.stat_time
                logging.info("It consumes %d seconds" % (consume))
                logging.info("End at %s" % (time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()))))

def init():
    """
    配置日志信息
    :return:
    """
    logging.basicConfig(format='[%(asctime)s]\t[%(levelname)s]\t%(message)s',
                    level="DEBUG",
                    datefmt="%Y/%m/%d %I:%M:%S %p"
                    )

def run(url, name):
    if not name:
        name = url.split("/")[-1]
    file = fileDownload(url, name)
    file.download()

if __name__ == '__main__':
    init()
    parser = argparse.ArgumentParser()
    parser.add_argument("url", help="The file's url")
    parser.add_argument("--name", help="The file's name you want to rename")
    args = parser.parse_args()
    run(args.url, args.name)

三、多线程实现(非断点续传)

 

 代码如下 复制代码
# usr/bin/env python
# coding: utf-8
 
"""
Copyright (c) 2015-2016 cain
author: cain <singforcain@gmail.com>
"""
 
import time
import math
import Queue
import logging
import argparse
import requests
import threading
 
mutex = threading.Lock()
 
 
class FileDownload(object):
    def __init__(self, url, filename, threadnum, bulk_size, chunk_size):
        self.url = url
        self.filename = filename
        self.threadnum = threadnum
        self.bulk_size = bulk_size
        self.chunk_size = chunk_size
        self.file_size = self.getSize()
        self.buildEmptyFile()
        self.queue = Queue.Queue(1024)
        self.setQueue()
 
 
    def getSize(self):
        """
        :return:返回文件的大小,采用head的方式
        """
        response = requests.head(self.url)
        return int(response.headers["content-length"])
 
    def buildEmptyFile(self):
        """
        建立空文件
        :return:
        """
        try:
            logging.info("Building empty file...")
            with open(self.filename, "w") as f:
                f.seek(self.file_size)
                f.write("\x00")
                f.close()
        except Exception as err:
            logging.error("Building empty file error...")
            logging.error(err)
            exit()
 
    def setQueue(self):
        """
        根据文件大小以及设置的每个任务的文件大小设置队列
        :return:返回队列信息
        """
        logging.info("Setting the queue...")
        tasknums = int(math.ceil(float(self.file_size)/self.bulk_size))     # 向上取整
        for i in range(tasknums):
            ranges = (self.bulk_size*i, self.bulk_size*(i+1)-1)
            self.queue.put(ranges)
 
    def download(self):
        while True:
            logging.info("Downloading data in %s" % (threading.current_thread().getName()))
            if not self.queue.empty():
                start, end = self.queue.get()
                tmpfile = ""
                ranges = "bytes={0}-{1}".format(start, end)
                headers = {"Range": ranges}
                logging.info(headers)
                r = requests.get(self.url, stream=True, headers=headers)
                for chunk in r.iter_content(chunk_size=self.chunk_size):
                    if not chunk:
                        break
                    tmpfile += chunk
                mutex.acquire()
                with open(self.filename, "r+b") as f:
                    f.seek(start)
                    f.write(tmpfile)
                    f.close()
                logging.info("Writing [%d]bytes data into the file..." % (len(tmpfile)))
                mutex.release()
            else:
                logging.info("%s is over..." % (threading.current_thread().getName()))
                break
 
    def run(self):
        threads = list()
        for i in range(self.threadnum):
            threads.append(threading.Thread(target=self.download))
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
 
def logInit():
    """
    配置日志信息
    :return:
    """
    logging.basicConfig(format='[%(asctime)s]\t[%(levelname)s]\t%(message)s',
                        level="DEBUG",
                        datefmt="%Y/%m/%d %I:%M:%S %p")
 
 
def start(url, filename, threadnum):
    """
    下载部分核心功能
    :param url:
    :param filename:
    :param threadnum:
    :return:
    """
    url = url
    filename = filename
    threadnum = threadnum if threadnum and threadnum < 20 else 5
    bulk_size = 2*1024*1014
    chunk_size = 50*1024
    print url, filename, threadnum, bulk_size, chunk_size
    Download = FileDownload(url, filename, threadnum, bulk_size, chunk_size)
    Download.run()
 
if __name__ == '__main__':
    logInit()
    logging.info("App is starting...")
    start_time = time.time()
    parser = argparse.ArgumentParser()
    parser.add_argument("url", help="The file's url")
    parser.add_argument("--filename", help="The file's name you want to rename")
    parser.add_argument("--threadnum", help="The threads you want to choose", type=int)
    args = parser.parse_args()
    start(args.url, args.filename, args.threadnum)
    logging.info("App in ending...")
    logging.info("It consumes [%d] seconds" % (time.time()-start_time))

 

四、多线程断点续传

这就是上面的结合体,不过通常会使用一个配置文件用于保存下载的状态,重新下载的时候就根据此文件重新配置。

时间: 2024-11-29 03:25:29

python断点续传的原理及实现例子(支持多线程)的相关文章

Linux下实现断点续传的原理介绍

  断点续传是一种结合本地存储和网络存储的技术,主要用来解决网络失效时的视频丢失问题.DVS通常本身没有视频存储功能,而是必须由后端的NVR来实现视频的存储,因此对于网络稳定性要求很高,网络连接失败.丢包严重.抖动等各种因素都可能造成视频数据的丢失.断点续传支持从文件上次中断的地方开始传送数据,而并非是从文件开头传送.这就是断点续传的定义.系统都默认可以断点续传,但我们很少知道他的原理,下面就来看看小编的介绍吧. 断点续传的原理 其实断点续传的原理很简单,就是在 Http 的请求上和一般的下载有

Android 断点续传的原理剖析与实例讲解_Android

 本文所要讲的是Android断点续传的内容,以实例的形式进行了详细介绍.        一.断点续传的原理        其实断点续传的原理很简单,就是在http的请求上和一般的下载有所不同而已.        打个比方,浏览器请求服务器上的一个文时,所发出的请求如下:        假设服务器域名为www.jizhuomi.com/android,文件名为down.zip. get /down.zip http/1.1 accept: image/gif, image/x-xbitmap,

android-谁能解释一下多线程下载,和断点续传的原理

问题描述 谁能解释一下多线程下载,和断点续传的原理 找了好长时间,在网上找的都是代码写的,但是不想看代码.想了解一下他们的原理.请指教 解决方案 http://www.liqwei.com/network/protocol/2011/886.shtml 解决方案二: 1.多线程下载,就是将要下载的文件分成多份,每个线程下载一份,然后在客户端合成最后的文件: 2.断电续传,就是文件还没有下完,但是由于种种原因,连接断开了,此时不需要重新下载整个文件,只需要下载还没有下载的. 3.多线程下载和断点续

java实现断点续传的原理

其实断点续传的原理很简单,就是在Http的请求上和一般的下载有所不同而已.  打个比方,浏览器请求服务器上的一个文时,所发出的请求如下:  假设服务器域名为wwww.sjtu.edu.cn,文件名为down.zip.  GET /down.zip HTTP/1.1  Accept: image/gif, image/x-xbitmap, image/jpeg, image/pjpeg, application/vnd.ms-  excel, application/msword, applica

很简单的Java断点续传实现原理_java

原理解析 在开发当中,"断点续传"这种功能很实用和常见,听上去也是比较有"逼格"的感觉.所以通常我们都有兴趣去研究研究这种功能是如何实现的? 以Java来说,网络上也能找到不少关于实现类似功能的资料.但是呢,大多数都是举个Demo然后贴出源码,真正对其实现原理有详细的说明很少. 于是我们在最初接触的时候,很可能就是直接Crtl + C/V代码,然后捣鼓捣鼓,然而最终也能把效果弄出来.但初学时这样做其实很显然是有好有坏的. 好处在于,源码很多,解释很少:如果我们肯下功

Android 断点续传的原理剖析与实例讲解

本文所要讲的是Android断点续传的内容,以实例的形式进行了详细介绍. 一.断点续传的原理 其实断点续传的原理很简单,就是在http的请求上和一般的下载有所不同而已. 打个比方,浏览器请求服务器上的一个文时,所发出的请求如下: 假设服务器域名为www.jizhuomi.com/android,文件名为down.zip. get /down.zip http/1.1 accept: image/gif, image/x-xbitmap, image/jpeg, image/pjpeg, appl

python中self原理实例分析

  本文实例讲述了python中self原理.分享给大家供大家参考.具体分析如下: 类的方法与普通的函数只有一个特别的区别--它们必须有一个额外的第一个参数名称,但是在调用这个方法的时候你不为这个参数赋值,Python会提供这个值.这个特别的变量指对象本身,按照惯例它的名称是self. 假如你有一个类称为MyClass和这个类的一个实例MyObject.当你调用这个对象的方法 MyObject.method(arg1, arg2) 的时候,这会由Python自动转为 MyClass.method

python 的reduce原理是什么?

问题描述 python 的reduce原理是什么? 如果函数接收多个参数而只给reduce的参数队列不够数量参数,则会发生一下各种情况: def func1(x,y,z): return x+y+z def func2(x,y): return x+y def func21(x,y=1): return x+y def func3(x,y,z): return x+y+z def func4(x,y,z=1): return x+y+z by = [1] by2 = [1,2] print re

java EJB 加密与解密原理的一个例子_php技巧

加密与解密原理的一个例子 package lockunlock;  import Java.awt.*;  import java.awt.event.*;  import java.Applet.*;  import javax.Swing.*;  import java.util.*;  public class LockUnlock extends JApplet {  private boolean isStandalone = false;  //Get a parameter val