您现在的位置是:首页 > 编程语言学习 > 后端编程语言 > 文章正文 后端编程语言

利用MySqlBulkLoader实现批量插入数据的示例详解

2022-06-28 11:04:29 后端编程语言

简介最近在项目中遇到插入数据瓶颈,几万、几十万、几百万的数据保存到MYSQL数据库,使用EF插入数据速度非常慢,数据量非常大时EF插入需要几十...

最近在项目中遇到插入数据瓶颈,几万、几十万、几百万的数据保存到MYSQL数据库,使用EF插入数据速度非常慢,数据量非常大时EF插入需要几十分钟,甚至几个小时,这样子的速度肯定不是我们所期望的。

后面经过了解与研究发现MySqlBulkLoader,可以批量将数据插入到数据库并且速度上面远远优于EF。

MySqlBulkLoader主要的实现方式:将需要插入的数据转成DataTable,DataTable转成一个CSV文件,将CSV文件使用批量导入的形式导入到数据库里面去。

注意:

1).数据库连接地址需要添加配置AllowLoadLocalInfile=true,允许本地文件导入;

Data Source = 数据库地址; Port = 端口; Initial Catalog = 数据库名; User Id = 用户名; Password = 密码;AllowLoadLocalInfile=true;

2).插入的时候会返回插入行数,但是检查所有的数据都正确,也没有报异常,却返回了插入数量为0,可以检查表是否有唯一索引,插入的数据是否违反了唯一索引

(以下分块展示了代码,如果需要看完整的代码直接看5.完整的代码

1.将List转化为DataTable


  1. /// <summary> 
  2.         /// 将List转化为DataTable 
  3.         /// </summary> 
  4.         /// <returns></returns> 
  5.         public DataTable ListToDataTable<T>(List<T> data) 
  6.         { 
  7.             #region 创建一个DataTable,以实体名称作为DataTable名称 
  8.  
  9.             var tableName = typeof(T).Name; 
  10.             tableName = tableName.ToSnakeCase(); /*实体名称与表名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/ 
  11.             DataTable dt = new DataTable 
  12.             { 
  13.                 TableName = tableName 
  14.             }; 
  15.  
  16.             #endregion 
  17.  
  18.             #region 拿取列名,以实体的属性名作为列名        
  19.  
  20.             var properties = typeof(T).GetProperties(); 
  21.             foreach (var item in properties) 
  22.             { 
  23.                 var curFileName = item.Name; 
  24.                 curFileName = curFileName.ToSnakeCase();/*列名与字段名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/ 
  25.                 dt.Columns.Add(curFileName); 
  26.             } 
  27.  
  28.             #endregion 
  29.  
  30.             #region 列赋值 
  31.             foreach (var item in data) 
  32.             { 
  33.                 DataRow dr = dt.NewRow(); 
  34.                 var columns = dt.Columns; 
  35.  
  36.                 var curPropertyList = item.GetType().GetProperties(); 
  37.                 foreach (var p in curPropertyList) 
  38.                 { 
  39.                     var name = p.Name; 
  40.                     name = name.ToSnakeCase();/*列名与字段名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/ 
  41.                     var curValue = p.GetValue(item); 
  42.  
  43.                     int i = columns.IndexOf(name); 
  44.                     dr[i] = curValue; 
  45.                 } 
  46.  
  47.                 dt.Rows.Add(dr); 
  48.             } 
  49.  
  50.             #endregion   
  51.  
  52.             return dt; 
  53.         } 

2.将DataTable转换为标准的CSV文件

  1. /// <summary> 
  2. /// csv扩展 
  3. /// </summary> 
  4. public static class CSVEx 
  5. /// <summary> 
  6. ///将DataTable转换为标准的CSV文件 
  7. /// </summary> 
  8. /// <param name="table">数据表</param> 
  9. /// <param name="tmpPath">文件地址</param> 
  10. /// <returns>返回标准的CSV</returns> 
  11. public static void ToCsv(this DataTable table, string tmpPath) 
  12. //以半角逗号(即,)作分隔符,列为空也要表达其存在。 
  13. //列内容如存在半角逗号(即,)则用半角引号(即"")将该字段值包含起来。 
  14. //列内容如存在半角引号(即")则应替换成半角双引号("")转义,并用半角引号(即"")将该字段值包含起来。 
  15. StringBuilder sb = new StringBuilder(); 
  16. DataColumn colum; 
  17. foreach (DataRow row in table.Rows) 
  18. for (int i = 0; i < table.Columns.Count; i++) 
  19. Type _datatype = typeof(DateTime); 
  20. colum = table.Columns[i]; 
  21. if (i != 0) sb.Append("\t"); 
  22. //if (colum.DataType == typeof(string) && row[colum].ToString().Contains(",")) 
  23. //{ 
  24. //sb.Append("\"" + row[colum].ToString().Replace("\"", "\"\"") + "\""); 
  25. //} 
  26. if (colum.DataType == _datatype) 
  27. sb.Append(((DateTime)row[colum]).ToString("yyyy/MM/dd HH:mm:ss")); 
  28. else sb.Append(row[colum].ToString()); 
  29. sb.Append("\r\n"); 
  30. StreamWriter sw = new StreamWriter(tmpPath, false, UTF8Encoding.UTF8); 
  31. sw.Write(sb.ToString()); 
  32. sw.Close(); 
  33.  

3.CSV文件导入数据到数据库

  1. /// <summary> 
  2. /// 批量导入mysql帮助类 
  3. /// </summary> 
  4. public static class MySqlHelper 
  5. /// <summary> 
  6. /// MySqlBulkLoader批量导入 
  7. /// </summary> 
  8. /// <param name="_mySqlConnection">数据库连接地址</param> 
  9. /// <param name="table"></param> 
  10. /// <param name="csvName"></param> 
  11. /// <returns></returns> 
  12. public static int BulkLoad(MySqlConnection _mySqlConnection, DataTable table, string csvName) 
  13. var columns = table.Columns.Cast<DataColumn>().Select(colum => colum.ColumnName).ToList(); 
  14. MySqlBulkLoader bulk = new MySqlBulkLoader(_mySqlConnection) 
  15. FieldTerminator = "\t"
  16. FieldQuotationCharacter = '"'
  17. EscapeCharacter = '"'
  18. LineTerminator = "\r\n"
  19. FileName = csvName, 
  20. NumberOfLinesToSkip = 0, 
  21. TableName = table.TableName, 
  22.  
  23. }; 
  24.  
  25. bulk.Columns.AddRange(columns); 
  26. return bulk.Load(); 

4.使用MySqlBulkLoader批量插入数据

  1. /// <summary> 
  2. /// 使用MySqlBulkLoader批量插入数据 
  3. /// </summary> 
  4. /// <typeparam name="T"></typeparam> 
  5. /// <param name="data"></param> 
  6. /// <returns></returns> 
  7. /// <exception cref="Exception"></exception> 
  8. public int BulkLoaderData<T>(List<T> data) 
  9. if (data.Count <= 0) return 0; 
  10.  
  11. var connectString = "数据库连接地址"
  12. using (MySqlConnection connection = new MySqlConnection(connectString)) 
  13. MySqlTransaction sqlTransaction = null
  14. try 
  15. if (connection.State == ConnectionState.Closed) 
  16. connection.Open(); 
  17. sqlTransaction = connection.BeginTransaction(); 
  18.  
  19.  
  20. var dt = ListToDataTable<T>(data); //将List转成dataTable 
  21. string tmpPath = Path.GetTempFileName(); 
  22. dt.ToCsv(tmpPath); //将DataTable转成CSV文件 
  23. var insertCount = MySqlHelper.BulkLoad(connection, dt, tmpPath); //使用MySqlBulkLoader插入数据 
  24. sqlTransaction.Commit(); 
  25.  
  26. try 
  27. if (File.Exists(tmpPath)) File.Delete(tmpPath); 
  28. catch (Exception) 
  29. //删除文件失败 
  30.  
  31. return insertCount; //返回执行成功的条数 
  32. catch (Exception e) 
  33. if (sqlTransaction != null
  34. sqlTransaction.Rollback(); 
  35. //执行异常  
  36. throw e; 
  37.  

5.完整的代码

  1. namespace WebApplication1.BrantchInsert 
  2.  
  3.     /// <summary> 
  4.     /// 批量插入 
  5.     /// </summary> 
  6.     public class BulkLoader 
  7.     { 
  8.  
  9.  
  10.         /// <summary> 
  11.         /// 测试批量插入入口 
  12.         /// </summary> 
  13.         /// <returns></returns> 
  14.         public int BrantchDataTest() 
  15.         { 
  16.  
  17.             #region 模拟数据 
  18.             var data = new List<CrmCouponTestDto>() { 
  19.                  new CrmCouponTestDto { 
  20.                      Id=1, 
  21.                      CouponCode="test001"
  22.                      CouponId = 1, 
  23.                      MemberId=100, 
  24.                      IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"), 
  25.                      UsageTime=Convert.ToDateTime("3000-12-31 00:00:00"), 
  26.                      UsageShopId=0, 
  27.                      UsageBillNo=""
  28.                      EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"), 
  29.                      EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"), 
  30.                      Status=0 
  31.                  }, 
  32.                  new CrmCouponTestDto { 
  33.                      Id=2, 
  34.                      CouponCode="test002"
  35.                      CouponId = 1, 
  36.                        MemberId=101, 
  37.                      IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"), 
  38.                      UsageTime=Convert.ToDateTime("2022-06-27 14:30:00"), 
  39.                      UsageShopId=2, 
  40.                      UsageBillNo="CS202206271430001"
  41.                      EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"), 
  42.                      EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"), 
  43.                      Status=1 
  44.                  }, 
  45.                   new CrmCouponTestDto { 
  46.                      Id=3, 
  47.                      CouponCode="test003"
  48.                      CouponId = 1, 
  49.                      MemberId=102, 
  50.                      IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"), 
  51.                      UsageTime=Convert.ToDateTime("3000-12-31 00:00:00"), 
  52.                      UsageShopId=0, 
  53.                      UsageBillNo=""
  54.                      EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"), 
  55.                      EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"), 
  56.                      Status=0 
  57.                  }, 
  58.                     new CrmCouponTestDto { 
  59.                      Id=4, 
  60.                      CouponCode="test004"
  61.                      CouponId = 1, 
  62.                      MemberId=103, 
  63.                      IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"), 
  64.                      UsageTime=Convert.ToDateTime("3000-12-31 00:00:00"), 
  65.                      UsageShopId=0, 
  66.                      UsageBillNo=""
  67.                      EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"), 
  68.                      EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"), 
  69.                      Status=0 
  70.                  } 
  71.              }; 
  72.             #endregion 
  73.             var result = BulkLoaderData<CrmCouponTestDto>(data); 
  74.             return result; 
  75.  
  76.         } 
  77.  
  78.  
  79.         /// <summary> 
  80.         /// 使用MySqlBulkLoader批量插入数据 
  81.         /// </summary> 
  82.         /// <typeparam name="T"></typeparam> 
  83.         /// <param name="data"></param> 
  84.         /// <returns></returns> 
  85.         /// <exception cref="Exception"></exception> 
  86.         public int BulkLoaderData<T>(List<T> data) 
  87.         { 
  88.             if (data.Count <= 0) return 0; 
  89.  
  90.             var connectString = "数据库连接地址"
  91.             using (MySqlConnection connection = new MySqlConnection(connectString)) 
  92.             { 
  93.                 MySqlTransaction sqlTransaction = null
  94.                 try 
  95.                 { 
  96.                     if (connection.State == ConnectionState.Closed) 
  97.                     { 
  98.                         connection.Open(); 
  99.                     } 
  100.                     sqlTransaction = connection.BeginTransaction(); 
  101.  
  102.  
  103.                     var dt = ListToDataTable<T>(data); //将List转成dataTable 
  104.                     string tmpPath = Path.GetTempFileName(); 
  105.                     dt.ToCsv(tmpPath); //将DataTable转成CSV文件 
  106.                     var insertCount = MySqlHelper.BulkLoad(connection, dt, tmpPath); //使用MySqlBulkLoader插入数据 
  107.                     sqlTransaction.Commit(); 
  108.  
  109.                     try 
  110.                     { 
  111.                         if (File.Exists(tmpPath)) File.Delete(tmpPath); 
  112.                     } 
  113.                     catch (Exception) 
  114.                     { 
  115.                         //删除文件失败 
  116.  
  117.                     } 
  118.                     return insertCount; //返回执行成功的条数 
  119.                 } 
  120.                 catch (Exception e) 
  121.                 { 
  122.                     if (sqlTransaction != null
  123.                     { 
  124.                         sqlTransaction.Rollback(); 
  125.                     } 
  126.                     //执行异常  
  127.                     throw e; 
  128.                 } 
  129.             } 
  130.  
  131.         } 
  132.  
  133.  
  134.         /// <summary> 
  135.         /// 将List转化为DataTable核心方法 
  136.         /// </summary> 
  137.         /// <returns></returns> 
  138.         public DataTable ListToDataTable<T>(List<T> data) 
  139.         { 
  140.             #region 创建一个DataTable,以实体名称作为DataTable名称 
  141.  
  142.             var tableName = typeof(T).Name; 
  143.             tableName = tableName.ToSnakeCase(); /*实体名称与表名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/ 
  144.             DataTable dt = new DataTable 
  145.             { 
  146.                 TableName = tableName 
  147.             }; 
  148.  
  149.             #endregion 
  150.  
  151.             #region 拿取列名,以实体的属性名作为列名        
  152.  
  153.             var properties = typeof(T).GetProperties(); 
  154.             foreach (var item in properties) 
  155.             { 
  156.                 var curFileName = item.Name; 
  157.                 curFileName = curFileName.ToSnakeCase();/*列名与字段名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/ 
  158.                 dt.Columns.Add(curFileName); 
  159.             } 
  160.  
  161.             #endregion 
  162.  
  163.             #region 列赋值 
  164.             foreach (var item in data) 
  165.             { 
  166.                 DataRow dr = dt.NewRow(); 
  167.                 var columns = dt.Columns; 
  168.  
  169.                 var curPropertyList = item.GetType().GetProperties(); 
  170.                 foreach (var p in curPropertyList) 
  171.                 { 
  172.                     var name = p.Name; 
  173.                     name = name.ToSnakeCase();/*列名与字段名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/ 
  174.                     var curValue = p.GetValue(item); 
  175.  
  176.                     int i = columns.IndexOf(name); 
  177.                     dr[i] = curValue; 
  178.                 } 
  179.  
  180.                 dt.Rows.Add(dr); 
  181.             } 
  182.  
  183.             #endregion   
  184.  
  185.             return dt; 
  186.         } 
  187.  
  188.  
  189.     } 
  190.  
  191.  
  192.     /// <summary> 
  193.     /// 批量导入mysql帮助类 
  194.     /// </summary> 
  195.     public static class MySqlHelper 
  196.     { 
  197.         /// <summary> 
  198.         /// MySqlBulkLoader批量导入 
  199.         /// </summary> 
  200.         /// <param name="_mySqlConnection">数据库连接地址</param> 
  201.         /// <param name="table"></param> 
  202.         /// <param name="csvName"></param> 
  203.         /// <returns></returns> 
  204.         public static int BulkLoad(MySqlConnection _mySqlConnection, DataTable table, string csvName) 
  205.         { 
  206.             var columns = table.Columns.Cast<DataColumn>().Select(colum => colum.ColumnName).ToList(); 
  207.             MySqlBulkLoader bulk = new MySqlBulkLoader(_mySqlConnection) 
  208.             { 
  209.                 FieldTerminator = "\t"
  210.                 FieldQuotationCharacter = '"'
  211.                 EscapeCharacter = '"'
  212.                 LineTerminator = "\r\n"
  213.                 FileName = csvName, 
  214.                 NumberOfLinesToSkip = 0, 
  215.                 TableName = table.TableName, 
  216.  
  217.             }; 
  218.  
  219.             bulk.Columns.AddRange(columns); 
  220.             return bulk.Load(); 
  221.         } 
  222.     } 
  223.  
  224.  
  225.     /// <summary> 
  226.     /// csv扩展 
  227.     /// </summary> 
  228.     public static class CSVEx 
  229.     { 
  230.         /// <summary> 
  231.         ///将DataTable转换为标准的CSV文件 
  232.         /// </summary> 
  233.         /// <param name="table">数据表</param> 
  234.         /// <param name="tmpPath">文件地址</param> 
  235.         /// <returns>返回标准的CSV</returns> 
  236.         public static void ToCsv(this DataTable table, string tmpPath) 
  237.         { 
  238.             //以半角逗号(即,)作分隔符,列为空也要表达其存在。 
  239.             //列内容如存在半角逗号(即,)则用半角引号(即"")将该字段值包含起来。 
  240.             //列内容如存在半角引号(即")则应替换成半角双引号("")转义,并用半角引号(即"")将该字段值包含起来。 
  241.             StringBuilder sb = new StringBuilder(); 
  242.             DataColumn colum; 
  243.             foreach (DataRow row in table.Rows) 
  244.             { 
  245.                 for (int i = 0; i < table.Columns.Count; i++) 
  246.                 { 
  247.                     Type _datatype = typeof(DateTime); 
  248.                     colum = table.Columns[i]; 
  249.                     if (i != 0) sb.Append("\t"); 
  250.                     //if (colum.DataType == typeof(string) && row[colum].ToString().Contains(",")) 
  251.                     //{ 
  252.                     //    sb.Append("\"" + row[colum].ToString().Replace("\"", "\"\"") + "\""); 
  253.                     //} 
  254.                     if (colum.DataType == _datatype) 
  255.                     { 
  256.                         sb.Append(((DateTime)row[colum]).ToString("yyyy/MM/dd HH:mm:ss")); 
  257.                     } 
  258.                     else sb.Append(row[colum].ToString()); 
  259.                 } 
  260.                 sb.Append("\r\n"); 
  261.             } 
  262.             StreamWriter sw = new StreamWriter(tmpPath, false, UTF8Encoding.UTF8); 
  263.             sw.Write(sb.ToString()); 
  264.             sw.Close(); 
  265.         } 
  266.  
  267.     } 
  268.  
  269.     /// <summary> 
  270.     /// 字符串转化 
  271.     /// </summary> 
  272.     public static class StringExtensions 
  273.     { 
  274.         /// <summary> 
  275.         /// 转换为 main_keys_id 这种形式的字符串方式 
  276.         /// </summary> 
  277.         public static string ToSnakeCase(this string input) 
  278.         { 
  279.             if (string.IsNullOrEmpty(input)) { return input; } 
  280.  
  281.             var startUnderscores = Regex.Match(input, @"^_+"); 
  282.             return startUnderscores + Regex.Replace(input, @"([a-z0-9])([A-Z])""$1_$2").ToLower(); 
  283.         } 
  284.     } 
  285.  
  286.  
  287.     /// <summary> 
  288.     /// 实体 
  289.     /// </summary> 
  290.     public class CrmCouponTestDto 
  291.     { 
  292.         /// <summary> 
  293.         /// ID 
  294.         /// </summary> 
  295.         public long Id { get; set; } 
  296.  
  297.         /// <summary> 
  298.         /// 卡券号 
  299.         /// </summary>      
  300.         public string CouponCode { get; set; } 
  301.  
  302.         /// <summary> 
  303.         /// 卡券ID 
  304.         /// </summary> 
  305.         public int CouponId { get; set; } 
  306.  
  307.         /// <summary> 
  308.         /// 会员ID 
  309.         /// </summary> 
  310.         public int MemberId { get; set; } 
  311.  
  312.         /// <summary> 
  313.         /// 发放时间 
  314.         /// </summary>    
  315.         public DateTime IssueTime { get; set; } 
  316.  
  317.         /// <summary> 
  318.         /// 使用时间 
  319.         /// </summary>       
  320.         public DateTime UsageTime { get; set; } 
  321.  
  322.         /// <summary> 
  323.         /// 使用店铺ID 
  324.         /// </summary>       
  325.  
  326.         public int UsageShopId { get; set; } 
  327.  
  328.         /// <summary> 
  329.         /// 使用单号 
  330.         /// </summary>       
  331.         public string UsageBillNo { get; set; } 
  332.  
  333.         /// <summary> 
  334.         /// 有效开始时间 
  335.         /// </summary>       
  336.         public DateTime EffectiveStart { get; set; } 
  337.  
  338.         /// <summary> 
  339.         /// 有效结束时间 
  340.         /// </summary>       
  341.         public DateTime EffectiveEnd { get; set; } 
  342.  
  343.         /// <summary> 
  344.         /// 状态 
  345.         /// CouponStatus 卡券状态: 
  346.         /// -1:未领用 
  347.         /// 0:未使用 
  348.         /// 1:已使用 
  349.         /// 2:已过期 
  350.         ///3:已作废 
  351.         ///4:转赠中 
  352.         /// </summary> 
  353.  
  354.         public Int16 Status { get; set; } 
  355.     } 

 

站点信息