У меня странная проблема.Когда я запускаю свое задание Hadoop над большим набором данных (> 1 ТБ сжатых текстовых файлов), некоторые из задач сокращения завершаются неудачно с такими трассировками стека:
java.io.IOException: Task: attempt_201104061411_0002_r_000044_0 - The reduce copier failed
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:385)
at org.apache.hadoop.mapred.Child$4.run(Child.java:240)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1115)
at org.apache.hadoop.mapred.Child.main(Child.java:234)
Caused by: java.io.IOException: Intermediate merge failed
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2714)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2639)
Caused by: java.lang.RuntimeException: java.io.EOFException
at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:128)
at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:373)
at org.apache.hadoop.util.PriorityQueue.downHeap(PriorityQueue.java:139)
at org.apache.hadoop.util.PriorityQueue.adjustTop(PriorityQueue.java:103)
at org.apache.hadoop.mapred.Merger$MergeQueue.adjustPriorityQueue(Merger.java:335)
at org.apache.hadoop.mapred.Merger$MergeQueue.next(Merger.java:350)
at org.apache.hadoop.mapred.Merger.writeFile(Merger.java:156)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2698)
... 1 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:375)
at com.__.hadoop.pixel.segments.IpCookieCountFilter$IpAndIpCookieCount.readFields(IpCookieCountFilter.java:241)
at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:125)
... 8 more
java.io.IOException: Task: attempt_201104061411_0002_r_000056_0 - The reduce copier failed
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:385)
at org.apache.hadoop.mapred.Child$4.run(Child.java:240)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1115)
at org.apache.hadoop.mapred.Child.main(Child.java:234)
Caused by: java.io.IOException: Intermediate merge failed
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2714)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2639)
Caused by: java.lang.RuntimeException: java.io.EOFException
at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:128)
at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:373)
at org.apache.hadoop.util.PriorityQueue.upHeap(PriorityQueue.java:123)
at org.apache.hadoop.util.PriorityQueue.put(PriorityQueue.java:50)
at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:447)
at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:381)
at org.apache.hadoop.mapred.Merger.merge(Merger.java:107)
at org.apache.hadoop.mapred.Merger.merge(Merger.java:93)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2689)
... 1 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:180)
at org.apache.hadoop.io.Text.readString(Text.java:402)
at com.__.hadoop.pixel.segments.IpCookieCountFilter$IpAndIpCookieCount.readFields(IpCookieCountFilter.java:240)
at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:122)
... 9 more
Не все мои редукторы дают сбой.Некоторые часто преуспевают прежде, чем я вижу неудачи с другими.Как вы можете видеть, трассировки стека, кажется, всегда происходят из IPAndIPCookieCount.readFields()
и всегда в стадии слияния в памяти, но не всегда из одной и той же части readFields
.
Это задание успешно выполняется при работе на меньшемнаборы данных (около 1/30 размера).Выходных данных для задания почти столько же, сколько и каждой выходной записи.По сути, эта работа является реализацией вторичной сортировки.
Мы используем дистрибутив Hadoop CDH3.
Вот моя пользовательская реализация WritableComparable
:
public static class IpAndIpCookieCount implements WritableComparable<IpAndIpCookieCount> {
private String ip;
private int ipCookieCount;
public IpAndIpCookieCount() {
// empty constructor for hadoop
}
public IpAndIpCookieCount(String ip, int ipCookieCount) {
this.ip = ip;
this.ipCookieCount = ipCookieCount;
}
public String getIp() {
return ip;
}
public int getIpCookieCount() {
return ipCookieCount;
}
@Override
public void readFields(DataInput in) throws IOException {
ip = Text.readString(in);
ipCookieCount = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, ip);
out.writeInt(ipCookieCount);
}
@Override
public int compareTo(IpAndIpCookieCount other) {
int firstComparison = ip.compareTo(other.getIp());
if (firstComparison == 0) {
int otherIpCookieCount = other.getIpCookieCount();
if (ipCookieCount == otherIpCookieCount) {
return 0;
} else {
return ipCookieCount < otherIpCookieCount ? 1 : -1;
}
} else {
return firstComparison;
}
}
@Override
public boolean equals(Object o) {
if (o instanceof IpAndIpCookieCount) {
IpAndIpCookieCount other = (IpAndIpCookieCount) o;
return ip.equals(other.getIp()) && ipCookieCount == other.getIpCookieCount();
} else {
return false;
}
}
@Override
public int hashCode() {
return ip.hashCode() ^ ipCookieCount;
}
}
readFields
метод очень прост, и я не вижу никаких проблем в этом классе.Кроме того, я видел других людей, получающих, по сути, ту же трассировку стека:
Никто, похоже, на самом деле не понял, что стоит за этим.Последние два, кажется, предполагают, что это может быть проблема с памятью (хотя эти трассировки стека не OutOfMemoryException
с).Как и в последнем посте в этом списке ссылок, я попытался установить большее количество редукторов (до 999), но все равно получаю сбои.Я (пока) не пытался выделить больше памяти для сокращения задач, поскольку это потребовало бы от нас перенастройки нашего кластера.
Это ошибка в Hadoop?Или я что-то не так делаю?
РЕДАКТИРОВАТЬ : Мои данные разбиты по дням.Если я выполню работу 7 раз, по одному разу на каждый день, все 7 выполнятся.Если я выполняю одну работу в течение всех 7 дней, она терпит неудачу.Большой отчет за все 7 дней будет содержать те же ключи, что и меньшие (в совокупности), но, очевидно, не в том же порядке, с теми же редукторами и т. Д.