I'm new to pyspark and want to work with SQL data. Here is what I have done so far.

I can import JSON, TXT, and CSV files into Spark without any problems.
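For example, reading a CSV works fine (a minimal sketch; the file name data.csv and the header option are just placeholders for my actual files):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("csv-demo").getOrCreate()
df = spark.read.option("header", "true").csv("data.csv")  # loads without issues
df.show()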
For some reason, I cannot connect to MySQL (hosted locally at 127.0.0.1). I used the following connection code:
import os
from os.path import expanduser, join, abspath
from pyspark.sql import SparkSession, Row, DataFrameReader, SQLContext
from pyspark import SparkContext, SparkConf

# Path to the MySQL JDBC driver jar (falls back to a local copy)
sparkClassPath = os.getenv('SPARK_CLASSPATH', './mysql-connector-java-8.0.11/mysql-connector-java-8.0.11.jar')

sqlContext = SQLContext(SparkContext.getOrCreate())
sqlContext.read.format("jdbc").options(url="jdbc:mysql://127.0.0.1/", driver="com.mysql.jdbc.Driver",
                                       dbtable="product", user="root", password='').load()
Running it generates the following error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-141-bfc4c079a8b3> in <module>()
3 from pyspark import SparkContext, SparkConf
4 sparkClassPath = os.getenv('SPARK_CLASSPATH', './mysql-connector-java-8.0.11/mysql-connector-java-8.0.11.jar')
----> 5 sqlContext = SQLContext(SparkContext.getOrCreate())
6 sqlContext.read.format("jdbc").options(url="jdbc:mysql://127.0.0.1/",driver = "com.mysql.jdbc.Driver",
7 dbtable = "product",user="root",password='').load()
~/Documents/spark/spark-2.2.1-bin-hadoop2.7/python/pyspark/context.py in getOrCreate(cls, conf)
332 with SparkContext._lock:
333 if SparkContext._active_spark_context is None:
--> 334 SparkContext(conf=conf or SparkConf())
335 return SparkContext._active_spark_context
336
~/Documents/spark/spark-2.2.1-bin-hadoop2.7/python/pyspark/context.py in __init__(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls)
116 try:
117 self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
--> 118 conf, jsc, profiler_cls)
119 except:
120 # If an error occurs, clean up in order to allow future SparkContext creation:
~/Documents/spark/spark-2.2.1-bin-hadoop2.7/python/pyspark/context.py in _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, jsc, profiler_cls)
178
179 # Create the Java SparkContext through Py4J
--> 180 self._jsc = jsc or self._initialize_context(self._conf._jconf)
181 # Reset the SparkConf to the one actually used by the SparkContext in JVM.
182 self._conf = SparkConf(_jconf=self._jsc.sc().conf())
~/Documents/spark/spark-2.2.1-bin-hadoop2.7/python/pyspark/context.py in _initialize_context(self, jconf)
271 Initialize SparkContext in function to allow subclass specific initialization
272 """
--> 273 return self._jvm.JavaSparkContext(jconf)
274
275 @classmethod
~/Documents/spark/spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1399 answer = self._gateway_client.send_command(command)
1400 return_value = get_return_value(
-> 1401 answer, self._gateway_client, None, self._fqn)
1402
1403 for temp_arg in temp_args:
~/Documents/spark/spark-2.2.1-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
~/Documents/spark/spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.net.BindException: Can't assign requested address: Service 'sparkDriver' failed after 16 retries (on a random free port)! Consider explicitly setting the appropriate binding address for the service 'sparkDriver' (for example spark.driver.bindAddress for SparkDriver) to the correct binding address.
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:433)
at sun.nio.ch.Net.bind(Net.java:425)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
at io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:127)
at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:501)
at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1218)
at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:496)
at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:481)
at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:965)
at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:210)
at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:353)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:399)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:446)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
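From the BindException above, the SparkContext itself fails to start (the 'sparkDriver' service cannot bind a port), before the JDBC read is even attempted. Following the hint in the error message, here is a minimal sketch of what I plan to try (spark.driver.bindAddress is the property the message itself names; pinning it to 127.0.0.1 is my assumption for a purely local setup):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set('spark.driver.bindAddress', '127.0.0.1')  # bind the 'sparkDriver' service locally, as the error suggests
        .set('spark.driver.host', '127.0.0.1'))        # assumption: also advertise the local address
sc = SparkContext.getOrCreate(conf=conf)

Setting the SPARK_LOCAL_IP=127.0.0.1 environment variable before launching pyspark should have a similar effect.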
Note that MySQL is up and running (screenshot below), and so is pyspark.
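Separately, I noticed that sparkClassPath in my snippet is computed but never handed to Spark, so the MySQL connector jar may not be on the driver's classpath at all. A minimal sketch of how I would attach it (spark.driver.extraClassPath and spark.executor.extraClassPath are standard Spark properties; the jar path is just my local copy):

import os
from pyspark import SparkConf, SparkContext

jar = os.path.abspath('./mysql-connector-java-8.0.11/mysql-connector-java-8.0.11.jar')
conf = (SparkConf()
        .set('spark.driver.extraClassPath', jar)     # make the connector visible to the driver JVM
        .set('spark.executor.extraClassPath', jar))  # and to the executors
sc = SparkContext.getOrCreate(conf=conf)

Both properties have to be set before the JVM starts, so this only takes effect if no SparkContext is already running.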