Datahub Python SDK入门手册

前言

DataHub是 MaxCompute 提供的流式数据处理(Streaming Data)服务,它提供流式数据的发布 (Publish)和订阅 (Subscribe)的功能,让您可以轻松构建基于流式数据的分析和应用。DataHub 可以对各种移动设备,应用软件,网站服务,传感器等产生的大量流式数据进行持续不断的采集,存储和处理。用户可以编写应用程序或者使用流计算引擎来处理写入到 DataHub 的流式数据比如实时web访问日志、应用日志、各种事件等,并产出各种实时的数据处理结果比如实时图表、报警信息、实时统计等。用户编写Datahub应用程序最简单直接的方式就是基于Datahub SDK进行,目前Datahub官方提供的SDK包括C++ SDK和Java SDK,随着越来越多的Pythoner使用Datahub,Python版本Datahub SDK需求量也日益上升,这里就告诉各位Pythoner们一个好消息,Datahub官方Python SDK Beta正式Release(Github地址),使用非常简单,这里做个入门介绍,大家如有任何疑问随时在Github上提问留言。

安装

快速安装

$ sudo pip install pydatahub

源码安装

$ git clone https://github.com/aliyun/aliyun-datahub-sdk-python.git
$ cd aliyun-datahub-sdk-python
$ sudo python setup.py install

安装验证

$ python -c "from datahub import DataHub"

如果上述命令执行成功,恭喜你安装Datahub Python版本SDK成功!

基本概念

详见: https://help.aliyun.com/document_detail/47440.html?spm=5176.product27797.3.2.VGxgya

准备工作

  • 访问DataHub服务需要使用阿里云认证账号,需要提供阿里云accessId及accessKey。 同时需要提供访问的服务地址。
  • 创建Project
    • 登陆Datahub WebConsole页面,创建Project
  • 初始化Datahub
import sys
import traceback

from datahub import DataHub
from datahub.utils import Configer
from datahub.models import Topic, RecordType, FieldType, RecordSchema, BlobRecord, TupleRecord, CursorType
from datahub.errors import DatahubException, ObjectAlreadyExistException

access_id = your access id
access_key = your access key
endpoint = your datahub server endpoint
dh = DataHub(access_id, access_key, endpoint)

Topic操作

Tuple Topic

  • Tuple类型Topic写入的数据是有格式的,需要指定Record Schema,目前支持以下几种数据类型:
类型 含义 值域
Bigint 8字节有符号整型。请不要使用整型的最小值 (-9223372036854775808),这是系统保留值。 -9223372036854775807 ~ 9223372036854775807
String 字符串,只支持UTF-8编码。 单个String列最长允许1MB。
Boolean 布尔型。 可以表示为True/False,true/false, 0/1
Double 8字节双精度浮点数。 -1.0 10308 ~ 1.0 10308
TimeStamp 时间戳类型 表示到微秒的时间戳类型
  • 创建示例
topic = Topic(name=topic_name)
topic.project_name = project_name
topic.shard_count = 3
topic.life_cycle = 7
topic.record_type = RecordType.TUPLE
topic.record_schema = RecordSchema.from_lists(['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'], [Fie
ldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP])

try:
    dh.create_topic(topic)
    print "create topic success!"
    print "=======================================\n\n"
except ObjectAlreadyExistException, e:
    print "topic already exist!"
    print "=======================================\n\n"
except Exception, e:
    print traceback.format_exc()
    sys.exit(-1)

Blob Topic

  • Blob类型Topic支持写入一块二进制数据作为一个Record,数据将会以BASE64编码传输。
topic = Topic(name=topic_name)
topic.project_name = project_name
topic.shard_count = 3
topic.life_cycle = 7
topic.record_type = RecordType.BLOB

try:
    dh.create_topic(topic)
    print "create topic success!"
    print "=======================================\n\n"
except ObjectAlreadyExistException, e:
    print "topic already exist!"
    print "=======================================\n\n"
except Exception, e:
    print traceback.format_exc()
    sys.exit(-1)

数据发布/订阅

获取Shard列表

  • list_shards接口获取topic下的所有shard
shards = dh.list_shards(project_name, topic_name)

返回结果是一个List对象,每个元素是一个shard,可以获取shard_id,state状态,begin_hash_key,end_hash_key等信息

发布数据

  • put_records接口向一个topic发布数据
failed_indexs = dh.put_records(project_name, topic_name, records)

其中传入参数records是一个List对象,每个元素为一个record,但是必须为相同类型的record,即Tuple类型或者Blob类型,返回结果为写入失败记录的数组下标

  • 写入Tuple类型Record示例
try:
    # block等待所有shard状态ready
    dh.wait_shards_ready(project_name, topic_name)
    print "shards all ready!!!"
    print "=======================================\n\n"

    topic = dh.get_topic(topic_name, project_name)
    print "get topic suc! topic=%s" % str(topic)
    if topic.record_type != RecordType.TUPLE:
        print "topic type illegal!"
        sys.exit(-1)
    print "=======================================\n\n"

    shards = dh.list_shards(project_name, topic_name)
    for shard in shards:
        print shard
    print "=======================================\n\n"

    records = []

    record0 = TupleRecord(schema=topic.record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])
    record0.shard_id = shards[0].shard_id
    record0.put_attribute('AK', '47')
    records.append(record0)

    record1 = TupleRecord(schema=topic.record_schema)
    record1['bigint_field'] = 2
    record1['string_field'] = 'yc2'
    record1['double_field'] = 10.02
    record1['bool_field'] = False
    record1['time_field'] = 1455869335000011
    record1.shard_id = shards[1].shard_id
    records.append(record1)

    record2 = TupleRecord(schema=topic.record_schema)
    record2['bigint_field'] = 3
    record2['string_field'] = 'yc3'
    record2['double_field'] = 10.03
    record2['bool_field'] = False
    record2['time_field'] = 1455869335000013
    record2.shard_id = shards[2].shard_id
    records.append(record2)

    failed_indexs = dh.put_records(project_name, topic_name, records)
    print "put tuple %d records, failed list: %s" %(len(records), failed_indexs)
    # failed_indexs如果非空最好对failed record再进行重试
    print "=======================================\n\n"
except DatahubException, e:
    print traceback.format_exc()
    sys.exit(-1)
else:
    sys.exit(-1)

获取cursor

  • 获取Cursor,可以通过三种方式获取:OLDEST, LATEST, SYSTEM_TIME

    • OLDEST: 表示获取的cursor指向当前有效数据中时间最久远的record
    • LATEST: 表示获取的cursor指向当前最新的record
    • SYSTEM_TIME: 表示获取的cursor指向该时间之后接收到的第一条record
cursor = dh.get_cursor(project_name, topic_name, CursorType.OLDEST, shard_id)

通过get_cursor接口获取用于读取指定位置之后数据的cursor

订阅数据

  • 从指定shard读取数据,需要指定从哪个Cursor开始读,并指定读取的上限数据条数,如果从Cursor到shard结尾少于Limit条数的数据,则返回实际的条数的数据。
dh.get_records(topic, shard_id, cursor, 10)
  • 消费Tuple类型Record示例
try:
    # block等待所有shard状态ready
    dh.wait_shards_ready(project_name, topic_name)
    print "shards all ready!!!"
    print "=======================================\n\n"

    topic = dh.get_topic(topic_name, project_name)
    print "get topic suc! topic=%s" % str(topic)
    if topic.record_type != RecordType.TUPLE:
        print "topic type illegal!"
        sys.exit(-1)
    print "=======================================\n\n"

    cursor = dh.get_cursor(project_name, topic_name, CursorType.OLDEST, '0')
    while True:
        (record_list, record_num, next_cursor) = dh.get_records(topic, '0', cursor, 10)
        for record in record_list:
            print record
        if 0 == record_num:
            time.sleep(1)
        cursor = next_cursor

except DatahubException, e:
    print traceback.format_exc()
    sys.exit(-1)
else:
    sys.exit(-1)

结尾

时间: 2025-01-06 22:29:07

Datahub Python SDK入门手册的相关文章

日志服务使用Python SDK快速入门指南

为快速开始使用日志服务Python SDK,请按照如下步骤进行: 创建阿里云账号 为了访问阿里云日志服务,你需要有一个阿里云账号.如果没有,可首先如下创建阿里云账号: 访问阿里云官方网站,点击页面上"注册"按钮. 按照屏幕提示完成注册流程并进行实名认证. 为了更好地使用阿里云服务,建议尽快完成实名认证,否则部分阿里云服务将无法使用.具体实名认证流程请参考这里 获取阿里云访问秘钥 为了使用SDK,你必须申请阿里云的访问秘钥: 登陆阿里云管理控制台. 访问阿里云秘钥管理页面. 选择一对用于

阿里云资源编排服务Python SDK使用入门

阿里云资源编排服务(ROS)为我们维护云计算资源提供了一个低成本.可靠.标准化的方案.基于ROS提供的能力,我们只要编写和维护资源模板文件,就可以达到维护云计算资源的目的,而不再需要花费很多的时间通过控制台来人肉配置.ROS为一些场景下的资源维护提供了不同的思路和可能性,比如很多需要大量临时计算资源的场景,使用ROS将使整个资源申请.应用构建.资源释放过程非常简单. ROS同时提供RESTful API和SDK,这使得我们可以很容易的把ROS对于资源的操作能力集成到我们的应用中.下面我们通过示例

OAS的使用——Python SDK

OAS的使用--Python SDK [TOC] 当需要向OAS备份归档的文件量非常大的时候,通过web控制台和命令行工具来完成是不可能的,这时候需要使用OAS提供的SDK编写操作代码来实现,SDK包括Python SDK和Java SDK,本文主要讲解Python SDK, Java SDK的使用可以参考官方文档中的开发者工具和[最佳实践](https://docs.aliyun.com/?spm=5176.383338.201.102.NNmH36#/pub/oas/best_practic

Access 2007简易入门手册

我只想告诉你我非常喜欢这本书.我对Microsoft Access的经验足以让我跳过这本傻瓜系列教材,但是实际情况是,我非常享受于这本书中介绍Access 2007新的用户界面的时候被过分宠坏的感觉.当然,我已经安装了它,并且已经试了试新的Ribbons和菜单,但是读了第二章(Getting Started, Getting Around)两遍之后,我才适应新的浏览样式. 这本入门手册的最大的特点是一整套吸引注意力的页边的图标,比如提示.警告和值得注意的知识点.但是并没有"Access 2007

Python基础入门之seed()方法的使用

 这篇文章主要介绍了Python基础入门之seed()方法的使用,是Python学习当中的基础知识,需要的朋友可以参考下     seed() 设置生成随机数用的整数起始值.调用任何其他random模块函数之前调用这个函数. 语法 以下是seed()方法的语法: ? 1 seed ( [x] ) 注意:此函数是无法直接访问的,所以需要导入seed模块,然后需要使用random静态对象来调用这个函数. 参数 x -- 这是下一个随机数的种子.如果省略,则需要系统时间,以产生下一个随机数. 返回值

Python编程入门的一些基本知识

  这篇文章主要介绍了Python编程入门的一些基本知识,包括注释需和Shell命令使用等基本内容,要的朋友可以参考下 Python与Perl,C和Java语言等有许多相似之处.不过,也有语言之间有一些明确的区别.本章的目的是让你迅速学习Python的语法. 第一个Python程序: 交互模式编程: 调用解释器不经过脚本文件作为参数,显示以下提示: ? 1 2 3 4 5 $ python Python 2.6.4 (#1, Nov 11 2014, 13:34:43) [GCC 4.1.2 2

《树莓派Python编程入门与实战》——2.3 使用Raspbian图形用户界面

2.3 使用Raspbian图形用户界面 树莓派Python编程入门与实战 默认情况下,当你启动树莓派并登录后会进入到Linux命令行.但是树莓派同样还有一个图形用户界面(GUI,Graphical User Interface). 为了打开图形界面,你需要在命令行键入startx并且回车.然后轻量级的X11桌面环境(LXDE,Lightweight X11 Desktop Environment)就启动了,你可以看到一个类似图2.1的图形界面. 提示: Linux桌面环境 一个关于Linux的

《树莓派Python编程入门与实战》——3.7 创建Python脚本

3.7 创建Python脚本 树莓派Python编程入门与实战 你可以将Python语句写入文件后再批量运行它们,而不是在每次需要运行程序的时候都一行一行输入进去.这些包含Python语句的文件叫作脚本. 你可以通过Python交互式shell或者用IDLE运行这些Python脚本.清单3.3显示了名为sample.py的脚本文件,它包含两个语句. 清单3.3 sample.py脚本 pi@raspberrypi ~ $ cat py3prog/sample.py print ("Here is

SharePoint 2013 入门教程之入门手册

原文:SharePoint 2013 入门教程之入门手册 当我们搭建完环境,创建应用程序和网站集后,就已经正式开启了我们的SharePoint之旅了,进入网站以后,开始基本的使用.设置,了解SharePoint相关特性,下面,来简单了解下SharePoint吧. 1.  创建网站集完毕,打开首页,如图1: (图1 SharePoint 发布站点首页)  如上图,可能画的比较乱,11是共享本网站,12是关注此网站,12右边的是最大化内容(也就是编号5,效果如图2). 看图比较乱,上面的东西几乎都是