如何使用函数计算实现流式处理large文件

在某些场景中,用户需要对oss上的某个大文件抽取出某些信息, 本教程以oss上的一个大小为400M,内含105万条记录的csv文件为例,讲解如果利用函数计算流式处理, 查找出包含'Lilly'的记录。

一,单线程分块读取处理
# coding=utf-8
import re
import json
import oss2
import logging
import time
import csv

CHUNK_SIZE = 1024*8

 # 函数服务主函数
def handler(event, context):
    start = time.time()
    endpoint = 'oss-cn-shanghai-internal.aliyuncs.com'
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, endpoint, 'demo-oss')
    start = time.time()
    r = bucket.get_object('sampleusers_10m_2.csv')
    last_str = ''
    result = []
    for data in r:
    #while 1:
    # 函数需要的内存跟CHUNK_SIZE有关,这边8k实验速度最快,当oss默认的chunk速度不合适的时候,可以尝试换下chunk大小
    #    data = r.read(CHUNK_SIZE)
        if not data:
            break
        msg = (last_str + data) if last_str else data
        rows = msg.splitlines()
        if data[-1] != '\n':
            # 本数据块剩下的字符串,和下一个数据块首部一部分是完整的一行
            last_str = rows[-1]
            rows = rows[:-1]
        else:
            last_str = ''

        for row in rows:
            if 'Lilly' in row:
                result.append(row)
        # 如果使用csv库中可以转为可读性更好的dict的处理,可以支持更强的查询,但是性能很差,40s+
        # 如果真的需要强查询,这边可以结合'Lilly' in row,再csv.DictReader,能取个中间效果
        # 或者这边可以用正则来处理一些复杂查询
        #f_csv = csv.DictReader(rows)
        #for row in f_csv:
        #    if row['name'] == 'Micheal Swanson':
        #        result.append(row)
    cost_time = time.time() - start
    print cost_time
    return result

上面这个例子调用100次数据,消耗时间如下:

avg: 6.19895411968

min: 5.37684202194

max: 6.4414331913

二,分治法

将大数据分割,使用尽量等分的规则将part data分别交给子函数函数query_part处理,然后将结果汇总

主函数query
# -*- coding: utf-8 -*-
import logging
import fc
import json
import oss2
import math
import threading
import time

result = []

# 文件一行最大为可能512bytes,这是一个尝试值
MAX_PER_ROW_BYTES = 512

class MyThread(threading.Thread):
    def run(self):
        r = self.client.invoke_function('ApiFc', 'query_part', self.payload)
        global result
        result.extend(json.loads(r))

    def set_init_data(self, client, payload):
        self.client = client
        self.payload = payload

def handler(event, context):
    file_name = 'sampleusers_10m_2.csv'
    # 分块的大小以event的参数传过来
    part_num = int(event)
    creds = context.credentials
    client = fc.Client(
            endpoint='<your account id>.cn-shanghai-internal.fc.aliyuncs.com',
            accessKeyID=creds.accessKeyId,
            accessKeySecret=creds.accessKeySecret,
            securityToken=creds.securityToken
        )

    # get bucket length
    endpoint = 'oss-cn-shanghai-internal.aliyuncs.com'
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, endpoint, 'demo-oss')
    r = bucket.get_object(file_name)
    bytes_length = int(r.stream.fileobj.response.headers['Content-Length'])

    # multiThread process
    global result
    result = []
    threads = []
    PART_SIZE = int( math.ceil( float(bytes_length)/part_num ) )

    begin = 0
    end = PART_SIZE-1

    for i in xrange(part_num):
        # 这边分块一定要科学,就是每一块都是没有残余字符串,即分出来的块是full lines
        off_set = -1
        if end < bytes_length - 1:
            for i in range(10000):
                if off_set == -1:
                    # 每次以MAX_PER_ROW_BYTES大小去找到本数据块的最后一个'\n'
                    r_s = bucket.get_object(file_name, byte_range=(end+MAX_PER_ROW_BYTES*i, end+MAX_PER_ROW_BYTES*(i+1)))
                    data = r_s.read()
                    off_set = data.find('\n')

                if off_set != -1:
                    break

            end = end + off_set

        payload={'begin':begin, 'end':end}
        t = MyThread()
        t.set_init_data(client, json.dumps(payload))
        t.start()
        threads.append(t)

        begin = end + 1
        end = end + PART_SIZE - 1
        if end >= bytes_length:
            end = bytes_length-1

        if begin >= bytes_length:
            break

    for t in threads:
        t.join()

    return result

具体的查询每个数据块的函数query_part代码如下:

# coding=utf-8
import re
import json
import oss2
import time
import csv

CHUNK_SIZE = 1024*8

 # 函数服务主函数
def handler(event, context):
    evt = json.loads(event)
    begin = evt['begin']
    end = evt['end']
    endpoint = 'oss-cn-shanghai-internal.aliyuncs.com'
    creds = context.credentials
    auth = oss2.StsAuth(creds.accessKeyId, creds.accessKeySecret, creds.securityToken)
    bucket = oss2.Bucket(auth, endpoint, 'demo-oss')
    start = time.time()
    r = bucket.get_object('sampleusers_10m_2.csv', byte_range=(begin, end))
    last_str = ''
    result = []
    for data in r:
    #while 1:
    #    data = r.read(CHUNK_SIZE)
        if not data:
            break
        msg = (last_str + data) if last_str else data
        rows = msg.splitlines()
        if data[-1] != '\n':
            last_str = rows[-1]
            rows = rows[:-1]
        else:
            last_str = ''

        for row in rows:
            if 'Lilly' in row:
                result.append(row)
    return result

上面这个例子调用100次数据,分块的数目设置为16的时候,消耗时间如下:

avg: 0.684439804554

min: 0.556694984436

max: 0.858777999878

本地评估invoke的时间的代码如下:

注意:记得修改query主函数的返回值是函数执行时间

#coding=utf-8
import os

command = 'fcli function invoke -f query  -s ApiFc --event-str 16'
sum = 0
NUM = 100
time_lst = []
min_t = 10000000
max_t = 0

for i in xrange(NUM):
    r = os.popen(command)
    info = r.readlines()
    for line in info:
        line = line.strip('\r\n')
        t = float(line)
        if t > max_t:
            max_t = t
        if t < min_t:
            min_t = t
        sum += t
        time_lst.append(t)

print "avg: ", sum/NUM
print "min: ", min_t
print "max: ", max_t
print "all: ", time_lst
时间: 2024-09-25 12:00:29

如何使用函数计算实现流式处理large文件的相关文章

流式计算的系统设计和实现

阿里云数据事业部强琦为大家带来题为"流式计算的系统设计与实现"的演讲,本文主要从增量计算和流式计算开始谈起,然后讲解了与批量计算的区别,重点对典型系统技术概要进行了分析,包括Storm.Kinesis.MillWheel,接着介绍了核心技术.消息机制以及StreamSQL等,一起来了解下吧.   增量计算和流式计算 流式计算 流计算对于时效性要求比较严格,实时计算就是对计算的时效性要求比较强.流计算是利用分布式的思想和方法,对海量"流"式数据进行实时处理的系统,它源

Spark Streaming 流式计算实战

这篇文章由一次平安夜的微信分享整理而来.在Stuq 做的分享,原文内容.  业务场景 这次分享会比较实战些.具体业务场景描述: 我们每分钟会有几百万条的日志进入系统,我们希望根据日志提取出时间以及用户名称,然后根据这两个信息形成 userName/year/month/day/hh/normal  userName/year/month/day/hh/delay 路径,存储到HDFS中.如果我们发现日志产生的时间和到达的时间相差超过的一定的阈值,那么会放到 delay 目录,否则放在正常的 no

HTAP数据库 PostgreSQL 场景与性能测试之 32 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(JSON + 函数流式计算)

标签 PostgreSQL , HTAP , OLTP , OLAP , 场景与性能测试 背景 PostgreSQL是一个历史悠久的数据库,历史可以追溯到1973年,最早由2014计算机图灵奖得主,关系数据库的鼻祖Michael_Stonebraker 操刀设计,PostgreSQL具备与Oracle类似的功能.性能.架构以及稳定性. PostgreSQL社区的贡献者众多,来自全球各个行业,历经数年,PostgreSQL 每年发布一个大版本,以持久的生命力和稳定性著称. 2017年10月,Pos

PgSQL · 应用案例 · 流式计算与异步消息在阿里实时订单监测中的应用

背景 在很多业务系统中,为了定位问题.运营需要.分析需要或者其他需求,会在业务中设置埋点,记录用户的行为在业务系统中产生的日志,也叫FEED日志. 比如订单系统.在业务系统中环环相扣,从购物车.下单.付款.发货,收货(还有纠纷.退款等等),一笔订单通常会产生若干相关联的记录. 每个环节产生的属性可能是不一样的,有可能有新的属性产生,也有可能变更已有的属性值. 为了便于分析,通常有必要将订单在整个过程中产生的若干记录(若干属性),合并成一条记录(订单大宽表). 通常业务系统会将实时产生的订单FEE

双11数据大屏背后的秘密:大规模流式增量计算及应用

首先从理解什么是数据流开始今天的分享,其实在真实的世界中,大部分的数据都是连续产生的数据流,比如手机上产生的GPS信号.用户在互联网上的行为.在线搜索.用户的点击.社交网络分享.即时通信以及一些传感器和物联网设备采集的日志信息等,这些数据都是连续产生的,自然就形成了数据流,在这些数据流产生以后,在很多场景下对于数据流的实时分析就会产生很大的价值. 接下来从大家比较熟悉的数据场景切入,比如大家经常会关注的股市情况,股价的波动其实就是实时数据的分析和聚合,除此之外大家在日常生活中还可能比较关心天气预

双11数据大屏背后:大规模流式增量计算及应用(附资料)

首先从理解什么是数据流开始今天的分享,其实在真实的世界中,大部分的数据都是连续产生的数据流,比如手机上产生的GPS信号.用户在互联网上的行为.在线搜索.用户的点击.社交网络分享.即时通信以及一些传感器和物联网设备采集的日志信息等,这些数据都是连续产生的,自然就形成了数据流,在这些数据流产生以后,在很多场景下对于数据流的实时分析就会产生很大的价值. 接下来从大家比较熟悉的数据场景切入,比如大家经常会关注的股市情况,股价的波动其实就是实时数据的分析和聚合,除此之外大家在日常生活中还可能比较关心天气预

观点:流式计算推动实时处理商业变革

在这一年,我们看到众多厂商工作重点主要是围绕整合Hadoop或NoSQL数据处理引擎以及改善基本的数据存储.Hadoop最成功的一点就是其采用了MapReduce.MapReduce是一种处理超大型数据集并生成相关执行的编程模型,MapReduce的核心思想主要是借鉴了函数是编程语言以及矢量变成语言里的特性. 现今包括Microsoft.IBM.Oracle.Cloudera.MapR等众多厂商相继推出了与自身相结合的Hadoop产品.例如Oracle NoSQL Database,其是Orac

2016美国QCon看法:在Beam上,我为什么说Google有统一流式计算的野心

编者按:流式计算(Stream Processing)在经历了若干年的发展之后,已经有了比较完整的生态,如开源的Storm, Flink, Spark等,未开源的如Google的DataFlow,几乎每个巨头都有自己的流式计算系统.生态虽繁荣但分散,各个平台之间也是互不兼容的,一个平台上写的程序很难移植到另外一个平台,这些领域难题再加上Google大一统流式计算的野心催生了Apache孵化器的新项目Beam.            Google是最早实践大数据的公司,目前大数据繁荣的生态很大一部

漫谈流式计算的一致性

参考, http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/ http://www.confluent.io/blog/real-time-stream-processing-the-next-step-for-apache-flink/   对于batch分析,fault-tolerant很容易做,失败只需要replay,就可以完美做到