Операция категории READ не поддерживается в состоянии ожидания - PullRequest
0 голосов
/ 16 мая 2018

Хорошо, я пытаюсь предварительно обработать фрейм данных.Интересно, что если я попробую это в своем ноутбуке jupyter, то смогу выполнить это и смогу записать CSV-файл в HDFS.

Однако, если я попытаюсь запустить это внутри моего конвейера в spark-submit Я получаю RemoteException (см. Ниже).

Это команда, которая выполняется.Для удобства чтения вставляются разрывы строк.

$ spark-submit --master yarn 
               --name PreprocessRawData 
               --py-files /home/username/workspaces/m-search/python/m.zip 
               --driver-memory 60G 
               --executor-memory 20G 
               --driver-cores 14 
               --executor-cores 14 
               --num-executors 10 
               /home/username/.local/lib/python3.5/site-packages/luigi/contrib/pyspark_runner.py 
               /tmp/PreprocessRawData4b7ollx5/PreprocessRawData.pickle

Задача отправляется, и рабочие запускаются, но через некоторое время они терпят неудачу.Все они пишут одинаковые ошибки (RemoteException).Я не уверен, почему это так и что я могу с этим поделать.

Давайте поговорим о некотором коде.

Это задание luigi:

class PreprocessRawData(PySparkTask):

    """ Input parquet file """
    input_file = luigi.Parameter()
    output_dir = luigi.Parameter()

    def requires(self):
        return TransformToParquet()

    def output(self):
        return [
            HdfsTarget(os.path.join(self.output_dir, 'volltext.csv'))
        ]

    def main(self, sc, *args):

        try:

            sql = SQLContext(sc)
            spark = sql.sparkSession

            df_data = spark.read.csv(self.input_file, schema=DATASET_SCHEMA_DEFAULT, maxCharsPerColumn=200000000)

            df_cleaned = preprocess_text(df_data, id_column='dokumentId', text_column='volltext')

            outfile = os.path.join(self.output_dir, 'volltext.csv')

            df_cleaned.write.csv(outfile)

        except Exception as e:
            print(e)
            if os.path.isdir(outfile):
                print('Deleting ' + str(outfile))
                shutil.rmtree(outfile)
            raise e

Вот где работа сделана:

def preprocess_text(df_data, id_column, text_column):

    # Get only documents with non-null (raw) volltext
    df_volltext_raw = df_data.where(getattr(df_data, text_column).isNotNull())

    df_volltext = df_volltext_raw.select(
        [id_column] + [udf_remove_special_tokens()(F.col(text_column)).alias(text_column)])

    pos_whitelist = ['NOUN', 'ADJ', 'VERB']
    df_filtered = df_volltext.select([id_column] + [udf_token_filter(pos_whitelist=pos_whitelist,
                                                                     lemmatize=True)(F.col(text_column)).alias(text_column)])

    df_cleaned = df_filtered.select(
        [id_column] + [udf_clean_documents(lowercase=True, min_length=3)(F.col(text_column)).alias(text_column)])

    return df_cleaned

Я определенно могу опубликовать код для функций udf_, но так как они работают без проблем в моем ноутбуке Jupyter, я совершенно уверен, что это не проблема в моемкод.Я предпочитаю, что это проблема с конфигурацией или чем-то, но сначала мне нужно понять, в чем проблема.


18/05/16 15:55:53 WARN Client: Exception encountered while connecting to the server : 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby
    at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:375)
    at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:595)
    at org.apache.hadoop.ipc.Client$Connection.access$2000(Client.java:397)
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:762)
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:758)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:757)
    at org.apache.hadoop.ipc.Client$Connection.access$3200(Client.java:397)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1618)
    at org.apache.hadoop.ipc.Client.call(Client.java:1449)
    at org.apache.hadoop.ipc.Client.call(Client.java:1396)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
    at com.sun.proxy.$Proxy18.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:270)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:278)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:194)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:176)
    at com.sun.proxy.$Proxy19.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1236)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1223)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1211)
    at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:309)
    at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:274)
    at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:266)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1536)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:330)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:326)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:326)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:782)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:85)
    at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:44)
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1.apply(CSVFileFormat.scala:114)
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1.apply(CSVFileFormat.scala:111)
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(fileSourceInterfaces.scala:279)
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(fileSourceInterfaces.scala:263)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
...