python编写api调用ceph对象网关

#_*_coding:utf-8_*_

#yum install python-boto

import boto

import boto.s3.connection

#pip install filechunkio

from filechunkio import  FileChunkIO

import math

import  threading

import os

import Queue

class Chunk(object):

    num = 0

    offset = 0

    len = 0

    def __init__(self,n,o,l):

        self.num=n

        self.offset=o

        self.length=l

 

 

 

class CONNECTION(object):

    def __init__(self,access_key,secret_key,ip,port,is_secure=False,chrunksize=8<<20): #chunksize最小8M否则上传过程会报错

        self.conn=boto.connect_s3(

        aws_access_key_id=access_key,

        aws_secret_access_key=secret_key,

        host=ip,port=port,

        is_secure=is_secure,

        calling_format=boto.s3.connection.OrdinaryCallingFormat()

        )

        self.chrunksize=chrunksize

        self.port=port

 

    #查询

    def list_all(self):

        all_buckets=self.conn.get_all_buckets()

        for bucket in all_buckets:

            print u'容器名: %s' %(bucket.name)

            for key in bucket.list():

                print ' '*5,"%-20s%-20s%-20s%-40s%-20s" %(key.mode,key.owner.id,key.size,key.last_modified.split('.')[0],key.name)

 

    def list_single(self,bucket_name):

        try:

            single_bucket = self.conn.get_bucket(bucket_name)

        except Exception as e:

            print 'bucket %s is not exist' %bucket_name

            return

        print u'容器名: %s' % (single_bucket.name)

        for key in single_bucket.list():

            print ' ' * 5"%-20s%-20s%-20s%-40s%-20s" % (key.mode, key.owner.id, key.size, key.last_modified.split('.')[0], key.name)

 

    #普通小文件下载:文件大小<=8M

    def dowload_file(self,filepath,key_name,bucket_name):

        all_bucket_name_list = [i.name for in self.conn.get_all_buckets()]

        if bucket_name not in all_bucket_name_list:

            print 'Bucket %s is not exist,please try again' % (bucket_name)

            return

        else:

            bucket = self.conn.get_bucket(bucket_name)

 

        all_key_name_list = [i.name for in bucket.get_all_keys()]

        if key_name not in all_key_name_list:

            print 'File %s is not exist,please try again' % (key_name)

            return

        else:

            key = bucket.get_key(key_name)

 

        if not os.path.exists(os.path.dirname(filepath)):

            print 'Filepath %s is not exists, sure to create and try again' % (filepath)

            return

 

        if os.path.exists(filepath):

            while True:

                d_tag = raw_input('File %s already exists, sure you want to cover (Y/N)?' % (key_name)).strip()

                if d_tag not in ['Y''N'or len(d_tag) == 0:

                    continue

                elif d_tag == 'Y':

                    os.remove(filepath)

                    break

                elif d_tag == 'N':

                    return

        os.mknod(filepath)

        try:

            key.get_contents_to_filename(filepath)

        except Exception:

            pass

 

    # 普通小文件上传:文件大小<=8M

    def upload_file(self,filepath,key_name,bucket_name):

        try:

            bucket = self.conn.get_bucket(bucket_name)

        except Exception as e:

            print 'bucket %s is not exist' % bucket_name

            tag = raw_input('Do you want to create the bucket %s: (Y/N)?' % bucket_name).strip()

            while tag not in ['Y''N']:

                tag = raw_input('Please input (Y/N)').strip()

            if tag == 'N':

                return

            elif tag == 'Y':

                self.conn.create_bucket(bucket_name)

                bucket = self.conn.get_bucket(bucket_name)

        all_key_name_list = [i.name for in bucket.get_all_keys()]

        if key_name in all_key_name_list:

            while True:

                f_tag = raw_input(u'File already exists, sure you want to cover (Y/N)?: ').strip()

                if f_tag not in ['Y''N'or len(f_tag) == 0:

                    continue

                elif f_tag == 'Y':

                    break

                elif f_tag == 'N':

                    return

        key=bucket.new_key(key_name)

        if not os.path.exists(filepath):

            print 'File %s does not exist, please make sure you want to upload file path and try again' %(key_name)

            return

        try:

            f=file(filepath,'rb')

            data=f.read()

            key.set_contents_from_string(data)

        except Exception:

            pass

 

    def delete_file(self,key_name,bucket_name):

        all_bucket_name_list = [i.name for in self.conn.get_all_buckets()]

        if bucket_name not in all_bucket_name_list:

            print 'Bucket %s is not exist,please try again' % (bucket_name)

            return

        else:

            bucket = self.conn.get_bucket(bucket_name)

 

        all_key_name_list = [i.name for in bucket.get_all_keys()]

        if key_name not in all_key_name_list:

            print 'File %s is not exist,please try again' % (key_name)

            return

        else:

            key = bucket.get_key(key_name)

 

        try:

            bucket.delete_key(key.name)

        except Exception:

            pass

 

    def delete_bucket(self,bucket_name):

        all_bucket_name_list = [i.name for in self.conn.get_all_buckets()]

        if bucket_name not in all_bucket_name_list:

            print 'Bucket %s is not exist,please try again' % (bucket_name)

            return

        else:

            bucket = self.conn.get_bucket(bucket_name)

 

        try:

            self.conn.delete_bucket(bucket.name)

        except Exception:

            pass

 

 

    #队列生成

    def init_queue(self,filesize,chunksize):   #8<<20 :8*2**20

        chunkcnt=int(math.ceil(filesize*1.0/chunksize))

        q=Queue.Queue(maxsize=chunkcnt)

        for in range(0,chunkcnt):

            offset=chunksize*i

            length=min(chunksize,filesize-offset)

            c=Chunk(i+1,offset,length)

            q.put(c)

        return q

 

    #分片上传object

    def upload_trunk(self,filepath,mp,q,id):

        while not q.empty():

            chunk=q.get()

            fp=FileChunkIO(filepath,'r',offset=chunk.offset,bytes=chunk.length)

            mp.upload_part_from_file(fp,part_num=chunk.num)

            fp.close()

            q.task_done()

 

    #文件大小获取---->S3分片上传对象生成----->初始队列生成(--------------->文件切,生成切分对象)

    def upload_file_multipart(self,filepath,key_name,bucket_name,threadcnt=8):

        filesize=os.stat(filepath).st_size

        try:

            bucket=self.conn.get_bucket(bucket_name)

        except Exception as e:

            print 'bucket %s is not exist' % bucket_name

            tag=raw_input('Do you want to create the bucket %s: (Y/N)?' %bucket_name).strip()

            while tag not in ['Y','N']:

                tag=raw_input('Please input (Y/N)').strip()

            if tag == 'N':

                return

            elif tag == 'Y':

                self.conn.create_bucket(bucket_name)

                bucket = self.conn.get_bucket(bucket_name)

        all_key_name_list=[i.name for in bucket.get_all_keys()]

        if key_name  in all_key_name_list:

            while True:

                f_tag=raw_input(u'File already exists, sure you want to cover (Y/N)?: ').strip()

                if f_tag not in ['Y','N'or len(f_tag) == 0:

                    continue

                elif f_tag == 'Y':

                    break

                elif f_tag == 'N':

                    return

 

        mp=bucket.initiate_multipart_upload(key_name)

        q=self.init_queue(filesize,self.chrunksize)

        for in range(0,threadcnt):

            t=threading.Thread(target=self.upload_trunk,args=(filepath,mp,q,i))

            t.setDaemon(True)

            t.start()

        q.join()

        mp.complete_upload()

 

    #文件分片下载

    def download_chrunk(self,filepath,key_name,bucket_name,q,id):

        while not q.empty():

            chrunk=q.get()

            offset=chrunk.offset

            length=chrunk.length

            bucket=self.conn.get_bucket(bucket_name)

            resp=bucket.connection.make_request('GET',bucket_name,key_name,headers={'Range':"bytes=%d-%d" %(offset,offset+length)})

            data=resp.read(length)

            fp=FileChunkIO(filepath,'r+',offset=chrunk.offset,bytes=chrunk.length)

            fp.write(data)

            fp.close()

            q.task_done()

 

    def download_file_multipart(self,filepath,key_name,bucket_name,threadcnt=8):

        all_bucket_name_list=[i.name for in self.conn.get_all_buckets()]

        if bucket_name not in all_bucket_name_list:

            print 'Bucket %s is not exist,please try again' %(bucket_name)

            return

        else:

            bucket=self.conn.get_bucket(bucket_name)

 

        all_key_name_list = [i.name for in bucket.get_all_keys()]

        if key_name not in all_key_name_list:

            print 'File %s is not exist,please try again' %(key_name)

            return

        else:

            key=bucket.get_key(key_name)

 

        if not os.path.exists(os.path.dirname(filepath)):

            print 'Filepath %s is not exists, sure to create and try again' % (filepath)

            return

 

        if os.path.exists(filepath):

            while True:

                d_tag = raw_input('File %s already exists, sure you want to cover (Y/N)?' % (key_name)).strip()

                if d_tag not in ['Y''N'or len(d_tag) == 0:

                    continue

                elif d_tag == 'Y':

                    os.remove(filepath)

                    break

                elif d_tag == 'N':

                    return

        os.mknod(filepath)

        filesize=key.size

        q=self.init_queue(filesize,self.chrunksize)

        for in range(0,threadcnt):

            t=threading.Thread(target=self.download_chrunk,args=(filepath,key_name,bucket_name,q,i))

            t.setDaemon(True)

            t.start()

        q.join()

 

    def generate_object_download_urls(self,key_name,bucket_name,valid_time=0):

        all_bucket_name_list = [i.name for in self.conn.get_all_buckets()]

        if bucket_name not in all_bucket_name_list:

            print 'Bucket %s is not exist,please try again' % (bucket_name)

            return

        else:

            bucket = self.conn.get_bucket(bucket_name)

 

        all_key_name_list = [i.name for in bucket.get_all_keys()]

        if key_name not in all_key_name_list:

            print 'File %s is not exist,please try again' % (key_name)

            return

        else:

            key = bucket.get_key(key_name)

 

        try:

            key.set_canned_acl('public-read')

            download_url = key.generate_url(valid_time, query_auth=False, force_http=True)

            if self.port != 80:

                x1=download_url.split('/')[0:3]

                x2=download_url.split('/')[3:]

                s1=u'/'.join(x1)

                s2=u'/'.join(x2)

 

                s3=':%s/' %(str(self.port))

                download_url=s1+s3+s2

                print download_url

 

        except Exception:

            pass

 

 

 

if __name__ == '__main__':

    #约定:

    #1:filepath指本地文件的路径(上传路径or下载路径),指的是绝对路径

    #2:bucket_name相当于文件在对象存储中的目录名或者索引名

    #3:key_name相当于文件在对象存储中对应的文件名或文件索引

 

    access_key = "65IY4EC1BSFYNH6SHWGW"

    secret_key = "viNfIftLHhrPt2MYK44DkWGvxZb82aYqLrCzGYLx"

    ip='172.16.201.36'

    port=8080

    conn=CONNECTION(access_key,secret_key,ip,port)

    #查看所有bucket以及其包含的文件

    #conn.list_all()

 

    #简单上传,用于文件大小<=8M

    # conn.upload_file('/etc/passwd','passwd','test_bucket01')

    #查看单一bucket下所包含的文件信息

    # conn.list_single('test_bucket01')

 

 

    #简单下载,用于文件大小<=8M

    # conn.dowload_file('/lhf_test/test01','passwd','test_bucket01')

    # conn.list_single('test_bucket01')

 

    #删除文件

    # conn.delete_file('passwd','test_bucket01')

    # conn.list_single('test_bucket01')

    #

    #删除bucket

    # conn.delete_bucket('test_bucket01')

    # conn.list_all()

 

    #切片上传(多线程),用于文件大小>8M,8M可修改,但不能小于8M,否则会报错切片太小

    # conn.upload_file_multipart('/etc/passwd','passwd_multi_upload','test_bucket01')

    # conn.list_single('test_bucket01')

 

    # 切片下载(多线程),用于文件大小>8M,8M可修改,但不能小于8M,否则会报错切片太小

    # conn.download_file_multipart('/lhf_test/passwd_multi_dowload','passwd_multi_upload','test_bucket01')

 

    #生成下载url

    #conn.generate_object_download_urls('passwd_multi_upload','test_bucket01')

    #conn.list_all()

时间: 2024-09-20 20:33:36

python编写api调用ceph对象网关的相关文章

Ceph对象存储网关中的索引工作原理&lt;转&gt;

Ceph 对象存储网关允许你通过 Swift 及 S3 API 访问 Ceph .它将这些 API 请求转化为 librados 请求.Librados 是一个非常出色的对象存储(库)但是它无法高效的列举对象.对象存储网关维护自有索引来提升列举对象的响应性能并维护了其他的一些元信息.有关对象存储网关索引工作原理的文章很少,所以我写了这篇博文,权当抛砖迎玉. 我们先来看看一个已存在的 bucket 这个 bucket 的对象列表存储在一个单独的 rados 对象中.这个对象的名字是 .dir. 加

基于python编写的微博应用_python

本文实例讲述了基于python编写的微博应用,分享给大家供大家参考.具体如下: 在编写自己的微博应用之前,先要到weibo开放平台申请应用的公钥和私钥. 下载python版的SDK,打开example目录,仿照oauthSetTokenUpdate.py进行编码, 复制代码 代码如下: # -*- coding: utf-8 -*- from weibopy.auth import OAuthHandler from weibopy.api import API consumer_key= '应

Ceph分布式存储学习指南1.7 Ceph对象存储

1.7 Ceph对象存储 对象存储是一种以对象形式而不是传统文件和块形式存储数据的方法.基于对象的存储已经引起了行业界的大量关注.为灵活地使用它们的巨量数据,这些组织正快速采用对象存储解决方案.Ceph是一个众所周知的真正的对象存储系统. Ceph是一个分布式对象存储系统,通过它的对象网关(object gateway),也就是RADOS网关(radosgw)提供对象存储接口.RADOS网关利用librgw(RADOS网关库)和librados这些库,允许应用程序跟Ceph对象存储建立连接.Ce

详情Python编写shellcode注入程序入门教程

背景 本文为入门基础,纯科普,大牛莫喷~ 教程中所有内容仅供学习研究,请勿用于非法用途,否则....我也帮不了你啊... 说起注入,大家第一印象可能还习惯性的停留在sql注入,脚本注入(XSS)等.今天light同(jiao)学(shou)带大家从web端回到操作系统,一起探讨Windows下的经典注入--内存注入,使用python编写一个简单的代码注入程序. 内存注入常见的方法有dll注入和代码注入.Dll注入通俗地讲就是把我们自己的dll注入到目标进程的地址空间内,"寄生"在目标进

使用 .NET 框架类替代 API 调用 (一)

使用 .NET 框架类替代 API 调用 升级到 Microsoft .NET Ken GetzMCW Technologies 2002 年 2 月 摘要:通过学习 Microsoft .NET 框架中某些特定而有用的类,可以减少您对 Win32 API 调用的依赖.本文讨论的每个类都可以代替一个或多个 Win32 API 调用,而在 Microsoft Visual Basic 6.0 中,您必须调用一个或多个 Win32 API 才能完成相同的任务. 目标 查找现有 Win32 API 调

使用 .NET 框架类替代 API 调用 (二)

使用 System.Environment 类 System.Environment 类提供了若干不同的信息,如果没有这些信息,就需要进行多次 Windows API 调用.使用 System.Environment 可以检索: 有关可用驱动器的信息(GetLogicalDrives 方法) Windows 启动后的毫秒数(TickCount 属性) 一般环境设置(由 CurrentDirectory.MachineName.OSVersion.SystemDirectory.UserDomai

使用.NET框架类替代API调用择

.net框架 使用 .NET 框架类替代 API 调用 升级到 Microsoft .NET Ken GetzMCW Technologies2002 年 2 月 摘要:通过学习 Microsoft .NET 框架中某些特定而有用的类,可以减少您对 Win32 API 调用的依赖.本文讨论的每个类都可以代替一个或多个 Win32 API 调用,而在 Microsoft Visual Basic 6.0 中,您必须调用一个或多个 Win32 API 才能完成相同的任务. 目标 查找现有 Win32

python编写网页爬虫脚本并实现APScheduler调度_python

前段时间自学了python,作为新手就想着自己写个东西能练习一下,了解到python编写爬虫脚本非常方便,且最近又学习了MongoDB相关的知识,万事具备只欠东风. 程序的需求是这样的,爬虫爬的页面是京东的电子书网站页面,每天会更新一些免费的电子书,爬虫会把每天更新的免费的书名以第一时间通过邮件发给我,通知我去下载. 一.编写思路: 1.爬虫脚本获取当日免费书籍信息 2.把获取到的书籍信息与数据库中的已有信息作比较,如果书籍存在不做任何操作,书籍不存在,执行插入数据库的操作,把数据的信息存入Mo

Python标准库05 存储对象 (pickle包,cPickle包)

原文:Python标准库05 存储对象 (pickle包,cPickle包) 作者:Vamei 出处:http://www.cnblogs.com/vamei 欢迎转载,也请保留这段声明.谢谢! 谢谢reverland纠错   在之前对Python对象的介绍中 (面向对象的基本概念,面向对象的进一步拓展),我提到过Python"一切皆对象"的哲学,在Python中,无论是变量还是函数,都是一个对象.当Python运行时,对象存储在内存中,随时等待系统的调用.然而,内存里的数据会随着计算