问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

几种数据库的大数据批量插入【转】

发布网友 发布时间:2022-04-07 16:44

我来回答

1个回答

热心网友 时间:2022-04-07 18:13

首先说一下,IProvider里有一个用于实现批量插入的插件服务接口IBatcherProvider,此接口在前一篇文章中已经提到过了。///<summary>/// 提供数据批量处理的方法。 ///</summary>publicinterface IBatcherProvider : IProviderService { ///<summary>/// 将<see cref="DataTable"/> 的数据批量插入到数据库中。 ///</summary>///<param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param>///<param name="batchSize">每批次写入的数据量。</param>void Insert(DataTable dataTable, int batchSize = 10000); }一、SqlServer数据批量插入SqlServer的批量插入很简单,使用SqlBulkCopy就可以,以下是该类的实现:///<summary>/// 为System.Data.SqlClient 提供的用于批量操作的方法。 ///</summary>publicsealedclass MsSqlBatcher : IBatcherProvider { ///<summary>/// 获取或设置提供者服务的上下文。 ///</summary>public ServiceContext ServiceContext { get; set; } ///<summary>/// 将<see cref="DataTable"/> 的数据批量插入到数据库中。 ///</summary>///<param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param>///<param name="batchSize">每批次写入的数据量。</param>publicvoid Insert(DataTable dataTable, int batchSize = 10000) { Checker.ArgumentNull(dataTable, "dataTable"); if (dataTable.Rows.Count == 0) { return; } using (var connection = (SqlConnection)ServiceContext.Database.CreateConnection()) { try { connection.TryOpen(); //给表名加上前后导符var tableName = DbUtility.FormatByQuote(ServiceContext.Database.Provider.GetService<ISyntaxProvider>(), dataTable.TableName); using (var bulk = new SqlBulkCopy(connection, SqlBulkCopyOptions.KeepIdentity, null) { DestinationTableName = tableName, BatchSize = batchSize }) { //循环所有列,为bulk添加映射 dataTable.EachColumn(c => bulk.ColumnMappings.Add(c.ColumnName, c.ColumnName), c => !c.AutoIncrement); bulk.WriteToServer(dataTable); bulk.Close(); } } catch (Exception exp) { thrownew BatcherException(exp); } finally { connection.TryClose(); } } } }以上没有使用事务,使用事务在性能上会有一定的影响,如果要使用事务,可以设置SqlBulkCopyOptions.UseInternalTransaction。二、Oracle数据批量插入System.Data.OracleClient不支持批量插入,因此只能使用Oracle.DataAccess组件来作为提供者。///<summary>/// Oracle.Data.Access 组件提供的用于批量操作的方法。 ///</summary>publicsealedclass OracleAccessBatcher : IBatcherProvider { ///<summary>/// 获取或设置提供者服务的上下文。 ///</summary>public ServiceContext ServiceContext { get; set; } ///<summary>/// 将<see cref="DataTable"/> 的数据批量插入到数据库中。 ///</summary>///<param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param>///<param name="batchSize">每批次写入的数据量。</param>publicvoid Insert(DataTable dataTable, int batchSize = 10000) { Checker.ArgumentNull(dataTable, "dataTable"); if (dataTable.Rows.Count == 0) { return; } using (var connection = ServiceContext.Database.CreateConnection()) { try { connection.TryOpen(); using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) { if (command == null) { thrownew BatcherException(new ArgumentException("command")); } command.Connection = connection; command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable); command.ExecuteNonQuery(); } } catch (Exception exp) { thrownew BatcherException(exp); } finally { connection.TryClose(); } } } ///<summary>/// 生成插入数据的sql语句。 ///</summary>///<param name="database"></param>///<param name="command"></param>///<param name="table"></param>///<returns></returns>privatestring GenerateInserSql(IDatabase database, DbCommand command, DataTable table) { var names = new StringBuilder(); var values = new StringBuilder(); //将一个DataTable的数据转换为数组的数组var data = table.ToArray(); //设置ArrayBindCount属性 command.GetType().GetProperty("ArrayBindCount").SetValue(command, table.Rows.Count, null); var syntax = database.Provider.GetService<ISyntaxProvider>(); for (var i = 0; i < table.Columns.Count; i++) { var column = table.Columns[i]; var parameter = database.Provider.DbProviderFactory.CreateParameter(); if (parameter == null) { continue; } parameter.ParameterName = column.ColumnName; parameter.Direction = ParameterDirection.Input; parameter.DbType = column.DataType.GetDbType(); parameter.Value = data[i]; if (names.Length > 0) { names.Append(","); values.Append(","); } names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, column.ColumnName)); values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName); command.Parameters.Add(parameter); } returnstring.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values); } }以上最重要的一步,就是将DataTable转为数组的数组表示,即object[][],前数组的上标是列的个数,后数组是行的个数,因此循环Columns将后数组作为Parameter的值,也就是说,参数的值是一个数组。而insert语句与一般的插入语句没有什么不一样。三、SQLite数据批量插入SQLite的批量插入只需开启事务就可以了,这个具体的原理不得而知。publicsealedclass SQLiteBatcher : IBatcherProvider { ///<summary>/// 获取或设置提供者服务的上下文。 ///</summary>public ServiceContext ServiceContext { get; set; } ///<summary>/// 将<see cref="DataTable"/> 的数据批量插入到数据库中。 ///</summary>///<param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param>///<param name="batchSize">每批次写入的数据量。</param>publicvoid Insert(DataTable dataTable, int batchSize = 10000) { Checker.ArgumentNull(dataTable, "dataTable"); if (dataTable.Rows.Count == 0) { return; } using (var connection = ServiceContext.Database.CreateConnection()) { DbTransaction transcation = null; try { connection.TryOpen(); transcation = connection.BeginTransaction(); using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) { if (command == null) { thrownew BatcherException(new ArgumentException("command")); } command.Connection = connection; command.CommandText = GenerateInserSql(ServiceContext.Database, dataTable); if (command.CommandText == string.Empty) { return; } var flag = new AssertFlag(); dataTable.EachRow(row => { var first = flag.AssertTrue(); ProcessCommandParameters(dataTable, command, row, first); command.ExecuteNonQuery(); }); } transcation.Commit(); } catch (Exception exp) { if (transcation != null) { transcation.Rollback(); } thrownew BatcherException(exp); } finally { connection.TryClose(); } } } privatevoid ProcessCommandParameters(DataTable dataTable, DbCommand command, DataRow row, bool first) { for (var c = 0; c < dataTable.Columns.Count; c++) { DbParameter parameter; //首次创建参数,是为了使用缓存if (first) { parameter = ServiceContext.Database.Provider.DbProviderFactory.CreateParameter(); parameter.ParameterName = dataTable.Columns[c].ColumnName; command.Parameters.Add(parameter); } else { parameter = command.Parameters[c]; } parameter.Value = row[c]; } } ///<summary>/// 生成插入数据的sql语句。 ///</summary>///<param name="database"></param>///<param name="table"></param>///<returns></returns>privatestring GenerateInserSql(IDatabase database, DataTable table) { var syntax = database.Provider.GetService<ISyntaxProvider>(); var names = new StringBuilder(); var values = new StringBuilder(); var flag = new AssertFlag(); table.EachColumn(column => { if (!flag.AssertTrue()) { names.Append(","); values.Append(","); } names.Append(DbUtility.FormatByQuote(syntax, column.ColumnName)); values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName); }); returnstring.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values); } } 四、MySql数据批量插入///<summary>/// 为MySql.Data 组件提供的用于批量操作的方法。 ///</summary>publicsealedclass MySqlBatcher : IBatcherProvider { ///<summary>/// 获取或设置提供者服务的上下文。 ///</summary>public ServiceContext ServiceContext { get; set; } ///<summary>/// 将<see cref="DataTable"/> 的数据批量插入到数据库中。 ///</summary>///<param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param>///<param name="batchSize">每批次写入的数据量。</param>publicvoid Insert(DataTable dataTable, int batchSize = 10000) { Checker.ArgumentNull(dataTable, "dataTable"); if (dataTable.Rows.Count == 0) { return; } using (var connection = ServiceContext.Database.CreateConnection()) { try { connection.TryOpen(); using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) { if (command == null) { thrownew BatcherException(new ArgumentException("command")); } command.Connection = connection; command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable); if (command.CommandText == string.Empty) { return; } command.ExecuteNonQuery(); } } catch (Exception exp) { thrownew BatcherException(exp); } finally { connection.TryClose(); } } } ///<summary>/// 生成插入数据的sql语句。 ///</summary>///<param name="database"></param>///<param name="command"></param>///<param name="table"></param>///<returns></returns>privatestring GenerateInserSql(IDatabase database, DbCommand command, DataTable table) { var names = new StringBuilder(); var values = new StringBuilder(); var types = new List<DbType>(); var count = table.Columns.Count; var syntax = database.Provider.GetService<ISyntaxProvider>(); table.EachColumn(c => { if (names.Length > 0) { names.Append(","); } names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, c.ColumnName)); types.Add(c.DataType.GetDbType()); }); var i = 0; foreach (DataRow row in table.Rows) { if (i > 0) { values.Append(","); } values.Append("("); for (var j = 0; j < count; j++) { if (j > 0) { values.Append(", "); } var isStrType = IsStringType(types[j]); var parameter = CreateParameter(database.Provider, isStrType, types[j], row[j], syntax.ParameterPrefix, i, j); if (parameter != null) { values.Append(parameter.ParameterName); command.Parameters.Add(parameter); } elseif (isStrType) { values.AppendFormat("'{0}'", row[j]); } else { values.Append(row[j]); } } values.Append(")"); i++; } returnstring.Format("INSERT INTO {0}({1}) VALUES {2}", DbUtility.FormatByQuote(syntax, table.TableName), names, values); } ///<summary>/// 判断是否为字符串类别。 ///</summary>///<param name="dbType"></param>///<returns></returns>privatebool IsStringType(DbType dbType) { return dbType == DbType.AnsiString || dbType == DbType.AnsiStringFixedLength || dbType == DbType.String || dbType == DbType.StringFixedLength; } ///<summary>/// 创建参数。 ///</summary>///<param name="provider"></param>///<param name="isStrType"></param>///<param name="dbType"></param>///<param name="value"></param>///<param name="parPrefix"></param>///<param name="row"></param>///<param name="col"></param>///<returns></returns>private DbParameter CreateParameter(IProvider provider, bool isStrType, DbType dbType, object value, char parPrefix, int row, int col) { //如果生成全部的参数,则速度会很慢,因此,只有数据类型为字符串(包含'号)和日期型时才添加参数if ((isStrType && value.ToString().IndexOf('\'') != -1) || dbType == DbType.DateTime) { var name = string.Format("{0}p_{1}_{2}", parPrefix, row, col); var parameter = provider.DbProviderFactory.CreateParameter(); parameter.ParameterName = name; parameter.Direction = ParameterDirection.Input; parameter.DbType = dbType; parameter.Value = value; return parameter; } returnnull; } }MySql的批量插入,是将值全部写在语句的values里,例如,insert batcher(id, name) values(1, '1', 2, '2', 3, '3', ........ 10, '10')。五、测试接下来写一个测试用例来看一下使用批量插入的效果。
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
说课包括哪些方面 说课内容包括()。 如何在手机百度上删除对话记录? 结核病是什么样的疾病? 曹丕17岁得了肺痨,明知自己命不长久,还要强争王位,是不是很自私呢?_百... 古代小说常出现的病名 急求一篇"生活小窍门"(500字)的作文 至今最有什么小妙招 健康的戒烟方法 笔记本电池锁死是什么原因引起的? 上海java培训完工作难找吗? 上海上班,想学点java方面的知识,但是培训班学费有点高,怎么办? 怎么自学Java?Java前景如何? 去上海工作java需要学到什么程度 上海Java培训机构四个月真能学习吗 没有基础,在上海学习Java可以吗? 勤诚达正大城这个楼盘怎么样呢? 深圳勤诚达·正大城花园优劣势? 请问勤诚达正大城的住宅什么时候加推? iwatch2.1为什么手表不能添加qq 为什么我的Iwatch3安装应用没有QQ 现在iwatch为什么不能上QQ了? 苹果iwatchse为什么微信能显示联系人QQ不行呢? 苹果手机更新了qq和微信在iwatch里不能显示了 用什么方法可以去掉乳胶漆 这道题是什么思路来做?或者说属于行测中哪一部分的题?能在哪里看到解题思路 高速公路没有油怎么紧急处理? apple watch5 收不到qq群消息? 万一高速路上堵车没油了我们应该怎么办? iwatch6的qq不接收消息 android与ios的app测试有什么区别? 小米手环三有nfc吗 小米手环3nfc刷卡有优惠吗 睡觉的时候梦见老爸打儿子媳妇(也就是打我媳妇),是什么兆头? 网贷好几个逾期好几天了,有啥办法大额度贷款,全还上,然后还一个啊,骗子别来 叶罗丽娃娃的魔法咒语齐娜 想把网贷还清 叶萝莉战士魔法口号是什么 我梦到我爸打我儿子 梦见老爸打小孩和我发生战争 学生坐飞机,要带什么证件? 做飞机需要带哪些证件 开刀后能吃韭菜吗 手术后能吃韭菜饺子吗 坐飞机要准备什么证件吗? 有伤口可以吃韭菜吗? 吃韭菜对伤口愈合有影响吗 做完手术能吃韭菜鸡蛋吗 动了手术的人,可不可以吃韭菜炒蛋 做了整形手术后可以吃韭菜吗