Я новичок в искре.У меня есть следующая таблица в Кассандре:
CREATE TABLE cust_actions (
orgid text,
empid int,
custid int,
date timestamp,
action text
PRIMARY KEY (orgid, empid, custid, date)
) WITH CLUSTERING ORDER BY (empid ASC, custid ASC, date DESC)
Эта таблица содержит данные для каждого из действий, которые сотрудник выполняет с клиентом.Таблица получает более 10 миллионов вставок в день.У меня есть кластер cassandra с 3 узлами, работающий на 18 основных компьютерах по 32 гБ.
Я хочу агрегировать данные за день, т. Е. В определенный день, сколько действий было предпринято для клиента.Для этого я создал еще одну таблицу:
CREATE TABLE daily_cust_actions (
custid int,
date date,
action text,
count int,
PRIMARY KEY (custid, date, action)
) WITH CLUSTERING ORDER BY (date ASC, action ASC)
. Для этого я хочу использовать искру (пожалуйста, предложите, если это не так, или есть и другие альтернативы).Я запускаю искру на одной из этих кассандровых машин (упомянутых выше), у мастера и раба по 9 исполнителей, у каждого из этих исполнителей по 1 г оперативной памяти и 2 ядра в каждом.
Размер стола около 70 г.Я не могу агрегировать эти данные.Это работает хорошо для меньшего набора данных, хотя.Вот мой скрипт искры:
object DailyAggregation {
def main(args: Array[String]): Unit = {
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "host1,host2,host3")
.set("spark.cassandra.auth.username", "cassandra")
.set("spark.cassandra.auth.password", "cassandra")
.set("spark.cassandra.input.split.size_in_mb", "10") //have tried multiple options here
val sc = new SparkContext("spark://host", "spark-cassandra", conf)
val rdd = sc.cassandraTable("mykeyspace","cust_actions")
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
val df = new SimpleDateFormat("yyyy-MM-dd")
val startDate = df.parse("2018-08-13")
val endDate = df.parse("2018-09-14")
sc.parallelize(
rdd.select("custid", "date", "action")
.where("date >= ? and date < ?", startDate, endDate)
.keyBy(row => (
row.getInt("custid"),
df.format(row.getLong("date")),
row.getString("action"))).map { case (key, value) => (key, 1) }
.reduceByKey(_ + _).collect()
.map { case (key, value) => (key._1, key._2, key._3, value) })
.saveToCassandra("mykeyspace", "daily_cust_actions")
sc.stop()
}
}
Я пробовал разные вещи, увеличивая / уменьшая память / исполнителей, увеличивая / уменьшая значение spark.cassandra.input.split.size_in_mb
и настраивая некоторые переменные среды искры.Но каждый раз я получаю разные ошибки.Он показывает 2 этапа, первый этап всегда проходит гладко, но на втором этапе он всегда завершается неудачей.
Я видел много разных ошибок.В настоящее время я получаю следующую ошибку:
2018-09-15 16:36:05 INFO TaskSetManager:54 - Task 158.1 in stage 1.1 (TID 1293) failed, but the task will not be re-executed (either because t
he task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already
succeeded).
2018-09-15 16:36:05 WARN TaskSetManager:66 - Lost task 131.1 in stage 1.1 (TID 1286, 127.0.0.1, executor 18): FetchFailed(null, shuffleId=0, m
apId=-1, reduceId=131, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
Любая помощь здесь будет принята с благодарностью.