Создание компонента потока пользовательских данных SSIS + SSIS 2008 - PullRequest
1 голос
/ 06 июля 2010

Я новичок в пользовательском компоненте служб SSIS. Только что начал кодировать компонент, в котором количество входных строк никогда не будет таким же, как количество выходных строк. Для каждой входной строки выполняется некоторая проверка и генерируется n строк, которые необходимо отобразить в выходной буфер.

Так что после кодирования проверки времени проектирования все в порядке.

мой код RunTime, как показано ниже:

    public override void PreExecute()
    {
        IDTSInput100 input = ComponentMetaData.InputCollection[0];
        inputBufferColumnIndex = new int[input.InputColumnCollection.Count];

        for (int x = 0; x < input.InputColumnCollection.Count; x++)
        {
            IDTSInputColumn100 column = input.InputColumnCollection[x];
            inputBufferColumnIndex[x] = BufferManager.FindColumnByLineageID  (input.Buffer, column.LineageID);
        }

        IDTSOutput100 output = ComponentMetaData.OutputCollection[0];
        outputBufferColumnIndex = new int[output.OutputColumnCollection.Count];

        for (int x = 0; x < output.OutputColumnCollection.Count; x++)
        {
            IDTSOutputColumn100 outcol = output.OutputColumnCollection[x];
            outputBufferColumnIndex[x] = BufferManager.FindColumnByLineageID(input.Buffer, outcol.LineageID);
        }

    }


    public override void ProcessInput(int inputID, PipelineBuffer buffer)
    {
        if(!buffer.EndOfRowset)
        {
            while (buffer.NextRow())
            {
                var rec = new Record
                                 {
                                     Source = buffer[0].ToString(),
                                     Nk = buffer[1].ToString(),
                                     Guid = new Guid(buffer[2].ToString()),
                                     FromDate = Convert.ToDateTime(buffer[3].ToString()),
                                     ToDate = Convert.ToDateTime(buffer[4].ToString())
                                 };
                sourceRecords.Add(rec);
            }
            ProcessArray(sourceRecords,buffer);
        }
    }
    public void ProcessArray(List<Record> records, PipelineBuffer buffer)
    {
        //Get Distinct NKs from the source Records
        List<string> nKs = (from c in records select c.Nk).Distinct().ToList();


        foreach (var nk in nKs)
        {
            //Get all the record for particular NK
            List<Record> filteredRecords = (from c in sourceRecords where c.Nk == nk select c)
                                                .OrderBy(c => c.Source)
                                                .ThenBy(c => c.FromDate)
                                                .ThenBy(c => c.ToDate).ToList();

            foreach (var filteredRecord in filteredRecords)
            {
                _start = filteredRecord.FromDate;
                _end = filteredRecord.ToDate;
                while (filteredRecord.WriteComplete == false)
                {
                    foreach (var record in filteredRecords)
                    {
                        if (record.FromDate > _start && record.FromDate < _end) _end = record.ToDate;
                        if (record.ToDate < _end && record.ToDate > _start) _end = record.ToDate;
                    }

                    //Output0Buffer.AddRow();
                    //Output0Buffer.outSource = filteredRecord.Source;
                    //Output0Buffer.outNK = filteredRecord.Nk;
                    //Output0Buffer.outRecid = filteredRecord.Guid;
                    //Output0Buffer.outFromDate = _start;
                    //Output0Buffer.outToDate = _end;
                    buffer.SetString(5,filteredRecord.Source);
                    buffer.SetString(6,filteredRecord.Nk);
                    buffer.SetGuid(7,filteredRecord.Guid);
                    buffer.SetDateTime(8,filteredRecord.FromDate);
                    buffer.SetDateTime(9,filteredRecord.ToDate);

                    _start = _end;
                    _end = filteredRecord.ToDate;

                    if (_start == _end) filteredRecord.WriteComplete = true;
                }
            }
        }
    }
}
public class Record
{
    public Guid Guid { get; set; }
    public string Nk { get; set; }
    public string Source { get; set; }
    public DateTime FromDate { get; set; }
    public DateTime ToDate { get; set; }
    public bool WriteComplete { get; set; }
}

В моем методе ProcessArray я пытаюсь заполнить буфер вывода. Я даже не уверен, что это можно сделать.

Любое руководство будет оценено.

Спасибо

Ответы [ 2 ]

0 голосов
/ 14 июля 2010

Я не уверен, что понимаю, чего вы пытаетесь достичь, но похоже, что вы пытаетесь отсортировать все данные и затем последовательно обработать отсортированный список.Обратите внимание, что ваш метод ProcessInput вызывается несколько раз, каждый с новым буфером.Любая сортировка, которую вы выполняете с полученным буфером, применяется только к этому конкретному буферу - данные не сортируются глобально, поэтому ваши результаты могут отличаться в зависимости от границ буфера.

Это нормально для конкретного сценария?Если нет, используйте преобразование Sort для сортировки всех данных, добавьте преобразование после Sort и просто обработайте данные построчно - они уже отсортированы.Так что просто прочитайте его построчно, затем измените строку current после чтения - это то, для чего используется buffer.SetString.

Кроме того, не указывайте жесткие индексы столбцов, например, buffer.SetString (5, ...) - числа могут меняться, лучше получить и сохранить индекс столбца в PreExecute, затем использовать что-то вроде buffer.SetString (nkColumnIndex, nkColumnValue);

0 голосов
/ 09 июля 2010

Да, этот тип преобразования может быть выполнен, он называется асинхронным преобразованием.Ваш код выглядит хорошо для меня.из вашего вопроса неясно, возникла ли у вас конкретная проблема.

Возможно, вы захотите попытаться создать преобразование компонента Asynchronous Script Component, чтобы вам не пришлось возиться со всеми системами SSIS.

подробнее здесь: http://msdn.microsoft.com/en-us/library/ms136133.aspx

http://msdn.microsoft.com/en-us/library/ms135931.aspx

...