Hadoop: промежуточное слияние не удалось - PullRequest
7 голосов
/ 07 апреля 2011

У меня странная проблема.Когда я запускаю свое задание Hadoop над большим набором данных (> 1 ТБ сжатых текстовых файлов), некоторые из задач сокращения завершаются неудачно с такими трассировками стека:

java.io.IOException: Task: attempt_201104061411_0002_r_000044_0 - The reduce copier failed
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:385)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:240)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1115)
    at org.apache.hadoop.mapred.Child.main(Child.java:234)
Caused by: java.io.IOException: Intermediate merge failed
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2714)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2639)
Caused by: java.lang.RuntimeException: java.io.EOFException
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:128)
    at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:373)
    at org.apache.hadoop.util.PriorityQueue.downHeap(PriorityQueue.java:139)
    at org.apache.hadoop.util.PriorityQueue.adjustTop(PriorityQueue.java:103)
    at org.apache.hadoop.mapred.Merger$MergeQueue.adjustPriorityQueue(Merger.java:335)
    at org.apache.hadoop.mapred.Merger$MergeQueue.next(Merger.java:350)
    at org.apache.hadoop.mapred.Merger.writeFile(Merger.java:156)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2698)
    ... 1 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:375)
    at com.__.hadoop.pixel.segments.IpCookieCountFilter$IpAndIpCookieCount.readFields(IpCookieCountFilter.java:241)
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:125)
    ... 8 more
java.io.IOException: Task: attempt_201104061411_0002_r_000056_0 - The reduce copier failed
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:385)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:240)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1115)
    at org.apache.hadoop.mapred.Child.main(Child.java:234)
Caused by: java.io.IOException: Intermediate merge failed
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2714)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2639)
Caused by: java.lang.RuntimeException: java.io.EOFException
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:128)
    at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:373)
    at org.apache.hadoop.util.PriorityQueue.upHeap(PriorityQueue.java:123)
    at org.apache.hadoop.util.PriorityQueue.put(PriorityQueue.java:50)
    at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:447)
    at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:381)
    at org.apache.hadoop.mapred.Merger.merge(Merger.java:107)
    at org.apache.hadoop.mapred.Merger.merge(Merger.java:93)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2689)
    ... 1 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:180)
    at org.apache.hadoop.io.Text.readString(Text.java:402)
    at com.__.hadoop.pixel.segments.IpCookieCountFilter$IpAndIpCookieCount.readFields(IpCookieCountFilter.java:240)
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:122)
    ... 9 more

Не все мои редукторы дают сбой.Некоторые часто преуспевают прежде, чем я вижу неудачи с другими.Как вы можете видеть, трассировки стека, кажется, всегда происходят из IPAndIPCookieCount.readFields() и всегда в стадии слияния в памяти, но не всегда из одной и той же части readFields.

Это задание успешно выполняется при работе на меньшемнаборы данных (около 1/30 размера).Выходных данных для задания почти столько же, сколько и каждой выходной записи.По сути, эта работа является реализацией вторичной сортировки.

Мы используем дистрибутив Hadoop CDH3.

Вот моя пользовательская реализация WritableComparable:

public static class IpAndIpCookieCount implements WritableComparable<IpAndIpCookieCount> {

        private String ip;
        private int ipCookieCount;

        public IpAndIpCookieCount() {
            // empty constructor for hadoop
        }

        public IpAndIpCookieCount(String ip, int ipCookieCount) {
            this.ip = ip;
            this.ipCookieCount = ipCookieCount;
        }

        public String getIp() {
            return ip;
        }

        public int getIpCookieCount() {
            return ipCookieCount;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            ip = Text.readString(in);
            ipCookieCount = in.readInt();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            Text.writeString(out, ip);
            out.writeInt(ipCookieCount);
        }

        @Override
        public int compareTo(IpAndIpCookieCount other) {
            int firstComparison = ip.compareTo(other.getIp());
            if (firstComparison == 0) {
                int otherIpCookieCount = other.getIpCookieCount();
                if (ipCookieCount == otherIpCookieCount) {
                    return 0;
                } else {
                    return ipCookieCount < otherIpCookieCount ? 1 : -1;
                }
            } else {
                return firstComparison;
            }
        }

        @Override
        public boolean equals(Object o) {
            if (o instanceof IpAndIpCookieCount) {
                IpAndIpCookieCount other = (IpAndIpCookieCount) o;
                return ip.equals(other.getIp()) && ipCookieCount == other.getIpCookieCount();
            } else {
                return false;
            }
        }

        @Override
        public int hashCode() {
            return ip.hashCode() ^ ipCookieCount;
        }

    }

readFields метод очень прост, и я не вижу никаких проблем в этом классе.Кроме того, я видел других людей, получающих, по сути, ту же трассировку стека:

Никто, похоже, на самом деле не понял, что стоит за этим.Последние два, кажется, предполагают, что это может быть проблема с памятью (хотя эти трассировки стека не OutOfMemoryException с).Как и в последнем посте в этом списке ссылок, я попытался установить большее количество редукторов (до 999), но все равно получаю сбои.Я (пока) не пытался выделить больше памяти для сокращения задач, поскольку это потребовало бы от нас перенастройки нашего кластера.

Это ошибка в Hadoop?Или я что-то не так делаю?

РЕДАКТИРОВАТЬ : Мои данные разбиты по дням.Если я выполню работу 7 раз, по одному разу на каждый день, все 7 выполнятся.Если я выполняю одну работу в течение всех 7 дней, она терпит неудачу.Большой отчет за все 7 дней будет содержать те же ключи, что и меньшие (в совокупности), но, очевидно, не в том же порядке, с теми же редукторами и т. Д.

1 Ответ

1 голос
/ 13 мая 2011

Я думаю, что это артефакт бэкпорта Клоудеры от MAPREDUCE-947 до CDH3.Этот патч приводит к формированию файла _SUCCESS для успешного выполнения задания.

Также в выходной папке создается файл _SUCCESS для успешного выполнения задания.Для параметра конфигурации mapreduce.fileoutputcommitter.marksuccessfuljobs можно установить значение false, чтобы отключить создание файла _SUCCESS, или значение true, чтобы разрешить создание файла _SUCCESS.

Глядя на вашу ошибку,

Caused by: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:180)

и сравнивая ее с ошибками, которые я видел в этой проблеме ранее,

Exception in thread "main" java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:180)
    at java.io.DataInputStream.readFully(DataInputStream.java:152)
    at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1465)
    at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1437)
    at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1424)
    at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1419)
    at org.apache.hadoop.mapred.SequenceFileOutputFormat.getReaders(SequenceFileOutputFormat.java:89)
    at org.apache.nutch.crawl.CrawlDbReader.processStatJob(CrawlDbReader.java:323)
    at org.apache.nutch.crawl.CrawlDbReader.main(CrawlDbReader.java:511)

и на Список рассылки Mahout

Exception in thread "main" java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:180)
    at java.io.DataInputStream.readFully(DataInputStream.java:152)
    at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1457)
    at
org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1435)
    at
org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1424)
    at
org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1419)
    at
org.apache.mahout.df.mapreduce.partial.Step0Job.parseOutput(Step0Job.java:145)
    at
org.apache.mahout.df.mapreduce.partial.Step0Job.run(Step0Job.java:119)
    at
org.apache.mahout.df.mapreduce.partial.PartialBuilder.parseOutput(PartialBuilder.java:115)
    at org.apache.mahout.df.mapreduce.Builder.build(Builder.java:338)
    at
org.apache.mahout.df.mapreduce.BuildForest.buildForest(BuildForest.java:195)

кажется, что DataInputStream.readFully задушен этим файлом.

Я бы предложил установить для mapreduce.fileoutputcommitter.marksuccessfuljobs значение false и повторить попытку- это должно работать.

...