python实现rest请求api示例_python

该代码参考新浪python api,适用于各个开源api请求

复制代码 代码如下:

# -*- coding: utf-8 -*-
import collections
import gzip
import urllib
import urllib2
from urlparse import urlparse

try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO

try:
    import json
except ImportError:
    import simplejson as json

__author__ = 'myth'

_HTTP_GET = 1
_HTTP_POST = 2
# 超时时间(秒)
TIMEOUT = 45
RETURN_TYPE = {"json": 0, "xml": 1, "html": 2, "text": 3}
_METHOD_MAP = {'GET': _HTTP_GET, 'POST': _HTTP_POST}

class APIError(StandardError):
    """
    raise APIError if receiving json message indicating failure.
    """
    def __init__(self, error_code, error, request):
        self.error_code = error_code
        self.error = error
        self.request = request
        StandardError.__init__(self, error)

    def __str__(self):
        return 'APIError: %s: %s, request: %s' % (self.error_code, self.error, self.request)

# def callback_type(return_type='json'):
#
#     default_type = "json"
#     default_value = RETURN_TYPE.get(default_type)
#     if return_type:
#         if isinstance(return_type, (str, unicode)):
#             default_value = RETURN_TYPE.get(return_type.lower(), default_value)
#     return default_value

def _format_params(_pk=None, encode=None, **kw):
    """

    :param kw:
    :type kw:
    :param encode:
    :type encode:
    :return:
    :rtype:
    """

    __pk = '%s[%%s]' % _pk if _pk else '%s'
    encode = encode if encode else urllib.quote

    for k, v in kw.iteritems():
        _k = __pk % k
        if isinstance(v, basestring):
            qv = v.encode('utf-8') if isinstance(v, unicode) else v
            _v = encode(qv)
            yield _k, _v
        elif isinstance(v, collections.Iterable):
            if isinstance(v, dict):
                for _ck, _cv in _format_params(_pk=_k, encode=encode, **v):
                    yield _ck, _cv
            else:
                for i in v:
                    qv = i.encode('utf-8') if isinstance(i, unicode) else str(i)
                    _v = encode(qv)
                    yield _k, _v
        else:
            qv = str(v)
            _v = encode(qv)
            yield _k, _v

def encode_params(**kw):
    """
    do url-encode parameters

    >>> encode_params(a=1, b='R&D')
    'a=1&b=R%26D'
    >>> encode_params(a=u'\u4e2d\u6587', b=['A', 'B', 123])
    'a=%E4%B8%AD%E6%96%87&b=A&b=B&b=123'

    >>> encode_params(**{
        'a1': {'aa1': 1, 'aa2': {'aaa1': 11}},
        'b1': [1, 2, 3, 4],
        'c1': {'cc1': 'C', 'cc2': ['Q', 1, '@'], 'cc3': {'ccc1': ['s', 2]}}
    })
    'a1[aa1]=1&a1[aa2][aaa1]=11&c1[cc1]=C&c1[cc3][ccc1]=s&c1[cc3][ccc1]=2&c1[cc2]=Q&c1[cc2]=1&c1[cc2]=%40&b1=1&b1=2&b1=3&b1=4'
    """
    # args = []
    # for k, v in kw.iteritems():
    #     if isinstance(v, basestring):
    #         qv = v.encode('utf-8') if isinstance(v, unicode) else v
    #         args.append('%s=%s' % (k, urllib.quote(qv)))
    #     elif isinstance(v, collections.Iterable):
    #         for i in v:
    #             qv = i.encode('utf-8') if isinstance(i, unicode) else str(i)
    #             args.append('%s=%s' % (k, urllib.quote(qv)))
    #     else:
    #         qv = str(v)
    #         args.append('%s=%s' % (k, urllib.quote(qv)))
    # return '&'.join(args)

    args = []
    _urlencode = kw.pop('_urlencode', urllib.quote)
    for k, v in _format_params(_pk=None, encode=_urlencode, **kw):
        args.append('%s=%s' % (k, v))

    args = sorted(args, key=lambda s: s.split("=")[0])
    return '&'.join(args)

def _read_body(obj):
    using_gzip = obj.headers.get('Content-Encoding', '') == 'gzip'
    body = obj.read()
    if using_gzip:
        gzipper = gzip.GzipFile(fileobj=StringIO(body))
        fcontent = gzipper.read()
        gzipper.close()
        return fcontent
    return body

class JsonDict(dict):
    """
    general json object that allows attributes to be bound to and also behaves like a dict
    """

    def __getattr__(self, attr):
        try:
            return self[attr]
        except KeyError:
            raise AttributeError(r"'JsonDict' object has no attribute '%s'" % attr)

    def __setattr__(self, attr, value):
        self[attr] = value

def _parse_json(s):
    """
    parse str into JsonDict
    """

    def _obj_hook(pairs):
        """
        convert json object to python object
        """
        o = JsonDict()
        for k, v in pairs.iteritems():
            o[str(k)] = v
        return o
    return json.loads(s, object_hook=_obj_hook)

def _parse_xml(s):
    """
    parse str into xml
    """

    raise NotImplementedError()

def _parse_html(s):
    """
    parse str into html
    """

    raise NotImplementedError()

def _parse_text(s):
    """
    parse str into text
    """

    raise NotImplementedError()

def _http_call(the_url, method, return_type="json", request_type=None, request_suffix=None, headers={}, _timeout=30, **kwargs):
    """
    the_url: 请求地址
    method 请求方法(get,post)
    return_type: 返回格式解析
    request_suffix: 请求地址的后缀,如jsp,net
    _timeout: 超时时间
    kwargs: 请求参数
    """

    http_url = "%s.%s" (the_url, request_suffix) if request_suffix else the_url

    if request_type == 'json':
        headers['Content-Type'] = 'application/json'
        # method = _HTTP_POST
        # json_data = json.dumps(kwargs)
        # # convert str to bytes (ensure encoding is OK)
        # params = json_data.encode('utf-8')
        params = json.dumps(kwargs)
    else:
        params = encode_params(**kwargs)
        http_url = '%s?%s' % (http_url, params) if method == _HTTP_GET else http_url
    print http_url
    # u = urlparse(http_url)
    # headers.setdefault("host", u.hostname)
    http_body = None if method == _HTTP_GET else params

    req = urllib2.Request(http_url, data=http_body, headers=headers)

    callback = globals().get('_parse_{0}'.format(return_type))
    if not hasattr(callback, '__call__'):
        print "return '%s' unable to resolve" % return_type
        callback = _parse_json
    try:

        resp = urllib2.urlopen(req, timeout=_timeout if _timeout else TIMEOUT)
        body = _read_body(resp)
        r = callback(body)
        # if hasattr(r, 'error_code'):
        #     raise APIError(r.error_code, r.get('error', ''), r.get('request', ''))
        return r
    except urllib2.HTTPError, e:
        try:
            body = _read_body(e)
            r = callback(body)
            return r
        except:
            r = None
        # if hasattr(r, 'error_code'):
        #     raise APIError(r.error_code, r.get('error', ''), r.get('request', ''))
            raise e

class HttpObject(object):

    def __init__(self, client, method):
        self.client = client
        self.method = method

    def __getattr__(self, attr):

        def wrap(**kw):
            if attr:
                the_url = '%s/%s' % (self.client.api_url, attr.replace('__', '/'))
            else:
                the_url = self.client.api_url
            return _http_call(the_url, self.method, **kw)
        return wrap

    def __call__(self, **kw):
        return _http_call(self.client.api_url, self.method, **kw)

class APIClient(object):
    """
    使用方法:
        比如:api 请求地址为:http://api.open.zbjdev.com/kuaiyinserv/kuaiyin/billaddress
             请求方式为: GET
             需要的参数为:user_id 用户的UID
                         is_all 是否查询所有数据,0为默认邮寄地址 1为全部邮寄地址
                         access_token 平台认证
             返回数据为:json

        那么此时使用如下:
        domain = "api.open.zbjdev.com"
        #如果是https请求,需要将is_https设置为True
        client = APIClient(domain)
        data = {"user_id": "14035462", "is_all": 1, "access_token": "XXXXXXXXXX"}
        # 如果是post请求,请将get方法改为post方法
        result = client.kuaiyinserv.kuaiyin.billaddress.get(return_type="json", **data)
        #等同于
        # result = client.kuaiyinserv__kuaiyin__billaddress__get(return_type="json", **data)
        # result = client.kuaiyinserv__kuaiyin__billaddress(return_type="json", **data)
    """

    def __init__(self, domain, is_https=False):

        http = "http"
        if domain.startswith("http://") or domain.startswith("https://"):
            http, domain = domain.split("://")
        # else:
        if is_https:
            http = "https"

        self.api_url = ('%s://%s' % (http, domain)).rstrip("/")
        self.get = HttpObject(self, _HTTP_GET)
        self.post = HttpObject(self, _HTTP_POST)

    def __getattr__(self, attr):
        if '__' in attr:

            method = self.get
            if attr[-6:] == "__post":
                attr = attr[:-6]
                method = self.post

            elif attr[-5:] == "__get":
                attr = attr[:-5]

            if attr[:2] == '__':
                attr = attr[2:]
            return getattr(method, attr)
        return _Callable(self, attr)

class _Executable(object):

    def __init__(self, client, method, path):
        self._client = client
        self._method = method
        self._path = path

    def __call__(self, **kw):
        method = _METHOD_MAP[self._method]
        return _http_call('%s/%s' % (self._client.api_url, self._path), method, **kw)

    def __str__(self):
        return '_Executable (%s %s)' % (self._method, self._path)

    __repr__ = __str__

class _Callable(object):

    def __init__(self, client, name):
        self._client = client
        self._name = name

    def __getattr__(self, attr):
        if attr == 'get':
            return _Executable(self._client, 'GET', self._name)
        if attr == 'post':
            return _Executable(self._client, 'POST', self._name)
        name = '%s/%s' % (self._name, attr)
        return _Callable(self._client, name)

    def __str__(self):
        return '_Callable (%s)' % self._name

    __repr__ = __str__

def test_logistics():

    domain = "https://api.kuaidi100.com/api"

    #如果是https请求,需要将is_https设置为True
    client = APIClient(domain)

    data = {"id": "45f2d1f2sds", "com": "yunda", "nu": "1500066330925"}
    result = client.__get(_timeout=2, **data)
    print result
    print result["message"]
    print result.get("message")
    print result.message

if __name__ == '__main__':

    # test_logistics()

    data = {
        "data":{
            "id": 1,
            "pid": 3,
            "name": u'中的阿斯顿'
        },
        "sign": 'asdfdsdsfsdf',
    }

    domain = "kuaiyin.zhubajie.com"

    client = APIClient(domain)
    # headers = {'Content-Type': 'application/json'}
    headers = {"host": domain}

    result = client.api.zhubajie.fz.info.post(return_type="json", request_type="json", headers=headers, **data)
    print result
    print result['data']['msg']

    c = APIClient('task.6.zbj.cn')
    r = getattr(c.api, 'kuaiyin-action-delcache').post(request_type="json",
                                                       headers={},
                                                       **{"sign": "",
                                                          "data": {
                                                              "product": "pvc",
                                                              "category": "card",
                                                              "req_type": "pack_list"
                                                          }})
    print r
    print r['data']['msg']

时间: 2024-07-31 00:51:16

python实现rest请求api示例_python的相关文章

快速排序的算法思想及Python版快速排序的实现示例_python

快速排序是C.R.A.Hoare于1962年提出的一种划分交换排序.它采用了一种分治的策略,通常称其为分治法(Divide-and-ConquerMethod). 1.分治法的基本思想 分治法的基本思想是:将原问题分解为若干个规模更小但结构与原问题相似的子问题.递归地解这些子问题,然后将这些子问题的解组合为原问题的解. 2.快速排序的基本思想 设当前待排序的无序区为R[low..high],利用分治法可将快速排序的基本思想描述为: (1)分解: 在R[low..high]中任选一个记录作为基准(

python代码制作configure文件示例_python

在lua中,一直用lua作为config文件,或承载数据的文件 - 好处是lua本身就很好阅读,然后无需额外写解析的代码,还支持在configure文件中读环境变量,条件判断等. 在lua中通过loadfile, setfenv实现) python: cat config.py bar = 10 foo=100 cat python_as_config.py: ns = {} execfile('config.py', ns) print "\n".join(sorted(dir(ns

python实现保存网页到本地示例_python

学习python示例:实现保存网页到本地 复制代码 代码如下: #coding=utf-8__auther__ = 'xianbao'import urllibimport osdef reporthook(blocks_read, block_size, total_size): if not blocks_read:  print '打开连接'  return if total_size < 0:  print "%d正在读取(%dbytes完成)"%(blocks_read

python实现绘制树枝简单示例_python

python是解释型语言,本文介绍了Python下利用turtle实现绘图功能的示例,本例所示为Python绘制一个树枝,具体实现代码如下:      python是解释型语言,本文介绍了Python下利用turtle实现绘图功能的示例,本例所示为Python绘制一个树枝,具体实现代码如下: import turtle def branch(length,level): if level<=0: return turtle.forward(length) turtle.left(45) bran

python类参数self使用示例_python

复制代码 代码如下: #coding:utf-8"""__new__和__init__到底是怎么一回事,看下面的代码如果类没有定义__new__方法,就从父类继承这个__new__方法.__new__先于__init__执行,类带括号调用时,发生这样的一件事,先调用类的__new__方法,放回该类的实例对象,这个实例对象就是__init__方法的第一个参数.请看代码中tmp,self,p的内存地址都是一样的,都是类的实例对象.""" class

python实现socket端口重定向示例_python

可以很轻松的在端口12345开启共享,效果如下: 要实现我想要的功能,只需要将端口重定向就行了,代码如下: 复制代码 代码如下: #! /usr/bin/python'''      File      : redirect.py      Author    : Mike''' import socket,osbufLen = 4*1024 sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  sock1.bind(('192.1

Python中list初始化方法示例_python

本文实例讲述了Python中list初始化方法.分享给大家供大家参考,具体如下: 1.基本方法. lst = [1, 2, 3, 4, 5] 2.初始化连续数字. >>> lst = [n for n in range(5, 10)] >>> print(lst) [5, 6, 7, 8, 9] 3.初始化n个相同值.(两种方式) >>> lst = ['x' for n in range(5)] >>> print(lst) ['x

python实现巡检系统(solaris)示例_python

使用python + shell 编写,是一个简易solaris系统巡检程序 复制代码 代码如下: #!/usr/bin/python -u#-*- coding:utf-8 -*-'''程序:solaris_status.pyauthor: gyh9711功能: 系统状态信息获取 语言:  sh + python 注意: 部分调用命令需要用到root权限 测试情况: 系统版本:solaris10 系统测试ok   测试服务器型号:sun 6900 6800 v445 v440 M3000 M5

python获得图片base64编码示例_python

  复制代码 代码如下: #!/usr/bin/env python # -*- coding: utf-8 -*- import os, base64 icon = open('ya.png','rb') iconData = icon.read() iconData = base64.b64encode(iconData) LIMIT = 60 liIcon = [] while True:         sLimit = iconData[:LIMIT]         iconData