背景
对象存储 OSS是面向海量非结构化数据对象的存储服务。随着云计算的普及和飞速增长,越来越多的开发者把他们的应用建筑在了 OSS之上。OSS对外提供的是RESTful形式的接口,其最重要的特点之一是无状态性(statelessness),即OSS服务器不会保持除了单次请求之外的,任何与其通信的客户端的通信状态。因此对于断点续传这样有状态功能的实现,关键点在于如何在客户端完成状态维护 。
本文将以Python为例,介绍通过OSS是实现大文件的断点下载和断点上传的。
具体操作步骤
(一)在OSS上实现大文件的断点下载
所谓断点下载,就是要从文件已经下载的地方开始继续下载。为了方便理解,我们先来看一个从OSS下载一个文件保存到本地的Python例子。在这个例子[1]中,我们从一个名为 “lingyun”的bucket里面,下载一个叫“example.dat”的文件,并且以相同名字保存在当前目录。
from oss_api import *
HOST="oss.aliyuncs.com"
BUCKET = "lingyun"
OBJECT = "example.dat"
ACCESS_ID = "*******************"
SECRET_ACCESS_KEY = "*******************"
#下载文件
oss = OssAPI(HOST, ACCESS_ID, SECRET_ACCESS_KEY)
res = oss.get_object(BUCKET, OBJECT)
#保存文件
if 200 == res.status:
f = file(OBJECT, 'w')
f.write(res.read())
f.close()
print "Download succeeded."
else:
print "Download failed."
基于上面的代码,下面的程序显示了增加断点续传功能的文件下载代码,变化的地方加粗标注出来了:
from oss_api import *
HOST="oss.aliyuncs.com" #ads
BUCKET = "lingyun"
OBJECT = "example.dat"
BUFFER_SIZE = 10240 # 写入数据的buffer大小
ACCESS_ID = "*******************"
SECRET_ACCESS_KEY = "*******************"
oss = OssAPI(HOST, ACCESS_ID, SECRET_ACCESS_KEY)
# 流式地将数据写入文件
def flush_data(file, http_res):
while True:
data = res.read(BUFFER_SIZE)
if len(data) != 0:
file.write(data)
else:
break
# 获取本地文件长度
f = file(OBJECT, 'a')
file_len = f.tell()
# 设置HTTP Header里面的Range参数,跳过已经收到的数据
headers = {}
headers["range"] = "bytes=" + str(file_len) + "-"
res = oss.get_object(BUCKET, OBJECT, headers)
if 206 == res.status: # 返回指定范围内的数据
flush_data(f, res)
print "Download succeeded."
else: # 下载失败
print "Download failed."
f.close()
这段代码和前段代码相比,有四处发生了变化:
- 增加了流式写入本地文件的逻辑。防止下载的数据对象过大,无法一下子读入本地的内存中;
- 向OSS发送数据前,获取本地文件长度。
- 构造HTTP的Range Header,要求OSS从指定的位置开始下载。
- 判断OSS返回的HTTP值,并做出相应的处理:如果OSS返回206,说明下载的是指定位置范围内的数据;其他状态码表明“Range”参数错误或者发生异常。
在使用“Range”这个HTTP 参数时,请注意以下三点:
- Range参数中的文件位置是从0开始,最大值是文件长度减1;
- 如果Range参数填写错误,OSS将忽视这个参数[2];
- Range参数设置正确的话,OSS将返回HTTP状态码206(不是200)以表示返回的是部分数据。
通过“Range”参数,还可以实现大文件的并发下载。这个功能作为思考题留给各位读者,感兴趣的读者可以自己实现一下。OSS官方的SDK里面也提供了一个多线程下载功能的实现,供大家参考。
(二)在OSS上实现大文件的断点上传:
相对于断点下载,断点上传的实现显然要复杂得多。OSS提供的解决办法可以理解为:在客户端将大文件切分成若干适合公网传输的小数据块;然后将这些小数据块分别上传到OSS上;最后在OSS服务器端将这些小数据块合并成最终的文件。为了实现这个功能,OSS单独发布一套上传API接口——Multipart Upload。这套API接口共有6个:
- Initiate Multipart Upload:初始化一个Multipart Upload事件;
- Upload Part:上传数据块;
- Complete Multipart Upload:完成一个Multipart Upload事件;
- Abort Multipart Upload:中止一个Multipart Upload事件;
- List Multipart Uploads:列出所有存在的Multipart Upload事件;
- List Parts:列出某个Multipart Upload事件下的所有数据块。
这套接口中定义了两个唯一识别码(UUID):Upload ID和Part ID,分别用于标识某个Multipart Upload上传事件和某个数据块。一个完整的Multipart上传过程由以下几步组成:
(1)Initiate Multipart Upload: 初始化一个Multipart Upload事件
客户端通知OSS要上传一个大文件,OSS返回给客户端一个唯一标识这次Multipart上传事件的Upload ID。Python示例代码如下:
oss = OssAPI(HOST, ACCESS_ID, SECRET_ACCESS_KEY)
res = oss.init_multi_upload(BUCKET, OBJECT)
下面是OSS返回的HTTP结果示例:
BUCKET
OBJECT
0004D4184129F5A1A42663160C4C58B1
其中“0004D4184129F5A1A42663160C4C58B1”就是OSS为这次Multipart Upload事件分配的Upload ID。通过这个接口,用户只是在OSS上注册了一个Multipart Upload事件,并没有任何文件被创建或改变。你可以对同一个文件创建多个Multipart Upload事件,在这些Multipart Upload事件没有完成(Complete)或被中止(Abort)之前,它们都是同时存在的。
(2) Upload Part:上传数据块
在客户端将大文件切分成多个适合公网传输大小(建议5MB)的数据块(Part),然后分别上传到OSS上,并告知OSS这些数据块属于某个Upload ID。Python 示例代码如下:
oss = OssAPI(HOST, ACCESS_ID, SECRET_ACCESS_KEY)
res = oss. upload_part (BUCKET, OBJECT, data, upload_id, part_id )
其中,“data”表示要上传的Part数据内容;“upload_id”为此次上传事件的ID;“part_id”是该数据块的索引。Part ID不但唯一标识这一数据块,还标识了这个数据块在整个文件内的相对位置。如果你在同一个Upload ID下,使用一个已上传过的Part ID上传了新的数据,那么OSS上已有的这个part数据将被覆盖。除了最后一块Part数据没有大小限制以外,其他的Part数据不能小于5MB。Part ID的有效范围是1~10000。OSS并不要求属于同一个Upload ID的Part ID必须是连续的,比如:用户可以只上传Part ID为1、16、51的数据块;但Part ID的大小表示了数据块之间的相对位置,例如Part ID为16的数据块,在整个文件中必须在Part ID为51的数据块之前。Upload Part命令执行成功后,OSS会返回这个Part数据的MD5值给客户端。用户需要保存这些MD5值,以便在OSS上最后生成文件时使用。
(3)Complete Multipart Upload:完成一个Multipart Upload事件
在上传完所有的数据块到OSS上之后,我们就可以要求OSS在服务器端将指定的某个Upload ID所属的数据块组合成最终的文件。在执行该操作时,客户端需要提供一个XML格式的文件,其中详细列举出了该文件所需的Part ID及其对应的MD5值。一个XML的例子如下:
<CompleteMultipartUpload>
<Part>
<PartNumber>1</PartNumber>
<ETag>1DC6D29FD1E1989793B83F5C2FD0C5E0</ETag>
</Part>
<Part>
<PartNumber>16</PartNumber>
<ETag>E17AC4037030A1227D1C1B115619C6F1</ETag>
</Part>
<Part>
<PartNumber>51</PartNumber>
<ETag>807014FC970ED07BA28DE40B20E5BD59</ETag>
</Part>
</CompleteMultipartUpload>
当我们构建好这个XML文件后,就可以通过调用OSS Python SDK的接口来发送完成Multipart Upload事件的请求,代码示例如下:
oss = OssAPI(HOST, ACCESS_ID, SECRET_ACCESS_KEY)
res = oss.complete_upload(BUCKET, OBJECT, upload_id, part_msg_xml)
OSS收到提交的XML列表后,会逐一判断每个Part是否存在,以及对应的MD5值是否和客户端提供的MD5值相等。当所有的Part验证通过后,OSS将把这些数据Part组合成一个最终的Object。需要注意的是,用户可以在这次请求里,不指定所有已经上传的Part。例如,刚才我们成功上传了1、16和51共三个数据块到某个Upload ID名下,我们可以只指定用Part 1、51来组成最后的文件(注意Part的ID仍然要求是升序的)。当OSS生成最终的文件后,会将没有用到的16号Part删除,以释放磁盘空间。
整个Multipart Upload流程的Python伪代码如下所示:
#初始化OSS对象
oss = OssAPI(HOST, ACCESS_ID, SECRET_ACCESS_KEY)
# 初始化Multipart Upload事件并获得Upload ID
upload_id = init_multi_upload(oss, bucket_name, object_name)
# 将本地文件分解成为多个part,并计算出每个part的起始位置和长度
(pos_list, len_list) = split_file_to_part_list(file_name)
# 开启多个线程来上传part
thread_pool = []
for index in range(0, thread_sum):
# 在create_thread_worker 里调用OSS API上传指定的part,上传结果保存在upload_res
upload_thread = create_thread_worker (oss, file_name, pos_list[index],
len_list[index], upload_res[index])
thread_pool.append(upload_thread)
upload_thread.start()
# 等待所有线程结束
for upload_thread in thread_pool:
upload_thread.join()
# 创建最终合成文件的part列表(XML格式)
part_msg_xml = create_part_xml(upload_res)
# 要求OSS完成本次Multipart Upload事件
res = complete_multipart_upload(oss, bucket, object, upload_id, part_msg_xml)
上面的例子中,使用到了OSS提供的三个接口。其余的三个接口主要提供了对Upload ID和Part ID的查询和删除,方便用户的管理。由于篇幅原因,这三个接口就不在这里做展开说明了,感兴趣的朋友可以参考《OSS API文档》里面的相应章节。
在OSS提供的Multipart Upload方法中,由于各个数据块之间是相互独立的,所以在传输过程中,如果任何一个数据块传输失败或者进程被挂起,只需要客户端记录下每个数据块的上传状态,下次重启上传进程时,继续上传那些还未上传成功的数据块即可,这样就实现了断点上传功能。另外,通过这个接口,还可以实现大文件的并发上传、向OSS流式地写入数据等功能,有兴趣的读者可以自己实现一下。
后记
希望通过这篇文章,大家可以对如何使用OSS进行大文件的断点下载和上传的方法有所了解,也希望更多的朋友能分享更多更好的使用OSS的经验。
注释:[1]为了便于理解,本文的代码实例忽略了一些简单的出错处理以及极端情况的判断逻辑。
[2] 如果其他参数都合法,这个请求将符合get object请求的语法,OSS会返回整个object的内容,而不是用户期望的部分数据。
相关链接:
关于本文更加详细介绍:http://www.educity.cn/wulianwang/1465065.html
OSS API 文档简介:
https://help.aliyun.com/document_detail/31947.html?spm=5176.788315709.6.222.oxsSDy