Как преобразовать итератор в сплитератор - PullRequest
0 голосов
/ 26 мая 2020

У меня есть 4 больших файла (около 1,5 ГБ каждый), и я хочу обработать эти файлы, прочитать каждую строку файла и преобразовать ее в объект клиента. У меня есть следующая реализация.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;

import static java.nio.charset.StandardCharsets.UTF_8;

public class CustomerDataAccess {

    public static void main(String[] args) throws IOException {
        CustomerFileItem john = new CustomerFileItem("CustFile1", "http://w.customer1.com");
        CustomerFileItem sarah = new CustomerFileItem("CustFile2", "http://w.customer2.com");
        CustomerFileItem charles = new CustomerFileItem("CustFile3", "http://w.customer3.com");
        List<CustomerFileItem> customers = Arrays.asList(john, sarah, charles);

        Iterator<CustomerFileLineItem> custList = new CustIterator(customers);
    }

    public static class CustIterator implements Iterator<CustomerFileLineItem> {

        private static final int HEADER_LINES = 9; // 8 + 1 blank line
        BufferedReader bufferedReader;

        private int index = 0;
        private final List<CustomerFileItem> custFileItems = new ArrayList<>();


        public CustIterator(final List<CustomerFileItem> custFileItems) throws IOException {
            this.custFileItems.addAll(custFileItems);
            processNext();
        }

        private void processNext() throws IOException {
            if (bufferedReader != null) {
                bufferedReader.close();
            }
            if (index < custFileItems.size()) { // only update if there's another file
                CustomerFileItem custFileItem = custFileItems.get(index);
                GZIPInputStream gis = new GZIPInputStream(new URL(custFileItem.url).openStream());
                // default buffer size is 8 KB
                bufferedReader = new BufferedReader(new InputStreamReader(gis, UTF_8));
                // read the first few lines
                for (int i = 0; i < HEADER_LINES; i++) {
                    bufferedReader.readLine();
                }
            }
            index++;
        }

        @Override
        public boolean hasNext() {
            try {
                boolean currentReaderStatus = bufferedReader.ready();
                if (currentReaderStatus) {
                    return true;
                } else if (index < custFileItems.size()) {
                    // at end of current file, try to get the next one
                    processNext();
                    return hasNext();
                } else { // no more files left
                    return false;
                }
            } catch (IOException e) {
                try {
                    bufferedReader.close();
                } catch (IOException e1) {
                    throw new UncheckedIOException(e1);
                }
                throw new UncheckedIOException(e);
            }
        }

        @Override
        public CustomerFileLineItem next() {
            try {
                String line = bufferedReader.readLine();
                if (line != null) {
                    return new CustomerFileLineItem(line);
                } else {
                    return null;
                }
            } catch (IllegalArgumentException exception) {
                return null;
            } catch (IOException e) {
                try {
                    bufferedReader.close();
                } catch (IOException e1) {
                    throw new UncheckedIOException(e1);
                }
                throw new UncheckedIOException(e);
            }
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        @Override
        public void forEachRemaining(final Consumer<? super CustomerFileLineItem> action) {
            throw new UnsupportedOperationException();
        }
    }


    public static class CustomerFileLineItem {
        private static final int NUMBER_OF_FIELDS = 4;

        final String id;
        final String productNumber;
        final String usageType;
        final String operation;


        public CustomerFileLineItem(final String line) {
            String[] strings = line.split(",");
            if (strings.length != NUMBER_OF_FIELDS) {
                throw new IllegalArgumentException(String.format("Malformed customer file line: %s", line));
            }
            this.id = strings[0];
            this.productNumber = strings[1];
            this.usageType = strings[3];
            this.operation = strings[4];
        }
    }

    static class CustomerFileItem {
        private String fileName;
        private String url;

        public CustomerFileItem(String fileName, String url) {
            this.fileName = fileName;
            this.url = url;
        }
    }


}

В одном из вариантов использования я хочу использовать потоки в списке вывода (custList). Но я знаю, что не могу использовать потоки с Iterator. Как мне преобразовать его в Spliterator? Или как я могу реализовать то же самое, что я реализую с помощью Iterator в Spliterator?

Ответы [ 2 ]

4 голосов
/ 26 мая 2020

TL; DR Вам не нужно реализовывать Iterator или Spliterator, вы можете просто использовать Stream в первую очередь:

private static final int HEADER_LINES = 9; // 8 + 1 blank line
Stream<CustomerFileLineItem> stream = customers.stream()
    .flatMap(custFileItem -> {
        try {
            GZIPInputStream gis
                = new GZIPInputStream(new URL(custFileItem.url).openStream());
            BufferedReader br = new BufferedReader(new InputStreamReader(gis, UTF_8));
            // read the first few lines
            for (int i = 0; i < HEADER_LINES; i++) br.readLine();
            return br.lines().onClose(() -> {
              try { br.close(); }
              catch(IOException ex) { throw new UncheckedIOException(ex); }
            });
        } catch(IOException ex) {
            throw new UncheckedIOException(ex);
        }
    })
    .map(CustomerFileLineItem::new);

Но для полноты, рассмотрим вопрос буквально:

Прежде всего, вы не должны добавлять определение метода, например

@Override
public void forEachRemaining(final Consumer<? super CustomerFileLineItem> action) {
    throw new UnsupportedOperationException();
}

Этот метод наверняка даст обратный эффект при использовании Stream API, так как это где завершится большинство операций без короткого замыкания.

Нет даже причины его добавлять. Если вы не объявляете метод, вы получите разумный метод по умолчанию из интерфейса Iterator.

Когда вы исправите эту проблему, вы можете легко преобразовать Iterator в Spliterator, используя Spliterators.pliteratorUnknownSize(Iterator, int).

Но для этого нет причин. Ваш код становится проще при реализации Spliterator в первую очередь:

public static class CustIterator
                    extends Spliterators.AbstractSpliterator<CustomerFileLineItem> {
    private static final int HEADER_LINES = 9; // 8 + 1 blank line
    BufferedReader bufferedReader;

    private final ArrayDeque<CustomerFileItem> custFileItems;

    public CustIterator(final List<CustomerFileItem> custFileItems) throws IOException {
        super(Long.MAX_VALUE, ORDERED|NONNULL);
        this.custFileItems = new ArrayDeque<>(custFileItems);
        processNext();
    }

    @Override
    public boolean tryAdvance(Consumer<? super CustomerFileLineItem> action) {
        if(bufferedReader == null) return false;
        try {
            String line = bufferedReader.readLine();
            while(line == null) {
                processNext();
                if(bufferedReader == null) return false;
                line = bufferedReader.readLine();
            }
            action.accept(new CustomerFileLineItem(line));
            return true;
        }
        catch(IOException ex) {
            if(bufferedReader != null) try {
                bufferedReader.close();
                bufferedReader = null;
            }
            catch(IOException ex2) {
                ex.addSuppressed(ex2);
            }
            throw new UncheckedIOException(ex);
        }
    }

    private void processNext() throws IOException {
        if (bufferedReader != null) {
            bufferedReader.close();
            bufferedReader = null;
        }
        if (!custFileItems.isEmpty()) { // only update if there's another file
            CustomerFileItem custFileItem = custFileItems.remove();
            GZIPInputStream gis
                = new GZIPInputStream(new URL(custFileItem.url).openStream());
            // default buffer size is 8 KB
            bufferedReader = new BufferedReader(new InputStreamReader(gis, UTF_8));
            // read the first few lines
            for (int i = 0; i < HEADER_LINES; i++) {
                bufferedReader.readLine();
            }
        }
    }
}

Но, как было сказано в начале, вам даже не нужно реализовывать Spliterator здесь .

1 голос
/ 26 мая 2020

Каждый объект Iterable<T> имеет следующие методы:

Следовательно, вы хотите создать Iterable<T> обратно из Iterator<T>, что требует переопределения только одного нестандартного и абстрактного метода :

Iterable<CustomerFileLineItem> iterable = new Iterable<CustomerFileLineItem>() {
    @Override
    public Iterator<CustomerFileLineItem> iterator() {
        return custList;
    }
};

Это может быть сокращено до лямбда-выражения, в результате получится:

Iterable<CustomerFileLineItem> iterable = () -> custList;
Spliterator<CustomerFileLineItem> spliterator = iterable.spliterator();

... так что Stream легко создать:

Stream<CustomerFileLineItem> stream = StreamSupport.stream(spliterator, false);
...