In some scenarios a user needs to extract certain information from a large file stored on OSS. Using a 400 MB CSV file on OSS containing 1.05 million records as an example, this tutorial shows how to use Function Compute to process the file as a stream and find all records that contain 'Lilly'.
1. Single-threaded chunked reading and processing
# coding=utf-8
import re
import json
import oss2
import logging
import time
import csv

CHUNK_SIZE = 1024*8

# Function Compute entry point
def handler(event, context):
    endpoint = 'oss-cn-shanghai-internal.aliyuncs.com'
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, endpoint, 'demo-oss')
    start = time.time()
    r = bucket.get_object('sampleusers_10m_2.csv')
    last_str = ''
    result = []
    for data in r:
    #while 1:
        # The memory the function needs depends on the chunk size; 8 KB was the fastest in
        # our tests. If the default chunk size used by oss2 is not suitable, switch to the
        # commented-out read(CHUNK_SIZE) loop below and tune CHUNK_SIZE.
        # data = r.read(CHUNK_SIZE)
        if not data:
            break
        msg = (last_str + data) if last_str else data
        rows = msg.splitlines()
        if data[-1] != '\n':
            # The tail of this chunk and the head of the next chunk together form one
            # complete line, so carry the partial last row over to the next iteration
            last_str = rows[-1]
            rows = rows[:-1]
        else:
            last_str = ''
        for row in rows:
            if 'Lilly' in row:
                result.append(row)
        # Parsing rows into dicts with the csv module gives more readable records and
        # supports richer queries, but it is slow (40s+ for the whole file). If richer
        # queries are really needed, combine the cheap "'Lilly' in row" prefilter with
        # csv.DictReader for a middle ground, or use a regex for complex conditions.
        #f_csv = csv.DictReader(rows)
        #for row in f_csv:
        #    if row['name'] == 'Micheal Swanson':
        #        result.append(row)
    # If the file does not end with '\n', the final partial line is still pending; check it too
    if last_str and 'Lilly' in last_str:
        result.append(last_str)
    cost_time = time.time() - start
    print cost_time
    return result
Invoking the example above 100 times, the elapsed times in seconds are:
avg: 6.19895411968
min: 5.37684202194
max: 6.4414331913
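The comments in the code above point at a compromise for richer queries: run the cheap substring test first, and only parse the rows that survive it with csv.DictReader. A minimal sketch of that idea follows; the FIELDNAMES list and the 'name' column are assumptions about the CSV layout, not something taken from the actual file.

import csv

# Hypothetical column layout; replace it with the real header of the CSV file
FIELDNAMES = ['id', 'name', 'email', 'city']

def filter_rows(rows, keyword='Lilly'):
    # Cheap substring prefilter, so that DictReader only has to parse candidate rows
    candidates = [row for row in rows if keyword in row]
    result = []
    for record in csv.DictReader(candidates, fieldnames=FIELDNAMES):
        # Structured check on the parsed record, e.g. match only the name column
        if keyword in record['name']:
            result.append(record)
    return result

Inside the handler this would replace the inner "for row in rows" loop; the prefilter keeps the expensive CSV parsing off the hot path for the vast majority of rows.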
2. Divide and conquer
Split the large file into parts of roughly equal size, hand each part to the sub-function query_part for processing, and then merge the partial results.
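As a rough illustration of the split (the numbers assume the ~400 MB file from this tutorial and part_num = 16; they are not measured values), the tentative byte ranges before newline alignment work out like this:

import math

bytes_length = 400 * 1024 * 1024   # assumed file size in bytes (~400 MB)
part_num = 16                      # number of parts, passed to the main function as the event
PART_SIZE = int(math.ceil(float(bytes_length) / part_num))   # 26214400 bytes per part

ranges = []
begin = 0
while begin < bytes_length:
    end = min(begin + PART_SIZE - 1, bytes_length - 1)
    ranges.append((begin, end))    # tentative range, not yet aligned to a line boundary
    begin = end + 1
# ranges[0] == (0, 26214399), ranges[1] == (26214400, 52428799), ...

The main function below then shifts each end forward to the next '\n' so that every part consists of full lines.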
Main function query:
# -*- coding: utf-8 -*-
import logging
import fc
import json
import oss2
import math
import threading
import time

result = []
# A single line of the file is assumed to be at most 512 bytes; this is an empirical value
MAX_PER_ROW_BYTES = 512

class MyThread(threading.Thread):
    def run(self):
        # Invoke the sub-function for one byte range and merge its matches into the global result
        r = self.client.invoke_function('ApiFc', 'query_part', self.payload)
        global result
        result.extend(json.loads(r))

    def set_init_data(self, client, payload):
        self.client = client
        self.payload = payload

def handler(event, context):
    file_name = 'sampleusers_10m_2.csv'
    # The number of parts is passed in as the event
    part_num = int(event)
    creds = context.credentials
    client = fc.Client(
        endpoint='<your account id>.cn-shanghai-internal.fc.aliyuncs.com',
        accessKeyID=creds.accessKeyId,
        accessKeySecret=creds.accessKeySecret,
        securityToken=creds.securityToken
    )
    # get the object length from OSS
    endpoint = 'oss-cn-shanghai-internal.aliyuncs.com'
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, endpoint, 'demo-oss')
    r = bucket.get_object(file_name)
    bytes_length = int(r.stream.fileobj.response.headers['Content-Length'])
    # fan out to the sub-functions, one thread per part
    global result
    result = []
    threads = []
    PART_SIZE = int(math.ceil(float(bytes_length) / part_num))
    begin = 0
    end = PART_SIZE - 1
    for i in xrange(part_num):
        # The split has to be done carefully: every part must consist of full lines only,
        # i.e. no partial line may be left at either end of a part
        if end < bytes_length - 1:
            # Scan forward in MAX_PER_ROW_BYTES-sized windows to find the first '\n'
            # after the tentative end of this part
            for j in range(10000):
                r_s = bucket.get_object(file_name, byte_range=(end + MAX_PER_ROW_BYTES*j, end + MAX_PER_ROW_BYTES*(j+1)))
                data = r_s.read()
                off_set = data.find('\n')
                if off_set != -1:
                    # Move the part end to the absolute offset of that '\n'
                    end = end + MAX_PER_ROW_BYTES*j + off_set
                    break
        payload = {'begin': begin, 'end': end}
        t = MyThread()
        t.set_init_data(client, json.dumps(payload))
        t.start()
        threads.append(t)
        begin = end + 1
        end = end + PART_SIZE - 1
        if end >= bytes_length:
            end = bytes_length - 1
        if begin >= bytes_length:
            break
    for t in threads:
        t.join()
    return result
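The subtle part of the loop above is the boundary alignment: for every part except the last one, the tentative end is pushed forward to the position of the next '\n', so that each part both starts and ends on a line boundary. Factored out into a standalone helper, the logic looks roughly like this (align_to_newline is a hypothetical name, not something used in the code above):

def align_to_newline(bucket, file_name, end, bytes_length, window=512):
    # window corresponds to MAX_PER_ROW_BYTES above: no line is expected to be longer than this
    if end >= bytes_length - 1:
        # The last part already ends at the end of the file; nothing to align
        return end
    # Scan forward window by window until a '\n' shows up
    for j in range(10000):
        r_s = bucket.get_object(file_name, byte_range=(end + window*j, end + window*(j+1)))
        data = r_s.read()
        off_set = data.find('\n')
        if off_set != -1:
            # Absolute offset of that '\n'; the part keeps the '\n',
            # and the next part starts right after it
            return end + window*j + off_set
    raise Exception('no newline found near offset %d' % end)

With such a helper, the body of the outer for loop would reduce to end = align_to_newline(bucket, file_name, end, bytes_length) followed by building the payload.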
The query_part function, which searches a single data part, is as follows:
# coding=utf-8
import re
import json
import oss2
import time
import csv

CHUNK_SIZE = 1024*8

# Function Compute entry point
def handler(event, context):
    evt = json.loads(event)
    begin = evt['begin']
    end = evt['end']
    endpoint = 'oss-cn-shanghai-internal.aliyuncs.com'
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, endpoint, 'demo-oss')
    start = time.time()
    r = bucket.get_object('sampleusers_10m_2.csv', byte_range=(begin, end))
    last_str = ''
    result = []
    for data in r:
    #while 1:
        # data = r.read(CHUNK_SIZE)
        if not data:
            break
        msg = (last_str + data) if last_str else data
        rows = msg.splitlines()
        if data[-1] != '\n':
            # Carry the partial last row over to the next chunk
            last_str = rows[-1]
            rows = rows[:-1]
        else:
            last_str = ''
        for row in rows:
            if 'Lilly' in row:
                result.append(row)
    # The final part may not end with '\n'; check the remaining partial line as well
    if last_str and 'Lilly' in last_str:
        result.append(last_str)
    return result
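Since query_part takes its byte range from the event, it can also be tested in isolation with fcli, using the same flags as the measurement script below; the range values here are only an example:

fcli function invoke -f query_part -s ApiFc --event-str '{"begin": 0, "end": 26214399}'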
Invoking the main query function 100 times with the number of parts set to 16, the elapsed times in seconds are as follows (roughly a 9x improvement over the single-threaded version):
avg: 0.684439804554
min: 0.556694984436
max: 0.858777999878
The script used to measure the invocation time locally is shown below.
Note: remember to change the return value of the main query function to the function's execution time first (a sketch of that change follows the script).
#coding=utf-8
import os

command = 'fcli function invoke -f query -s ApiFc --event-str 16'
total = 0
NUM = 100
time_lst = []
min_t = 10000000
max_t = 0
for i in xrange(NUM):
    r = os.popen(command)
    info = r.readlines()
    for line in info:
        line = line.strip('\r\n')
        t = float(line)
        if t > max_t:
            max_t = t
        if t < min_t:
            min_t = t
        total += t
        time_lst.append(t)
print "avg: ", total/NUM
print "min: ", min_t
print "max: ", max_t
print "all: ", time_lst
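One way to make the query function return its execution time, as required above, is a small wrapper next to the existing handler; timed_handler is a hypothetical name, and the function's configured handler would have to point at it (or the same two timing lines can simply be added to handler itself):

import time

def timed_handler(event, context):
    # Run the query handler defined in section 2 and report only the elapsed time,
    # which is the single number the measurement script parses from stdout
    start = time.time()
    handler(event, context)
    return time.time() - start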