OSS文件上传及OSS与ODPS之间数据连通

场景描述

       有这样一种场景,用户在自建服务器上存有一定数量级的CSV格式业务数据,某一天用户了解到阿里云的OSS服务存储性价比高(嘿嘿,颜值高),于是想将CSV数据迁移到云上OSS中,并且未来还想对这些数据做一些离线分析,挖掘其中存在价值,因此需要将OSS中文件再通过一种方式同步到ODPS数加平台上,面对这样需求,小编我经过参考文档,实践,调试并修复Bug,实现出以下一种解决方案。

实现目标

    通过OSS的Java SDK以及批量数据通道tunnel SDK实现以下两个功能:

     (1)将本地CSV文件上传到OSS;

     (2)将OSS中文件同步到ODPS;

准备工作

     在具体实操之前,有必要对OSS有个了解,OSS是个什么东东,为什么要选用OSS呢,OSS控制台限制条件,需要注意事项?

OSS是个什么东东?

      阿里云对象存储(Object Storage Service,简称OSS),是阿里云对外提供的海量,安全,低成本,高可靠的云存储服务。通过网络随时存储和调用包括文本、图片、音频、和视频在内的各种结构化或非结构化数据文件。 

为什么选用云产品OSS服务呢?

是什么原因致使用户放弃使用自建服务器存储数据,而转向云产品OSS呢?

     这方面我深有感触,我以前在上海一家公司工作,原公司所有数据都是存放在自建的五六台服务器上,从规划,采购到部署,这其间过程复杂,人力部署也不简单,而且服务器价格昂贵,开发维护成本高,数据可靠性还低,总之耗时、耗力最重要是影响业务进展。接触了解到OSS后才发现,之前的自建服务器存储真是太out啦,呵呵,OSS颜值高额,这里颜值具体有以下几个方面:

       可靠性高:数据自动多重冗余备份,规模自动扩展,不影响对外服务;

      安全:提供企业级、用户级多层次安全保护,授权机制及白名单、防盗链、主子账号功能;

       成本:省去人工扩容硬盘以及运维成本;

       数据处理能力:提供丰富的数据处理服务,比如图片处理、视频转码、CDN内容加速分发。

OSS控制台限制条件?

   通过 OSS 控制台可以上传小于 500 MB 文件。如要上传的文件大于 500 MB,控制台会给出超过大小限制警告,并且在任务管理列表,失败并尝试上传请求三次。异常警告如下图所示:

解决方法:可以通过 OSS的SDK 进行上传。

需要注意几点

(1) 在OSS中,用户操作基本数据单元是object,单个对象大小限制为48.8TB,一个存储空间中可以有无 

    限量对象。

(2) 新建Bucket,输入存储空间名称,创建后不支持更改存储空间名称,上传到OSS后不能移动文件存储位

    置;

(3) 在所属地域框中,下拉选择该存储空间的数据中心。订购后不支持更换地域。

(4) 删除存储空间之前请确保尚未完成的分片上传文件产生的碎片文件全部清空,否则无法删除存储空间。

(5) 通过web控制台上传文件,一刷新页面,任务管理中显示的上传任务就会消失不见,所以在上传过程中

    不要刷新页面。

本地大文件分片上传到OSS

       因为使用单次HTTP请求,Object过大会导致上传时间长。在这段时间出现网络原因造成超时或者链接断开错误的时候,上传容易失败,可以考虑断点续传上传(分片上传)。当Object大于5GB,这种情况下只能使用断点续传上传(分片上传),具体参考断点续传上传,下面代码实现上传本地路径下ratings.csv文件到OSS object管理中:

见附件中 源代码.rar 压缩文件中的 MultipartUploadDemo 类实现 

单线程实现将OSS文件上传至ODPS(OSS java-SDK与tunnel SDK结合)

      下面代码实现目标:将OSS中bucket名为qf-test,object对象为ratings.csv文件数据导入到ODPS平台中项目名为dtstack_dev,表名为ratings,分区字段为ds=20160612中。

见附件中 源代码.rar 压缩文件中的 OSSToODPS_Upload 类实现 

多线程实现将OSS文件上传至ODPS(OSS java-SDK与tunnel SDK结合)

      下面代码实现目标:将OSS中bucket名为qf-test,object对象为data_test/movies.csv文件数据导入到ODPS平台中项目名为dtstack_dev,表名为movies_odps2中。

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.oss.OSSClient;
import com.aliyun.oss.model.GetObjectRequest;
import com.aliyun.oss.model.OSSObject;

 class UploadThread implements Callable<Boolean> {
	     private long id;
	     private TableSchema schema = null;
	     private RecordWriter recordWriter = null;
	     private Record record = null;
	     private BufferedReader reader = null;

         public UploadThread(long id, RecordWriter recordWriter, Record record,
                         TableSchema schema,BufferedReader reader) {
        	     this.id = id;
                 this.recordWriter = recordWriter;
                 this.record = record;
                 this.schema = schema;
                 this.reader = reader;
         }

		public Boolean call() throws Exception {
		       while (true) {
		           String line = reader.readLine();
		           if (line == null) break;
		           if(id == 0){  //第一行是字段名,忽略掉
			    	      id++;
			    	      continue;
			       }
		           System.out.println(line);
		           String[] s = line.split(",");
		           for (int i = 0; i < schema.getColumns().size(); i++) {
		               Column column = schema.getColumn(i);
		               switch (column.getType()) {
		                 case BIGINT:
		                       record.setBigint(i, Long.valueOf(s[i]));
		                       break;
		//               case BOOLEAN:
		//                       record.setBoolean(i, str);
		//                       break;
		//               case DATETIME:
		//                       record.setDatetime(i, str);
		//                       break;
		                 case DOUBLE:
		                       record.setDouble(i, Double.valueOf(s[i]));
		                       break;
		               case STRING:
		                       record.setString(i,s[i]);
		                       break;
		                 default:
		                       throw new RuntimeException("Unknown column type: "
		                                       + column.getType());
		               }
		           }
				   recordWriter.write(record);
		       }
			   recordWriter.close();
			   return true;
		}
}

 public class OSSToODPS_UploadThread {
	    private static String accessKeyId = "UQV2yoSSWNgquhhe";
		private static String accessKeySecret = "bG8xSLwhmKYRmtBoE3HbhOBYXvknG6";

		private static String endpoint = "http://oss-cn-hangzhou.aliyuncs.com";
		private static String bucketName = "qf-test";
	    private static String key = "data_test/movies.csv";

	    private static String tunnelUrl = "http://dt.odps.aliyun.com";
	    private static String odpsUrl = "http://service.odps.aliyun.com/api";
	    private static String project = "dtstack_dev";
	    private static String table = "movies_odps2";
	    //private static String partition = "ds=20160612";

         private static int threadNum = 10;

         public static void main(String args[]) {
	        	 /*
	              * Constructs a client instance with your account for accessing OSS
	              */
	             OSSClient client = new OSSClient(endpoint, accessKeyId, accessKeySecret);
	             System.out.println("Downloading an object");
	             OSSObject object = client.getObject(new GetObjectRequest(bucketName, key));
	             BufferedReader reader = new BufferedReader(new InputStreamReader(object.getObjectContent()));

                 Account account = new AliyunAccount(accessKeyId, accessKeySecret);
                 Odps odps = new Odps(account);
                 odps.setEndpoint(odpsUrl);
                 odps.setDefaultProject(project);
                 try {
                         TableTunnel tunnel = new TableTunnel(odps);
                         tunnel.setEndpoint(tunnelUrl);
                         //PartitionSpec partitionSpec = new PartitionSpec(partition);
                         UploadSession uploadSession = tunnel.createUploadSession(project,table);
//                       UploadSession uploadSession = tunnel.createUploadSession(project,
//                                 table, partitionSpec);  //分区

                         System.out.println("Session Status is : "
                                         + uploadSession.getStatus().toString());

                         ExecutorService pool = Executors.newFixedThreadPool(threadNum);
                         ArrayList<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>();
                         for (int i = 0; i < threadNum; i++) {
                                 RecordWriter recordWriter = uploadSession.openRecordWriter(i);
                                 Record record = uploadSession.newRecord();
                                 callers.add(new UploadThread(i, recordWriter, record,
                                                 uploadSession.getSchema(),reader));
                         }
                         pool.invokeAll(callers);
                         pool.shutdown();

                         Long[] blockList = new Long[threadNum];
                         for (int i = 0; i < threadNum; i++)
                                 blockList[i] = Long.valueOf(i);
                         uploadSession.commit(blockList);
                         reader.close();
                         System.out.println("upload success!");
                 } catch (TunnelException e) {
                         e.printStackTrace();
                 } catch (IOException e) {
                         e.printStackTrace();
                 } catch (InterruptedException e) {
                         e.printStackTrace();
                 }
         }
}

编程实现中遇到Bug

Apache httpclient包冲突

Exception in thread "main" java.lang.NoSuchFieldError: INSTANCE

    at org.apache.http.conn.ssl.SSLConnectionSocketFactory.<clinit>(SSLConnectionSocketFactory.java:144)

    at com.aliyun.oss.common.comm.DefaultServiceClient.createHttpClientConnectionManager(DefaultServiceClient.java:232)

    at com.aliyun.oss.common.comm.DefaultServiceClient.<init>(DefaultServiceClient.java:78)

    at com.aliyun.oss.OSSClient.<init>(OSSClient.java:273)

    at com.aliyun.oss.OSSClient.<init>(OSSClient.java:194)

    at UploadToODPS.main(UploadToODPS.java:53)

工程里可能有包冲突。原因是OSS Java SDK使用了Apache httpclient 4.4.1,而个人工程使用了与Apache httpclient 4.4.1冲突的Apache httpclient。如上述发生错误的工程里,使用了Apache httpclient 4.1.2:

使用统一版本。如果个人工程里使用与Apache httpclient 4.4.1冲突版本,请也使用4.4.1版本。去掉其它版本的Apache httpclient依赖。

recordWriter.write(record) 写入位置不正确

在单线程编码实现从OSS传数据到ODPS代码中 recordWriter.write(record) 写入位置不正确,如下代码显示:

        for (int i = 0; i < schema.getColumns().size(); i++) {

                Column column = schema.getColumn(i);

                switch (column.getType()) {

                  case BIGINT:

                        record.setBigint(i, Long.valueOf(s[i]));

                        break;

                  case DOUBLE:

                        record.setDouble(i, Double.valueOf(s[i]));

                        break;

                  default:

                        throw new RuntimeException("Unknown column type: "

                                        + column.getType());

                  recordWriter.write(record);  //写入位置不正确

                }

      }

      // recordWriter.write(record);  //放到for循环外,写入位置正确

recordWriter.write(record)写入位置不对,将recordWriter.write(record)放置到for循环内,会出现以下奇怪异常:

正确位置是:将recordWriter.write(record)放置到for循环外,结果如下表显示:

上传代码中 partition="20160612" 字符串写法不对

需要注意,指定分区字符串在程序中正确写法:

private static String partition = "ds=20160612"; (必须加上分区字段名)

PartitionSpec partitionSpec = new PartitionSpec(partition);

不正确写法如下:

private static String partition = "20160612";(缺少分区字段名)

多线程上传任务无故中断,如下是异常截图


通过多线程将OSS中文件同步到ODPS表中时,实现多任务的并发执行,在编码实现时要注意reader.close()位置要放正确:

UploadSession uploadSession = tunnel.createUploadSession(project,table, partitionSpec);

OSSObject object = client.getObject(new GetObjectRequest(bucketName, key));

BufferedReader reader = new BufferedReader(new InputStreamReader(object.getObjectContent()));

Long[] blockList = new Long[threadNum];

uploadSession.commit(blockList);

将reader.close()放到Callable接口中call()方法里是不对滴,call方法是线程异步执行地方,开启的所有线程不断地异步从OSS的缓冲字符输入流reader中读取OSS中数据,如果在call()方法中就将reader关闭,也就是说将输入数据源关闭,直接导致线程读取失败。因此reader.close()应该放在线程外部,即uploadSession.commit()位置后边,如下。

uploadSession.commit(blockList);

reader.close();  //正确位置

System.out.println("upload success!");

时间: 2024-08-01 08:33:00

OSS文件上传及OSS与ODPS之间数据连通的相关文章

android-Android 文件上传出错服务器却可以得到数据

问题描述 Android 文件上传出错服务器却可以得到数据 用的AsyncHttpClient 文件上传时报错:org.apache.http.client.httpresponseExcrption:not found 但是服务器那边可以得到数据

php判断文件上传类型及过滤不安全数据的方法_php技巧

本文实例讲述了php判断文件上传类型及过滤不安全数据的方法.分享给大家供大家参考.具体如下: 禁止上传除图片文件以外的文件,提示,不要获取文件扩展名来判断类型,这样是最不安全的,我们用$_FIlES['form']['type']. 这个可以读取文件内容来识别文件类型,但它能识别的有限,不过如果你用图片就足够了解.函数,过滤不安全字符,实例函数代码如下: 复制代码 代码如下: function s_addslashes($string, $force = 0) {  if(!get_magic_

有关文件上传 非ajax提交 得到后台数据问题_javascript技巧

下文给大家介绍文件上传非ajax提交得到后台数据的操作方法,具体详情如下所示: <form name="configForm" id="configForm" method="post" action="" > .......... </form> 根据id获得表单数据然后发送ajax请求,获得后台返回数据,处理数据,完美. 但是如果需要上传文件, <tr> <td class=&qu

php 判断文件上传类型与过滤不安全数据

这函数 过滤不安全字符 function s_addslashes($string, $force = 0) {  if(!get_magic_quotes_gpc()) {   if(is_array($string)) {    foreach($string as $key => $val) {     $string[$key] = s_addslashes($val, $force);    }   } else {    $string=str_replace("&#x

Fine Uploader文件上传组件

原文 Fine Uploader文件上传组件 最近在处理后台数据时需要实现文件上传.考虑到对浏览器适配上采用Fine Uploader. Fine Uploader 采用ajax方式实现对文件上传.同时在浏览器中直接支持文件拖拽[对浏览器版本有要求类似IE版本必须是9或是更高的IE10].在不同浏览器中提供统 一用户体验.该组件基本覆盖目前所有主流浏览器.同时没有任何第三方组件依赖.相当Clear.在服务器端已经覆盖支持了 ASP.NET/ColdFusion/Java/Node.js/Perl

php文件上传类(该类支持单个或者多个文件上传)(1/2)

<!doctype html public "-//w3c//dtd xhtml 1.0 transitional//en" "http://www.w3.org/tr/xhtml1/dtd/xhtml1-transitional.dtd"> <html xmlns="http://www.111cn.net/1999/xhtml"> <head> <meta http-equiv="conte

php版阿里云OSS图片上传类详解_php技巧

本文实例讲述了php版阿里云OSS图片上传类.分享给大家供大家参考,具体如下: 1.阿里云基本函数 /** * 把本地变量的内容到文件 * 简单上传,上传指定变量的内存值作为object的内容 */ public function putObject($imgPath,$object) { $content = file_get_contents($imgPath); // 把当前文件的内容获取到传入文件中 $options = array(); try { $this->ossClient->

winform上传到 OSS 总包这错

问题描述 winform上传到 OSS 总包这错 怎么解决 求大神

JavaBean实现多文件上传的两种方法

上传 摘要:本文介绍了JavaBean实现多个文件上传的两种方法,分别是使用http协议和ftp协议实现.首先讲述了http协议传送多个文件的基本格式和实现上传的详细过程,之后简单介绍了使用ftpclient 类实现了ftp方式的上传,最后对这两种方法进行了比较. 关键字:JavaBean .http .ftp .ftpclient JavaBean是一种基于Java的软件组件.JSP对于在Web 应用中集成JavaBean组件提供了完善的支持.这种支持不仅能缩短开发时间(可以直接利用经测试和可