Как реализовать интерфейс IDataReader для обработки данных перед вставкой? - PullRequest
3 голосов
/ 10 апреля 2019

У меня есть хранимая процедура, которая дает мне набор результатов, состоящий из одного столбца, который содержит миллионы необработанных строк. Мне нужно перенести эти данные на другой сервер с помощью SqlBulkCopy, но проблема в том, что я не могу просто сделать следующее:

using (var con = new SqlConnection(sqlConnectionStringSource))
{
    using (var cmd = new SqlCommand("usp_GetUnprocessedData", con))
    {
        cmd.CommandType = CommandType.StoredProcedure;
        con.Open();
        using (var reader = cmd.ExecuteReader())
        {
            using (var sqlBulk = new SqlBulkCopy(sqlConnectionStringDestination))
            {
                sqlBulk.DestinationTableName = "BulkCopy";
                sqlBulk.BulkCopyTimeout = 0;
                sqlBulk.BatchSize = 200000;
                sqlBulk.WriteToServer(reader);
            }
        }
    }
}

потому что данные вообще не будут обрабатываться.

В моем случае n-я строка набора результатов выглядит следующим образом:

value1_n,value2_n,value3_n

где n - это просто нижний индекс, который я ввел, чтобы различать различные строки.

В таблице назначения, которую я назвал BulkCopy, я хотел бы иметь:

╔══════════╦══════════╦══════════╗
║  Field1  ║  Field2  ║  Field3  ║
╠══════════╬══════════╬══════════╣
║ Value1_1 ║ Value2_1 ║ Value3_1 ║
║ Value1_2 ║ Value2_2 ║ Value3_2 ║
║ ...      ║ ...      ║ ...      ║
║ Value1_n ║ Value2_n ║ Value3_n ║
╚══════════╩══════════╩══════════╝

Мне сказали использовать пользовательский DataReader через реализацию интерфейса IDataReader, чтобы обрабатывать данные построчно до того, как SqlBulkCopy скопирует данные из него, используя EnableStreamingProperty = true, чтобы гарантировать, что только небольшое количество данных находится в памяти, но я понятия не имею, с чего начать. Можете ли вы помочь мне, пожалуйста?

Ответы [ 2 ]

1 голос
/ 10 апреля 2019

Давайте обратим проблему. Вместо того, чтобы искать общее решение, создайте одну конкретную для этой проблему. Потратив несколько дней на создание оболочки IDataReader, я знаю, что это не , что тривиально.

Мы знаем, сколько полей, мы не заботимся о других полях в результатах. Вместо того, чтобы пытаться правильно реализовать оболочку IDataReader, мы могли бы создать метод итератора для разделения данных и возврата записей по очереди в потоковом режиме. ObjectMader FastMember может обернуть IDataReader интерфейс через любой IEnumerable:

class MyDTO
{
    public string Field1{get;set;}
    public string Field2{get;set;}
    public string Field3{get;set;}
}

public IEnumerable<MyDTO> ReaderToStream(IDataReader reader)
{
    while(reader.Read())
    {
        var line=reader.GetString(0);
        var fields=String.Split(",",line);
        yield return new MyDTO{Field1=fields[0];Field2=fields[1];Field3=fields[2]};
    }
}

Метод импорта может измениться на:

using (var con = new SqlConnection(sqlConnectionStringSource))
{
    ...
    using (var reader = cmd.ExecuteReader())
    {
        var recordStream=ReaderToStream(reader);
        using(var rd=ObjectReader(recordStream))
        using (var sqlBulk = new SqlBulkCopy(sqlConnectionStringDestination))
        {
            ...
            sqlBulk.WriteToServer(rd);
        }
    }
}

Итератор вызывает Read() только тогда, когда SqlBulkCopy запрашивает новую запись, поэтому мы не в итоге загружаем все в память.

И оболочка IDataReader

Resharper и Visual Studio 2019 предлагают реализовать интерфейс, делегируя вызовы обернутому классу. В Visual Studio 2019 это называется Implement interface through 'field_name'.

Начиная с этого кода:

class ReaderWrapper:IDataReader
{
    private readonly IDataReader _inner ;
    public ReaderWrapper(IDataReader inner)
    {
        _inner = inner;
    }
}

Применение рефакторинга дает:

class ReaderWrapper:IDataReader
{
    private readonly IDataReader _inner ;
    public ReaderWrapper(IDataReader inner)
    {
        _inner = inner;
    }

    public object this[int i] => _inner[i];

    public object this[string name] => _inner[name];

    public int Depth => _inner.Depth;

    public bool IsClosed => _inner.IsClosed;

    public int RecordsAffected => _inner.RecordsAffected;

    public int FieldCount => _inner.FieldCount;

    public void Close() => _inner.Close();
    public void Dispose() => _inner.Dispose();
    public bool GetBoolean(int i) => _inner.GetBoolean(i);
    public byte GetByte(int i) => _inner.GetByte(i);
    public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length) => _inner.GetBytes(i, fieldOffset, buffer, bufferoffset, length);
    public char GetChar(int i) => _inner.GetChar(i);
    public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length) => _inner.GetChars(i, fieldoffset, buffer, bufferoffset, length);
    public IDataReader GetData(int i) => _inner.GetData(i);
    public string GetDataTypeName(int i) => _inner.GetDataTypeName(i);
    public DateTime GetDateTime(int i) => _inner.GetDateTime(i);
    public decimal GetDecimal(int i) => _inner.GetDecimal(i);
    public double GetDouble(int i) => _inner.GetDouble(i);
    public Type GetFieldType(int i) => _inner.GetFieldType(i);
    public float GetFloat(int i) => _inner.GetFloat(i);
    public Guid GetGuid(int i) => _inner.GetGuid(i);
    public short GetInt16(int i) => _inner.GetInt16(i);
    public int GetInt32(int i) => _inner.GetInt32(i);
    public long GetInt64(int i) => _inner.GetInt64(i);
    public string GetName(int i) => _inner.GetName(i);
    public int GetOrdinal(string name) => _inner.GetOrdinal(name);
    public DataTable GetSchemaTable() => _inner.GetSchemaTable();
    public string GetString(int i) => _inner.GetString(i);
    public object GetValue(int i) => _inner.GetValue(i);
    public int GetValues(object[] values) => _inner.GetValues(values);
    public bool IsDBNull(int i) => _inner.IsDBNull(i);
    public bool NextResult() => _inner.NextResult();
    public bool Read() => _inner.Read();
}

Чтобы создать разделительную оболочку, нам нужно заменить Read() нашей собственной версией:

    private string[] _values;

    public bool Read()
    {
        var ok = _inner.Read();
        if (ok)
        {
            //It *could be null*
            if (_inner.IsDBNull(0))
            {
                //What to do? Store an empty array for now
                _values = new string[0];
            }
            var fieldValue = _inner.GetString(0);                
            _values= fieldValue.Split(',');
        }
        return ok;
    }

Это разбивает значения CSV и сохраняет их в строке. Это показывает, почему реализация обертки немного беспокоит - нам нужно разобраться с несколькими вещами и решить, что делать в непредвиденных ситуациях, таких как нули, пустые строки и т. Д.

После этого нам нужно добавить наши собственные реализации для методов, вызываемых SqlBulkCopy. GetValue() определенно называется, как и FieldCount. Другие члены вызываются на основе типов сопоставления столбцов, по имени или по порядковому номеру.

public int FieldCount => _values.Length;

public string GetString(int ordinal) => _values[ordinal];

public object GetValue(int ordinal)=> _values[ordinal];

//What if we have more values than expected?
public int GetValues(object[] values)
{
    if (_values.Length > 0)
    {
        Array.Copy(_values, values,_values.Length);
        return _values.Length;
    }
    return 0;
}

А теперь "забавные" части. А как насчет GetName()? Вероятно:

public string GetName(int ordinal) => $"Field{ordinal}";

GetOrdinal? Это может быть вызвано в отображении имени. Хитрость:

public int GetOrdinal(string name) => int.Parse(name.Substring(5));

Будем надеяться, что это работает.

Нам также нужно переопределить индексы:

    public object this[string name] => _values[GetOrdinal(name)];

    public object this[int i] => _values[i];

Что я забыл? ... Еще нужно обрабатывать произвольные числа значений. Нужно обрабатывать нули. Нет GetSchemaTable, что, вероятно, означает, что сопоставления столбцов должны быть указаны явно, возможно, по порядковому номеру.

Быстрая и грязная IsDbNull реализация может быть:

public bool IsDBNull(int i)
{  
    //Covers the "null" case too, when `Length` is 0
    if (i>_values.Length-1)
    {
        return true;
    }
    return _inner.IsDBNull(i);
}

GetSchemaTable сложнее, потому что мы не знаем, сколько значений в каждой записи. В таблице более 20 столбцов, поэтому я бы предпочел , а не писать этот код, пока не увижу, что он необходим.

public DataTable GetSchemaTable() => throw new NotImplementedException();

Leave it as an excercise to the reader как говорится

PPS: реализации интерфейса по умолчанию, потому что почему бы не

Все это, вероятно, хороший, но запутанный случай, когда методы интерфейса C # 8 по умолчанию могут быть использованы для создания обернутой черты читателя. По умолчанию отложите на завернутый внутренний ридер. Это исключило бы все отложенные вызовы в реализации.

interface IReaderWrapper:IDataReader
{
    //Gives access to the wrapped reader in the concrete classes
    abstract IDataReader Inner();

    override object this[int i] => Inner()[i];

    override object this[string name] => Inner()[name];

    override int Depth => Inner().Depth;

    override bool IsClosed => Inner().IsClosed;
    ...
}

class SplitterWrapper:IReaderWrapper
{

    private readonly IDataReader _inner ;
    public SplitterWrapper(IDataReader inner)
    {
        _inner = inner;
    }

    IDataReader Inner()=> _inner;

    string[] _values;
    public object this[int i] => _values[i];
    ...
}

Эта функция не работает в компиляторе C # 8, поставляемом с VS 2019, и каким-то образом приводит к сбою Sharplab.io. Не знаю, скомпилируется ли он или действительно нужны переопределения.

0 голосов
/ 10 апреля 2019

Я нашел следующий код проекта: https://www.codeproject.com/script/Articles/ViewDownloads.aspx?aid=1095790. Похоже, вам нужно взять данные CSV и разделить на объекты.Я изменил код проекта с кодом ниже.Многие типы не реализованы, и вам может потребоваться реализовать некоторые дополнительные методы.Также не уверен, какого типа должны быть результаты Value.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using System.Data.SqlClient;



namespace ConsoleApplication108
{
    class Program
    {
        static void Main(string[] args)
        {

        }
    }
    public class MyDataReader : IDataReader 
    {
        private SqlConnection conn { get; set; }
        private SqlCommand cmd { get; set; }
        private SqlDataReader reader { get; set; }
        private DataTable schemaTable { get; set; }

        private string data { get; set; }
        private object[] arrayData { get; set; }
        private IEnumerator<object> m_dataEnumerator { get; set; }


        public MyDataReader(string commandText, string connectionString, List<KeyValuePair<string, Type>> columns)
        {
            conn = new SqlConnection(connectionString);
            conn.Open();
            cmd = new SqlCommand(commandText, conn);
            reader = cmd.ExecuteReader();

            schemaTable = new DataTable();
            foreach(KeyValuePair<string,Type> col in columns)
            {
                schemaTable.Columns.Add(col.Key, col.Value);
            }
        }
        public Boolean NextResult()
        {
            return reader.Read();
        }
        public int RecordsAffected
        {
            get { return -1; }
        }
        public int Depth
        {
            get { return -1; }
        }
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
        private void Dispose(bool disposing)
        {
            if (disposing)
            {
                if (m_dataEnumerator != null)
                {
                    m_dataEnumerator.Dispose();
                    m_dataEnumerator = null;
                }
            }
        }

        public Boolean IsClosed {
            get { return reader.IsClosed; }
        }
        public Boolean Read()
        {

            if (IsClosed)
            {
                throw new ObjectDisposedException(GetType().Name);
            }
            else
            {
                arrayData = reader.GetString(0).Split(new char[] { ',' }).ToArray();
            }
            return m_dataEnumerator.MoveNext();

        }
        public DataTable GetSchemaTable()
        {
            return schemaTable;
        }
        public void Close()
        {
            Dispose();
        }


        public object this[string name]
        {
            get { throw new NotImplementedException(); }

        }

        public object this[int i]
        {
            get { return arrayData[i]; }
        }
        public int FieldCount
        {
            get { return arrayData.Length; }
        }
        public bool IsDBNull(int i)
        {
              throw new NotImplementedException();
        }
        public bool GetBoolean(int i)
        {
            throw new NotImplementedException();
        }

        public byte GetByte(int i)
        {
            throw new NotImplementedException();
        }

        public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length)
        {
            throw new NotImplementedException();
        }

        public char GetChar(int i)
        {
            throw new NotImplementedException();
        }

        public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length)
        {
            throw new NotImplementedException();
        }

        public IDataReader GetData(int i)
        {
            throw new NotImplementedException();
        }

        public string GetDataTypeName(int i)
        {
            throw new NotImplementedException();
        }

        public DateTime GetDateTime(int i)
        {
            throw new NotImplementedException();
        }

        public decimal GetDecimal(int i)
        {
            throw new NotImplementedException();
        }

        public double GetDouble(int i)
        {
            throw new NotImplementedException();
        }

        public Type GetFieldType(int i)
        {
            throw new NotImplementedException();
        }

        public float GetFloat(int i)
        {
            throw new NotImplementedException();
        }

        public Guid GetGuid(int i)
        {
            throw new NotImplementedException();
        }

        public short GetInt16(int i)
        {
            throw new NotImplementedException();
        }

        public int GetInt32(int i)
        {
            throw new NotImplementedException();
        }

        public long GetInt64(int i)
        {
            throw new NotImplementedException();
        }

        public string GetName(int i)
        {
            throw new NotImplementedException();
        }

        public string GetString(int i)
        {
            throw new NotImplementedException();
        }

        public int GetValues(object[] values)
        {
            values = arrayData;

            return arrayData.Length;
        }
        public int GetOrdinal(string name)
        {
            throw new NotImplementedException();
        }

        public object GetValue(int i)
        {
            return arrayData[i];
        }



    }
}
...