Error when trying to load data from Ignite into a Spark DataFrame
1 vote
/ 21 October 2019

I have a two-node Spark cluster. On both the master and slave nodes I run ./bin/ignite.sh, and the two nodes can discover each other.

Spark version: 2.4.3

Ignite version: 2.7.6

My spark-defaults.conf file looks like this:

spark.jars.packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1,com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3,org.apache.ignite:ignite-spark:2.7.6
spark.driver.memory 8G
spark.executor.memory 4G
spark.driver.maxResultSize 4G

I launch pyspark as follows:

pyspark --jars $IGNITE_HOME/libs/*.jar,$IGNITE_HOME/libs/optional/ignite-spark/*.jar,$IGNITE_HOME/libs/ignite-spring/*.jar,$IGNITE_HOME/libs/ignite-indexing/*.jar
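To double-check that those globs actually expanded to the jars I expect, here is a quick sanity check from inside the shell (just a sketch of my own; spark.jars reflects what was passed via --jars):

>>> # print every Ignite jar the session actually registered
>>> for jar in spark.sparkContext.getConf().get("spark.jars", "").split(","):
...     if "ignite" in jar:
...         print(jar)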

Then, inside the shell:

>>> df = spark.read.format('mongo').option('uri' , 'mongodb://ec2-X-XXX-XXX-XX.compute-1.amazonaws.com/cit.table_a').load()
19/10/21 20:07:30 WARN MongoInferSchema: Field 'authors' contains conflicting types converting to StringType
19/10/21 20:07:30 WARN MongoInferSchema: Field 'venue' contains conflicting types converting to StringType
>>> df.count()
2616
>>> import os
>>> configFile = os.environ['IGNITE_HOME'] + "/config/default-config.xml"
>>> df.write.format("ignite").option("table", "test1").option("primaryKeyFields", "_id").option("config", configFile).save()
19/10/21 20:08:22 WARN G: Ignite work directory is not provided, automatically resolved to: /home/ubuntu/apache-ignite/work
19/10/21 20:08:23 WARN IgniteKernal: Please set system property '-Djava.net.preferIPv4Stack=true' to avoid possible problems in mixed environments.
19/10/21 20:08:23 WARN GridDiagnostic: Initial heap size is 126MB (should be no less than 512MB, use -Xms512m -Xmx512m).
19/10/21 20:08:23 WARN TcpCommunicationSpi: Message queue limit is set to 0 which may lead to potential OOMEs when running cache operations in FULL_ASYNC or PRIMARY_SYNC modes due to message queues growth on sender and receiver sides.
19/10/21 20:08:23 WARN NoopCheckpointSpi: Checkpoints are disabled (to enable configure any GridCheckpointSpi implementation)
19/10/21 20:08:23 WARN GridCollisionManager: Collision resolution is disabled (all jobs will be activated upon arrival).
19/10/21 20:08:24 WARN PartitionsEvictManager: Logging at INFO level without checking if INFO level is enabled: Evict partition permits=2
19/10/21 20:08:24 WARN TcpDiscoveryMulticastIpFinder: TcpDiscoveryMulticastIpFinder has no pre-configured addresses (it is recommended in production to specify at least one address in TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)
19/10/21 20:08:26 WARN IgniteKernal: Nodes started on local machine require more than 80% of physical RAM what can lead to significant slowdown due to swapping (please decrease JVM heap size, data region size or checkpoint buffer size) [required=10749MB, available=7975MB]
>>> igdf = spark.read.format("ignite").option("config", configFile).option("table", "test1").load()
>>> igdf.count()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.6/dist-packages/pyspark/sql/dataframe.py", line 523, in count
    return int(self._jdf.count())
  File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/local/lib/python3.6/dist-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.6/dist-packages/pyspark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o56.count.
: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.AttributeReference.withQualifier(Lscala/Option;)Lorg/apache/spark/sql/catalyst/expressions/AttributeReference;
    at org.apache.spark.sql.ignite.IgniteOptimization$$anonfun$pushDownOperators$1$$anonfun$applyOrElse$2.apply(IgniteOptimization.scala:86)
    at org.apache.spark.sql.ignite.IgniteOptimization$$anonfun$pushDownOperators$1$$anonfun$applyOrElse$2.apply(IgniteOptimization.scala:86)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:296)
    at org.apache.spark.sql.ignite.IgniteOptimization$$anonfun$pushDownOperators$1.applyOrElse(IgniteOptimization.scala:86)
    at org.apache.spark.sql.ignite.IgniteOptimization$$anonfun$pushDownOperators$1.applyOrElse(IgniteOptimization.scala:63)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:281)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:281)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:280)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUp(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformUp(AnalysisHelper.scala:158)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:278)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:278)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:329)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:278)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUp(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformUp(AnalysisHelper.scala:158)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:278)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:278)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:329)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:278)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUp(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformUp(AnalysisHelper.scala:158)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
    at org.apache.spark.sql.ignite.IgniteOptimization$.pushDownOperators(IgniteOptimization.scala:63)
    at org.apache.spark.sql.ignite.IgniteOptimization$.apply(IgniteOptimization.scala:40)
    at org.apache.spark.sql.ignite.IgniteOptimization$.apply(IgniteOptimization.scala:33)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
    at scala.collection.immutable.List.foldLeft(List.scala:84)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:2835)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Can anyone help with what I am doing wrong here? I have tried a couple of things, but no luck. Any help would be appreciated.

Answers [2]

1 vote
/ 28 October 2019

Yes, Spark 2.4 is not supported yet.

Here is the ticket in the Apache Ignite JIRA where you can track the status: https://issues.apache.org/jira/browse/IGNITE-12054
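Until it is resolved, a minimal fail-fast guard at the top of your job (just a sketch, assuming you submit through pyspark) reports the version mismatch up front instead of failing deep inside query planning:

import pyspark

# ignite-spark 2.7.6 is built against the Spark 2.3.x API, so refuse to run
# on anything newer rather than hit NoSuchMethodError inside the optimizer.
if not pyspark.__version__.startswith("2.3."):
    raise RuntimeError(
        "ignite-spark 2.7.6 requires Spark 2.3.x, found " + pyspark.__version__)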

1 vote
/ 21 October 2019

Because of a number of internal changes, Spark 2.4.x is not supported at the moment. It should work with 2.3.x, though. The NoSuchMethodError in your trace is the symptom of that binary incompatibility: Spark 2.4 changed the parameter of AttributeReference.withQualifier from Option[String] to Seq[String], so ignite-spark 2.7.6, which was compiled against the 2.3.x API, calls a method signature that no longer exists.

Support for newer versions is under development and should hopefully be ready for Ignite 2.8.
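If downgrading is not an option right away, one workaround sketch is to read the table back through Ignite's thin JDBC driver (it ships in ignite-core, which should already be on your classpath via --jars) instead of the "ignite" format; that sidesteps the ignite-spark optimization rule entirely. The host below is an assumption for your setup (the thin driver listens on port 10800 by default):

>>> igdf = spark.read.format("jdbc") \
...     .option("url", "jdbc:ignite:thin://127.0.0.1") \
...     .option("driver", "org.apache.ignite.IgniteJdbcThinDriver") \
...     .option("dbtable", "test1") \
...     .load()
>>> igdf.count()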
