SQLServer · 最佳实践 · 数据库实现大容量插入的几种方式

背景

很多用户在使用阿里云云数据库SQL Server时,为了加快插入速度,都尝试使用大容量插入的方式,大家都知道,对于完整恢复模式下的数据库,大容量导入执行的所有行插入操作都会完整地记录在事务日志中。如果使用完整恢复模式,大型数据导入会导致填充事务日志的速度很快。相反,对于简单恢复模式或大容量日志恢复模式,大容量导入操作的按最小方式记录日志减少了大容量导入操作填满日志空间的可能性。另外,按最小方式记录日志的效率也比按完整方式记录日志高 。
但实际上,当大容量导入与数据库镜像共存时,会出现镜像 Suspend的情况,这个情况是由于微软在2008 R2上的BUG导致,微软已经明确表示在2008 R2不会FIXED,那么如何正确在RDS使用大容量导入并避免镜像异常,下面介绍几种方式.

实现方法

  • 通过ADO.NET SQLBulkCopy 方式
  • 只需要将SqlBulkCopy 指定SqlBulkCopyOptions.CheckConstraints就好,即:SqlBulkCopy blkcpy = new SqlBulkCopy(desConnString, SqlBulkCopyOptions.CheckConstraints)
    例如:将本地的一个大表通过SQLBulkCopy方式导入到RDS的实例中
    
    static void Main()
    {
      string srcConnString = "Data Source=(local);Integrated Security=true;
      Initial Catalog=testdb";
      string desConnString = "Data Source=****.sqlserver.rds.aliyuncs.com,3433;
      UserID=**;Password=**;Initial Catalog=testdb";
    
      SqlConnection srcConnection = new SqlConnection();
      SqlConnection desConnection = new SqlConnection();
    
      SqlCommand sqlcmd = new SqlCommand();
      SqlDataAdapter da = new SqlDataAdapter();
      DataTable dt = new DataTable();
    
      srcConnection.ConnectionString = srcConnString;
      desConnection.ConnectionString = desConnString;
      sqlcmd.Connection = srcConnection;
    
      sqlcmd.CommandText = @"
      SELECT top 1000000 [PersonType],[NameStyle],[Title],[FirstName],[MiddleName],
      [LastName] ,[Suffix],[EmailPromotion],[AdditionalContactInfo],[Demographics],NULL
      as rowguid,[ModifiedDate] FROM [testdb].[dbo].[Person]";
    
      sqlcmd.CommandType = CommandType.Text;
      sqlcmd.Connection.Open();
      da.SelectCommand = sqlcmd;
      da.Fill(dt);
    
      using (SqlBulkCopy blkcpy =
      new  SqlBulkCopy(desConnString,SqlBulkCopyOptions.CheckConstraints))
      // using (SqlBulkCopy blkcpy =
      // new SqlBulkCopy(desConnString, SqlBulkCopyOptions.Default))
      {
        blkcpy.BatchSize = 2000;
        blkcpy.BulkCopyTimeout = 5000;
        blkcpy.SqlRowsCopied += new SqlRowsCopiedEventHandler(OnSqlRowsCopied);
        blkcpy.NotifyAfter = 2000;
    
        foreach (DataColumn dc in dt.Columns)
        {
          blkcpy.ColumnMappings.Add(dc.ColumnName, dc.ColumnName);
        }
    
        try
        {
          blkcpy.DestinationTableName = "Person";
          blkcpy.WriteToServer(dt);
        }
        catch (Exception ex)
        {
          Console.WriteLine(ex.Message);
        }
        finally
        {
          sqlcmd.Clone();
          srcConnection.Close();
          desConnection.Close();
        }
      }
    }
    
    private static void OnSqlRowsCopied(
        object sender, SqlRowsCopiedEventArgs e)
    {
      Console.WriteLine("Copied {0} so far...", e.RowsCopied);
    }
    
    
  • 通过JDBC SQLServerBulkCopy 方式
  • 同样的道理,需要在copyOptions指定检查约束性
    SQLServerBulkCopyOptions copyOptions = new SQLServerBulkCopyOptions();
    copyOptions.setCheckConstraints(true);
    测试时,请用Microsoft JDBC Drivers 6.0 的sqljdbc41.jar,sqljdbc4.jar及更老版本没有SQLServerBulkCopy 实现。
    
    例如: 将本地的一个大表通过SQLServerBulkCopy方式导入到RDS的实例中
    
    import java.sql.*;
    import com.microsoft.sqlserver.jdbc.SQLServerBulkCopy;
    import com.microsoft.sqlserver.jdbc.SQLServerBulkCopyOptions;
    
    public class Program {
      public static void main(String[] args)
      {
        String sourceConnectionString  = "jdbc:sqlserver://localhost:1433;" +
                "databaseName=testdb;user=****;password=****";
        String destConnectionString  = "jdbc:sqlserver://*****.sqlserver.rds.aliyuncs.com:3433;" +
                "databaseName=testdb;user=****;password=**** ";
        try
        {
          Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
          try (Connection sourceConnection =
               DriverManager.getConnection(sourceConnectionString))
          {
            try (Statement stmt = sourceConnection.createStatement())
            {
              try (ResultSet rsSourceData = stmt.executeQuery(
                    " SELECT top 1000000 " +
                    "[PersonType],[NameStyle],[Title],[FirstName],[MiddleName],[LastName] ," +
                    "[Suffix],[EmailPromotion],[AdditionalContactInfo]," +
                    "[Demographics],NULL as rowguid,[ModifiedDate] " +
                    "FROM [testdb].[dbo].[Person]"))
              {
                try (Connection destinationConnection =  DriverManager.getConnection(destConnectionString))
                {
                  Statement stmt1 = destinationConnection.createStatement();
    
                  long countStart = 0;
                  try (ResultSet rsRowCount = stmt1.executeQuery(
                          "SELECT COUNT(*) FROM dbo.Person;"))
                  {
                    rsRowCount.next();
                    countStart = rsRowCount.getInt(1);
                    System.out.println("Starting row count = " + countStart);
                  }  
    
                  try (SQLServerBulkCopy bulkCopy =   new SQLServerBulkCopy(destinationConnection))
                  {
                    SQLServerBulkCopyOptions copyOptions = new SQLServerBulkCopyOptions();
                    copyOptions.setKeepIdentity(true);
                    copyOptions.setBatchSize(2000);
                    copyOptions.setBulkCopyTimeout(5000);
                    //this is importance setting
                    copyOptions.setCheckConstraints(true);
                    bulkCopy.setBulkCopyOptions(copyOptions);
                    bulkCopy.setDestinationTableName("dbo.Person");
                    bulkCopy.addColumnMapping("PersonType", "PersonType");
                    bulkCopy.addColumnMapping("NameStyle", "NameStyle");
                    bulkCopy.addColumnMapping("Title", "Title");
                    bulkCopy.addColumnMapping("FirstName", "FirstName");
                    bulkCopy.addColumnMapping("MiddleName", "MiddleName");
                    bulkCopy.addColumnMapping("LastName", "LastName");
                    bulkCopy.addColumnMapping("Suffix", "Suffix");
                    bulkCopy.addColumnMapping("EmailPromotion", "EmailPromotion");
                    bulkCopy.addColumnMapping("AdditionalContactInfo", "AdditionalContactInfo");
                    bulkCopy.addColumnMapping("Demographics", "Demographics");
                    bulkCopy.addColumnMapping("rowguid", "rowguid");
                    bulkCopy.addColumnMapping("ModifiedDate", "ModifiedDate");
                    try
                    {
                      bulkCopy.writeToServer(rsSourceData);
                    }
                    catch (Exception e)
                    {
                      e.printStackTrace();
                    }  
    
                    try (ResultSet rsRowCount = stmt1.executeQuery(
                          "SELECT COUNT(*) FROM dbo.Person;"))
                    {
                      rsRowCount.next();
                      long countEnd = rsRowCount.getInt(1);
                      System.out.println("Ending row count = " + countEnd);
                      System.out.println((countEnd - countStart) + " rows were added.");
                    }
                  }
                }
              }
            }
          }
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
      }
    }
  • 通过BCP方式

第一步:需要将数据BCP到本地

BCP testdb.dbo.person Out "bcp_data" /t  /N /U **** /P *** /S "****.sqlserver.rds.aliyuncs.com,3433"

第二步:将导出的文件直接导入到RDS的实例中,但需要指定提示:/h “CHECK_CONSTRAINTS”

BCP testdb.dbo.person In "bcp_data" /C /N /q /k /h "CHECK_CONSTRAINTS" /U *** /P *** /b 500 /S  "***.sqlserver.rds.aliyuncs.com,3433"  
  • 通过DTS/SSIS方式

第一种:import/export data方式需要先保存SSIS包,然后修改Connection Manager的属性 ,如下图

第二种:直接使用SQL Server Business Intelligence Development Stuidio新建 SSIS包:

  • 特别说明

不能在RDS通过下列两种方式进行大容量插入: 原因是基于安全考虑不提供上传文件到RDS 数据库服务器。

第一种:

BULK INSERT testdb.dbo.person_in
FROM N'D:\trace\bcp.txt'
WITH
(
 CHECK_CONSTRAINTS
);  

第二种:

INSERT ... SELECT * FROM OPENROWSET(BULK...)
  • 总结

大容量导入数据会带来更快的插入,解决了用户在有大量数据导入缓慢困惑,在阿里云数据库中,你可以使用五种方式来实现业务场景,但是基于镜像的主备关系,需要特别加入一个检查约束的选项,这是写这个最佳实践的目的,一旦镜像SUSPEND,不断有DUMP文件产生,一来需要时间来修正,二来DUMP文件也会不断占用空间,但不会影响用户的可用性和可靠性。有两种方式在RDS中不能实现,另外,还可以通过ODBC来实现大容量导入,具体请参见。希望这些对大家有用,特别是阿里云云数据库使用用户。

时间: 2024-11-02 22:58:32

SQLServer · 最佳实践 · 数据库实现大容量插入的几种方式的相关文章

SQLServer · 最佳实践 · 开发基于.NET CORE的LINUX版本的数据库应用

title: SQLServer · 最佳实践 · 开发基于.NET CORE的LINUX版本的数据库应用 author: 石沫 背景 最近有客户在基于.NET CORE的LINUX版本连接数据库的应用程序,在开发中,会遇到一些问题,客户会错误地将原因定位到我们的SQL SERVER,陆续收到一些工单,因此,我们需要有计划增强这个方面的能力,同事正确引导用户使用SQL SERVER. 部署环境 1. 服务器版本:ubuntu 14.04 2. .NET CORE 版本:1.0 3. 安装过程 3

SQLServer · 最佳实践 · 如何将SQL Server 2012降级到2008 R2

title: SQLServer · 最佳实践 · 如何将SQL Server 2012降级到2008 R2 author: 石沫 迁移须知 使用SQLSERVER 2012的特性在SQL 2008 R2不支持,比如新的分页方式 此迁移操作手册适用于MSSQL2012到MSSQL2008R2的迁移 迁移使用微软提供的脚本生成和导入导出工具 需要在本地将MSSQL2012迁移完成,并且应用改造完成测试通过方可上RDS 迁移到MSSQL2008R2 RDS通过备份还原的方式进行 迁移步骤 安装200

SQLServer · 最佳实践 · 透明数据加密TDE在SQLServer的应用

title: SQLServer · 最佳实践 · 透明数据加密TDE在SQLServer的应用 author: 石沫 背景 作为云计算的服务提供者,我们在向用户提供优秀的服务能力时会遇到一个合规的问题.在数据库领域,数据是极其敏感和珍贵的,保护好数据,就如保护好企业的生命线.因此,需要采取一些预防措施来帮助保护数据库的安全,如设计一个安全系统.加密机密资产以及在数据库服务器的周围构建防火墙.但是,如果遇到物理介质被盗的情况,恶意破坏方只需还原或附加数据库即可浏览数据,或者遭遇拖库情况.一种解决

SQLServer · 最佳实践 · SQL Server 2012 使用OFFSET分页遇到的问题

title: SQLServer · 最佳实践 · SQL Server 2012 使用OFFSET分页遇到的问题 author: 石沫 1. 背景 最近有一个客户遇到一个奇怪的问题,以前使用ROW_NUMBER来分页结果是正确的,但是替换为SQL SERVER 2012的OFFSET...FETCH NEXT来分页出现了问题,因此,这里简单分析一下原因,更深层次的原因还没有确切的结论,但可以提供解决办法. 在升级数据库后并且应用新功能时,这个问题可能会困扰一些同学. 2. 现象 为了复现在这个

SQLServer · 最佳实践 · RDS for SQL Server 2012 权限限制的提升与改善

title: SQLServer · 最佳实践 · RDS for SQL Server 2012 权限限制的提升与改善 author: 石沫 背景 SQL Server 作为一种强大的关系型数据库,能够提供所有场景的应用.在绝大多数云计算公司中,都提供了SQL Server作为服务的云数据库服务,譬如阿里云.但既然是服务,那么服务就需要可管理,可控制,因此,在云计算初期,都对云数据库服务进行了严格的权限控制,好处就是可控可管理,但给用户会带来一些限制,某些限制实际上是可以再细粒度管控.因此,今

SQLServer · 最佳实践 · SQL Server优化案例分享

title: SQLServer · 最佳实践 · SQL Server优化案例分享 author: 石沫 前端时间的视频分享,这里有同学文档总结下来: 请参考:https://yq.aliyun.com/articles/60119

mybatis中批量插入的两种方式(高效插入)_java

MyBatis简介 MyBatis是一个支持普通SQL查询,存储过程和高级映射的优秀持久层框架.MyBatis消除了几乎所有的JDBC代码和参数的手工设置以及对结果集的检索封装.MyBatis可以使用简单的XML或注解用于配置和原始映射,将接口和Java的POJO(Plain Old Java Objects,普通的Java对象)映射成数据库中的记录. 一.mybiats foreach标签 foreach的主要用在构建in条件中,它可以在SQL语句中进行迭代一个集合.foreach元素的属性主

mysql 忽略主键冲突、避免重复插入的几种方式

mysql 忽略主键冲突.避免重复插入的几种方式 方案一:使用 ignore 关键字 方案二:使用 replace into 方案三:ON DUPLICATE KEY UPDATE  方案一:使用 ignore 关键字 如果是用主键primary或者唯一索引unique区分了记录的唯一性,避免重复插入记录可以使用: insert ignore into table_name(email,phone,user_id) values('test9@163.com','99999','9999'),这

C# 批量插入表SQLSERVER SqlBulkCopy往数据库中批量插入数据

#region 帮助实例:SQL 批量插入数据 多种方法 /// <summary> /// SqlBulkCopy往数据库中批量插入数据 /// </summary> /// <param name="sourceDataTable">数据源表</param> /// <param name="targetTableName">服务器上目标表</param> /// <param nam