利用PyODPS 获取项目的元信息

刚进公司的一段时间,需要每天与ODPS数据表打交道。限于团队中文档梳理有限,所以通过PyODPS制作了3张元信息表:meta_tablesmeta_fieldsmeta_instances,供自己及团队同学使用。其中,主要利用了odps.list_tables()odps.list_instances()

表结构

  • meta_tables:分区表,每天定时对全部数据表进行更新。可用于项目下表容量盘查、旧表名单拉取、人员贡献统计。
字段 类型 注释
tbl_name string 表名
tbl_comment string 表注释
tbl_owner string 作者
tbl_pt_name string (如果是分区表)分区名
tbl_ddl_tm datetime 最近创建时间
tbl_mod_tm datetime 最近更新时间
etl_tm datetime ETL时间
pt string 按日期分区
  • meta_fields:分区表,每天定时对近24小时有过表结构修改的表进行更新。可用于字段类型、注释是否一致的校验。
字段 类型 注释
fld_name string 字段名
fld_type string 字段类型
fld_comment string 字段注释
etl_tm datetime ETL时间
tbl_name string 按表名分区
  • meta_instances:分区表,每天定时遍历00:00~06:00实例。可用于表间血缘关系提取、实例耗时统计。
字段 类型 注释
ins_name string 实例名
start_tm datetime 开始时间
end_tm datetime 结束时间
cost_tm bigint 总耗时(秒)
status string 实例状态
ins_owner string 作者
tsk_name string 子任务
tbl_in string 输入表(以,分割)
tbl_out string 输出表(以,分割)
etl_tm datetime ETL时间
pt string 按日期分区

获取代码

  • get_table_meta.py
#!/usr/local/anaconda3/bin/python3
# -*- coding: utf-8 -*-
# ###########################################################################################
# 执行环境:分析服务器
# 脚本: get_table_meta.py
# 调度: 每日早6点调度
# 日志: get_table_meta.log
# ###########################################################################################

import os
from datetime import datetime
from odps import ODPS
from odps.models import Schema, Column, Partition

start_tm = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cur_path = os.path.split(os.path.realpath(__file__))[0]

odps = ODPS(access_id='<access_id>',
            secret_access_key='<access_key>',
            project='<project>',
            endpoint='http://service.odps.aliyun.com/api',
            tunnel_endpoint='http://dt.odps.aliyun.com')

to_table = 'meta_tables'
columns = [Column(name='tbl_name', type='string', comment='表名'),
           Column(name='tbl_comment', type='string', comment='表注释'),
           Column(name='tbl_owner', type='string', comment='作者'),
           Column(name='tbl_pt_name', type='string', comment='(如果是分区表)分区名'),
           Column(name='tbl_ddl_tm', type='datetime', comment='最近创建时间'),
           Column(name='tbl_mod_tm', type='datetime', comment='最近更新时间'),
           Column(name='etl_tm', type='datetime', comment='ETL时间')]
partitions = [Partition(name='pt', type='string', comment='按日期分区')]
schema = Schema(columns=columns, partitions=partitions)

records = []
try:
    for tbl in odps.list_tables():
        tm = datetime.now()
        records.append([tbl.name, tbl.comment, tbl.owner.split(':')[-1],
                        tbl.schema.partitions[0].name if tbl.schema.partitions else None,
                        tbl.last_meta_modified_time.strftime('%Y-%m-%d %H:%M:%S'),
                        tbl.last_modified_time.strftime('%Y-%m-%d %H:%M:%S'),
                        tm.strftime('%Y-%m-%d %H:%M:%S')])
    partition = '%s=%s' % (partitions[0].name, datetime.now().strftime('%Y%m%d'))
    to_tbl = odps.create_table(to_table, schema, if_not_exists=True)
    to_tbl.delete_partition(partition, if_exists=True)
    odps.write_table(to_table, records, partition=partition, create_partition=True)

except:
    status = 'failed'
    n = 0
else:
    status = 'succeed'
    n = len(records)

end_tm = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log = {'status': status, 'n': n, 'start': start_tm, 'end': end_tm}
f = open(os.path.join(cur_path, 'get_table_meta.log'), 'a')
f.write("Update {status} with {n} tables from {start} to {end}\n".format(**log))
f.close()
  • get_field_meta.py
#!/usr/local/anaconda3/bin/python3
# -*- coding: utf-8 -*-
# ###########################################################################################
# 执行环境:分析服务器
# 脚本: get_field_meta.py
# 调度: 每日早6点调度
# 日志: get_field_meta.log
# ###########################################################################################

import os
from datetime import datetime
from odps import ODPS
from odps.models import Schema, Column, Partition

start_tm = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cur_path = os.path.split(os.path.realpath(__file__))[0]

to_table = 'meta_fields'
odps = ODPS(access_id='<access_id>',
            secret_access_key='<access_key>',
            project='<project>',
            endpoint='http://service.odps.aliyun.com/api',
            tunnel_endpoint='http://dt.odps.aliyun.com')

columns = [Column(name='fld_name', type='string', comment='字段名'),
           Column(name='fld_type', type='string', comment='字段类型'),
           Column(name='fld_comment', type='string', comment='字段注释'),
           Column(name='etl_tm', type='datetime', comment='ETL时间')]
partitions = [Partition(name='tbl_name', type='string', comment='表名')]
schema = Schema(columns=columns, partitions=partitions)
try:
    to_tbl = odps.create_table(to_table, schema, if_not_exists=True)
    n = 0
    for tbl in odps.list_tables():
        tm = datetime.now()
        last_mod_tm = tbl.last_meta_modified_time
        if (tm - last_mod_tm).days > 0:
            continue
        else:
            cols = tbl.schema.get_columns()
            records = [[col.name, str(col.type).lower(), col.comment, tm.strftime('%Y-%m-%d %H:%M:%S')] for col in cols]
            partition = '%s=%s' %(partitions[0].name, tbl.name)
            to_tbl.delete_partition(partition, if_exists=True)
            odps.write_table(to_table, records, partition=partition, create_partition=True)
            n +=1
except:
    status = 'failed'
    n = 0
else:
    status = 'succeed'

end_tm = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log = {'status': status, 'n': n, 'start': start_tm, 'end': end_tm}
f = open(os.path.join(cur_path, 'get_field_meta.log'), 'a')
f.write("Update {status} with {n} tables from {start} to {end}\n".format(**log))
f.close()
  • get_instance_meta.py
#!/usr/local/anaconda3/bin/python3
# -*- coding: utf-8 -*-
# ###########################################################################################
# 执行环境:分析服务器
# 脚本: get_instance_meta.py
# 调度: 每日早6点调度
# 日志: get_instance_meta.log
# ###########################################################################################

import os
import re
from datetime import datetime, date, time, timedelta
from odps import ODPS
from odps.models import Schema, Column, Partition

start_tm = datetime.now()
today_min = datetime.combine(date.today(), time.min)
cur_path = os.path.split(os.path.realpath(__file__))[0]

to_table = 'meta_instances'
odps = ODPS(access_id='<access_id>',
            secret_access_key='<access_key>',
            project='<project>',
            endpoint='http://service.odps.aliyun.com/api',
            tunnel_endpoint='http://dt.odps.aliyun.com')

columns = [Column(name='ins_name', type='string', comment='实例名'),
           Column(name='start_tm', type='datetime', comment='开始时间'),
           Column(name='end_tm', type='datetime', comment='结束时间'),
           Column(name='cost_tm', type='bigint', comment='总耗时(秒)'),
           Column(name='status', type='string', comment='实例状态'),
           Column(name='ins_owner', type='string', comment='作者'),
           Column(name='tsk_name', type='string', comment='子任务'),
           Column(name='tbl_in', type='string', comment='输入表(以,分割)'),
           Column(name='tbl_out', type='string', comment='输出表(以,分割)'),
           Column(name='etl_tm', type='datetime', comment='ETL时间')]
partitions = [Partition(name='pt', type='string', comment='按日期分区')]
schema = Schema(columns=columns, partitions=partitions)

records = []
try:
    for ins in odps.list_instances(start_time=today_min,
                                   end_time=start_tm,
                                   only_owner=False,
                                   status='Terminated'):
        tsk_name_filter = [re.match('console_query_task', tsk) for tsk in ins.get_task_names()]
        try:
            tsk_output_filter = [ins.get_task_summary(tsk) if not ins.get_task_summary(tsk)
                                 else ins.get_task_summary(tsk).get('Outputs')
                                 for tsk in ins.get_task_names()]
        except:
            continue
        else:
            pass
        # 这里过滤了没有输入表、输出表的实例。这段代码初衷就是提取表间依赖关系,所以没有考虑所有实例
        if ins.is_successful() and any(tsk_name_filter) and any(tsk_output_filter):
            start_time = ins.start_time + timedelta(hours=8)
            end_time = ins.end_time + timedelta(hours=8)
            tbl_in = set()
            tbl_out = set()
            for tsk in ins.get_task_names():
                smy = ins.get_task_summary(tsk)
                tbl_in.update([re.match(r'^[\w].*\.([\w]+).*$', key).group(1) for key in smy['Inputs'].keys()])
                tbl_out.update([re.match(r'^[\w].*\.([\w]+).*$', key).group(1) for key in smy['Outputs'].keys()])
            records.append([ins.name,
                            start_time.strftime('%Y-%m-%d %H:%M:%S'),
                            end_time.strftime('%Y-%m-%d %H:%M:%S'),
                            (end_time - start_time).seconds,
                            ins.status.value.lower(),
                            ins.owner.split(':')[-1],
                            ','.join(ins.get_task_names()) if ins.get_task_names() else None,
                            ','.join(tbl_in) if tbl_in else None,
                            ','.join(tbl_out) if tbl_out else None,
                            datetime.now().strftime('%Y-%m-%d %H:%M:%S')])
    partition = '%s=%s' % (partitions[0].name, start_tm.strftime('%Y%m%d'))
    to_tbl = odps.create_table(to_table, schema, if_not_exists=True)
    to_tbl.delete_partition(partition, if_exists=True)
    odps.write_table(to_table, records, partition=partition, create_partition=True)

except:
    status = 'failed'
    n = 0
else:
    status = 'succeed'
    n = len(records)

end_tm = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log = {'status': status, 'n': n, 'start': start_tm, 'end': end_tm}
f = open(os.path.join(cur_path, 'get_field_meta.log'), 'a')
f.write("Update {status} with {n} instances from {start} to {end}\n".format(**log))
f.close()
时间: 2024-09-20 06:44:18

利用PyODPS 获取项目的元信息的相关文章

利用公路地图获取信息-利用公路地图写最短路径,怎么获取地图里的信息?百度搜索什么地图文件?

问题描述 利用公路地图写最短路径,怎么获取地图里的信息?百度搜索什么地图文件? 学软件的对地图一点不懂,就是想获取地图里的信息.比如有哪几个路口,各个路口的距离,然后写最短路径,做一个软件.为毕业设计做基础.我该找哪些东西呢?主要是怎么利用地图里的数据为我的代码服务. 解决方案 要二次开发用ArcGis这个库,地图有很多,比如http://download.csdn.net/detail/shaojun007/1630938,Google arcgis 地图数据

android利用ContentResolver访问者获取手机短信信息

利用ContentResolver访问者获取手机短信信息,在此记录一下,一遍以后查询. 首先看一下结果,结果如下: activity_message.xml类: <LinearLayout xmlns:android="http://schemas.android.com/apk/res/android" xmlns:tools="http://schemas.android.com/tools" android:id="@+id/activity_

利用“PHP彩蛋”进行敏感信息获取

关于"PHP彩蛋"的说法也许很多老PHPer已经都知道或听说了,好像是早在PHP4版本的时候就有彩蛋了,挺好玩儿的,可能近年来逐渐被人们遗忘了,其实彩蛋功能在PHP脚本引擎默认情况下是开启. 写个phpinfo();然后访问,再加上以下的GET值即可查阅 下面就用Discuz官方论坛做一下测试: http://www.discuz.net/?=PHPE9568F34-D428-11d2-A769-00AA001ACF42 http://www.discuz.net/?=PHPE9568

利用Node.js获取项目根目录的小技巧_node.js

假设我们的js文件写在server目录中,但是我们的资源文件存储在app/img目录中. 实现功能 如下图,我们需要在server/index.js文件中使用fs读取app/img/favicon.ico文件. 实现方法 在node.js只提供了一个 dirname全局变量.通过 dirname可以获得"C:\wwwroot\yidata\server".这时需要用到path. 首先 import path from 'path'; (ES6)或var path = require (

[Qt教程] 第35篇 网络(五)获取本机网络信息

[Qt教程] 第35篇 网络(五)获取本机网络信息 楼主  发表于 2013-9-5 11:32:58 | 查看: 278| 回复: 2 获取本机网络信息 版权声明 该文章原创于作者yafeilinux,转载请注明出处! 导语 前面讲完了HTTP和FTP,下面本来该讲解UDP和TCP了.不过,在讲解它们之前,我们先在这一节里讲解一个以后要经常用到的名词,那就是IP地址.        对于IP地址,其实,会上网的人都应该听说过它.如果你实在很不属性,那么简单的说:IP即InternetProto

.NET获取枚举DescriptionAttribute描述信息性能改进的多种方法_实用技巧

一. DescriptionAttribute的普通使用方式 1.1 使用示例 DescriptionAttribute特性可以用到很多地方,比较常见的就是枚举,通过获取枚举上定义的描述信息在UI上显示,一个简单的枚举定义: public enum EnumGender { None, [System.ComponentModel.Description("男")] Male, [System.ComponentModel.Description("女")] Fem

在C#中获取页面元素布局信息的代码

最近研究一个如何在网页定位验证码并截图的问题时, 用SS写了一段C#小脚本可以轻松获取页面任 意元素的布局信息 (top, left, width, height). 10行功能代码, 觉得有点用, 现分享给大家: public dynamic GetRect(JQueryContext node) { node.Attr("pos_top", "0"); node.Attr("pos_left", "0"); Default

Hadoop中使用FileStatus类来查看HDFS中文件或目录的元信息

Hadoop中的FileStatus类可以用来查看HDFS中文件或者目录的元信息,任意的文件或者目录都可以拿到对应的FileStatus, 我们这里简单的演示下这个类的相关API: /* */ package com.charles.hadoop.fs; import java.net.URI; import java.sql.Timestamp; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.F

编写Python脚本来获取mp3文件tag信息的教程

  这篇文章主要介绍了编写Python脚本来获取mp3文件tag信息的教程,代码基于Python2.x,文中的注释很详细,需要的朋友可以参考下 下面利用一个python的实例程序,来学习python.这个程序的目的就是分析出所有MP3文件的Tag信息并输出. import os # 导入os模块,提供文件路径,列出文件等方法 import sys # 导入sys模块,使用sys.modules获取模块中的所有内容,类似反射的功能 from UserDict import UserDict # 这