Welcome to WuJiGu Developer Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


C# - Bulk insert / copy IEnumerable into table with Npgsql

I have a method that takes an IEnumerable, loops through it, and inserts each item into a table in the database, like so:

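    public void Write(IEnumerable<Foo> fooData)
    {
        // foreach avoids re-enumerating the sequence with Count() on every iteration.
        foreach (var foo in fooData)
        {
            var sql = @"insert into foo (col_id, col_name) values (@col_id, @col_name)";
            // ... the command is executed here once per row, with values taken from foo
        }
    }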

Foo is a class that mirrors the table in the database:

      public class Foo
      {
        public int col_id {get;set;}
        public string col_name {get;set;}
      }

It has come to light that inserting rows one at a time in a loop is inefficient when the IEnumerable contains thousands of entries. What is a more efficient way to copy all of this data from an IEnumerable into the table?



1 Answer


I have written a class for Postgres that behaves like the built-in SqlBulkCopy class. It wraps the COPY command to provide fast uploads. The method for IEnumerable looks like this (there is a similar one for DataTable):

public void WriteToServer<T>(IEnumerable<T> data)
{
    try
    {
        if (string.IsNullOrEmpty(DestinationTableName))
        {
            throw new ArgumentOutOfRangeException("DestinationTableName", "Destination table must be set");
        }
        PropertyInfo[] properties = typeof(T).GetProperties();
        int colCount = properties.Length;

        NpgsqlDbType[] types = new NpgsqlDbType[colCount];
        int[] lengths = new int[colCount];
        string[] fieldNames = new string[colCount];

        using (var cmd = new NpgsqlCommand("SELECT * FROM " + DestinationTableName + " LIMIT 1", conn))
        {
            using (var rdr = cmd.ExecuteReader())
            {
                if (rdr.FieldCount != colCount)
                {
                    throw new ArgumentOutOfRangeException("data", "Column count in destination table does not match the number of properties on the source type.");
                }
                var columns = rdr.GetColumnSchema();
                for (int i = 0; i < colCount; i++)
                {
                    types[i] = (NpgsqlDbType)columns[i].NpgsqlDbType;
                    lengths[i] = columns[i].ColumnSize ?? 0;
                    fieldNames[i] = columns[i].ColumnName;
                }
            }

        }
        var sB = new StringBuilder(fieldNames[0]);
        for (int p = 1; p < colCount; p++)
        {
            sB.Append(", " + fieldNames[p]);
        }
        using (var writer = conn.BeginBinaryImport("COPY " + DestinationTableName + " (" + sB.ToString() + ") FROM STDIN (FORMAT BINARY)"))
        {
            foreach (var t in data)
            {
                writer.StartRow();

                for (int i = 0; i < colCount; i++)
                {
                    if (properties[i].GetValue(t) == null)
                    {
                        writer.WriteNull();
                    }
                    else
                    {
                        switch (types[i])
                        {
                            case NpgsqlDbType.Bigint:
                                writer.Write((long)properties[i].GetValue(t), types[i]);
                                break;
                            case NpgsqlDbType.Bit:
                                if (lengths[i] > 1)
                                {
                                    writer.Write((byte[])properties[i].GetValue(t), types[i]);
                                }
                                else
                                {
                                    writer.Write((byte)properties[i].GetValue(t), types[i]);
                                }
                                break;
                            case NpgsqlDbType.Boolean:
                                writer.Write((bool)properties[i].GetValue(t), types[i]);
                                break;
                            case NpgsqlDbType.Bytea:
                                writer.Write((byte[])properties[i].GetValue(t), types[i]);
                                break;
                            case NpgsqlDbType.Char:
                                // PropertyType is the declared type of the property;
                                // GetType() on a PropertyInfo never equals string or Guid.
                                if (properties[i].PropertyType == typeof(string))
                                {
                                    writer.Write((string)properties[i].GetValue(t), types[i]);
                                }
                                else if (properties[i].PropertyType == typeof(Guid))
                                {
                                    var value = properties[i].GetValue(t).ToString();
                                    writer.Write(value, types[i]);
                                }
                                else if (lengths[i] > 1)
                                {
                                    writer.Write((char[])properties[i].GetValue(t), types[i]);
                                }
                                else
                                {
                                    var s = properties[i].GetValue(t).ToString().ToCharArray();
                                    writer.Write(s[0], types[i]);
                                }
                                break;
                            case NpgsqlDbType.Time:
                            case NpgsqlDbType.Timestamp:
                            case NpgsqlDbType.TimestampTz:
                            case NpgsqlDbType.Date:
                                writer.Write((DateTime)properties[i].GetValue(t), types[i]);
                                break;
                            case NpgsqlDbType.Double:
                                writer.Write((double)properties[i].GetValue(t), types[i]);
                                break;
                            case NpgsqlDbType.Integer:
                                // Compare against PropertyType (the declared type), and let
                                // conversion errors propagate instead of swallowing them.
                                if (properties[i].PropertyType == typeof(int))
                                {
                                    writer.Write((int)properties[i].GetValue(t), types[i]);
                                }
                                else if (properties[i].PropertyType == typeof(string))
                                {
                                    // Throws FormatException if the string is not numeric.
                                    var swap = Convert.ToInt32(properties[i].GetValue(t));
                                    writer.Write(swap, types[i]);
                                }
                                else
                                {
                                    writer.Write((object)properties[i].GetValue(t), types[i]);
                                }
                                break;
                            case NpgsqlDbType.Interval:
                                writer.Write((TimeSpan)properties[i].GetValue(t), types[i]);
                                break;
                            case NpgsqlDbType.Numeric:
                            case NpgsqlDbType.Money:
                                writer.Write((decimal)properties[i].GetValue(t), types[i]);
                                break;
                            case NpgsqlDbType.Real:
                                writer.Write((Single)properties[i].GetValue(t), types[i]);
                                break;
                            case NpgsqlDbType.Smallint:
                                // A swallowed exception here would leave the row one column
                                // short, so errors are allowed to propagate.
                                if (properties[i].PropertyType == typeof(byte))
                                {
                                    var swap = Convert.ToInt16(properties[i].GetValue(t));
                                    writer.Write(swap, types[i]);
                                }
                                else
                                {
                                    writer.Write((short)properties[i].GetValue(t), types[i]);
                                }
                                break;
                            case NpgsqlDbType.Varchar:
                            case NpgsqlDbType.Text:
                                writer.Write((string)properties[i].GetValue(t), types[i]);
                                break;
                            case NpgsqlDbType.Uuid:
                                writer.Write((Guid)properties[i].GetValue(t), types[i]);
                                break;
                            case NpgsqlDbType.Xml:
                                writer.Write((string)properties[i].GetValue(t), types[i]);
                                break;
                        }
                    }
                }
            }
            writer.Complete();
        }
    }
    catch (Exception ex)
    {
        throw new Exception("Error executing NpgSqlBulkCopy.WriteToServer().  See inner exception for details", ex);
    }
}

You need to set the DestinationTableName property first, and conn must be an open connection.

Essentially, the method uses reflection to get the properties of the type of the items in the passed list. The properties must match the table's columns in number, order, and data type, because property i is written to column i. The writer sends the rows through a single COPY operation as it iterates through the list, and writer.Complete() commits the import at the end. I may not have dealt with all the types that you need, but it should be clear how to add any that are missing.
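For the two-column Foo class from the question, the same COPY pattern can be written without reflection. The following is only a minimal sketch under stated assumptions: FooBulkWriter is a hypothetical helper name, the table is assumed to be foo(col_id integer, col_name text), and conn is assumed to be an already open NpgsqlConnection. It also assumes a version of Npgsql in which the binary importer has a Complete() method, as in the method above.

```csharp
using System;
using System.Collections.Generic;
using Npgsql;
using NpgsqlTypes;

public class Foo
{
    public int col_id { get; set; }
    public string col_name { get; set; }
}

// Hypothetical helper, not part of Npgsql or of the class described above.
public static class FooBulkWriter
{
    // Streams every Foo through one binary COPY operation.
    // Assumes conn is open and the table foo(col_id integer, col_name text) exists.
    public static void Write(NpgsqlConnection conn, IEnumerable<Foo> fooData)
    {
        const string copySql = "COPY foo (col_id, col_name) FROM STDIN (FORMAT BINARY)";

        using (var writer = conn.BeginBinaryImport(copySql))
        {
            foreach (var foo in fooData)
            {
                writer.StartRow();
                writer.Write(foo.col_id, NpgsqlDbType.Integer);

                // A null string has to be written as an explicit NULL column value.
                if (foo.col_name == null)
                {
                    writer.WriteNull();
                }
                else
                {
                    writer.Write(foo.col_name, NpgsqlDbType.Text);
                }
            }

            // Commits the import; the call mirrors writer.Complete() in the method above.
            writer.Complete();
        }
    }
}
```

A call such as FooBulkWriter.Write(conn, fooData) would then replace the per-row insert loop. Writing the columns by hand keeps the types explicit, at the cost of one such method per table, which is the trade-off the reflection-based version avoids.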

