простой структурированный потоковый код генерирует исключение потока - PullRequest
0 голосов
/ 16 февраля 2020

У меня есть очень простой фрагмент кода, пытающийся вызвать данные схемы:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types
import org.apache.spark.sql.functions._
object wordCountSimple {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession
        .builder
        .master("local")
        .appName("wordCountSimple")
        .getOrCreate()

        spark.sparkContext.setLogLevel("WARN")
        import spark.sqlContext.implicits._
        val df = Seq(("abc", "2019-07-01 12:01:19.000"),
        ("xyz", "2019-06-24 12:01:19.000"),
        ("abc", "2019-11-16 16:44:55.406"),
        ("abc", "2019-11-16 16:50:59.406")).toDF("value", "date")
        val res = df.select(
            $"value",
            $"date",
            unix_timestamp($"date", "yyyy/MM/dd HH:mm:ss").as("timestamp")
        )
        res.printSchema
        res.show(false)
        spark.close()
    }
}

maven compile OK и запустите его:

root
|-- value: string (nullable = true)
|-- date: string (nullable = true)
|-- timestamp: long (nullable = true)

+-----+-----------------------+---------+
|value|date                   |timestamp|
+-----+-----------------------+---------+
|abc  |2019-07-01 12:01:19.000|null     |
|xyz  |2019-06-24 12:01:19.000|null     |
|abc  |2019-11-16 16:44:55.406|null     |
|abc  |2019-11-16 16:50:59.406|null     |
+-----+-----------------------+---------+

20/02/16 16:08:14 WARN FileSystem: exception in the cleaner thread but it will continue to run
java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:144)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:165)
        at org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner.run(FileSystem.java:2989)
        at java.lang.Thread.run(Thread.java:748)
[WARNING] thread Thread[org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner,5,wordCountSimple] was interrupted but is still alive after waiting at least 12896msecs
[WARNING] thread Thread[org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner,5,wordCountSimple] will linger despite being asked to die via interruption
[WARNING] NOTE: 1 thread(s) did not finish despite being asked to  via interruption. This is not a problem with exec:java, it is a problem with the running code. Although not serious, it should be remedied.
[WARNING] Couldn't destroy threadgroup org.codehaus.mojo.exec.ExecJavaMojo$IsolatedThreadGroup[name=wordCountSimple,maxpri=10]
java.lang.IllegalThreadStateException
    at java.lang.ThreadGroup.destroy (ThreadGroup.java:778)
    at org.codehaus.mojo.exec.ExecJavaMojo.execute (ExecJavaMojo.java:321)
    at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:137)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:210)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:156)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:148)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:117)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81)
    at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56)
    at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:128)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:305)
    at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
    at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
    at org.apache.maven.cli.MavenCli.execute (MavenCli.java:956)
    at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:288)
    at org.apache.maven.cli.MavenCli.main (MavenCli.java:192)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:289)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:229)
    at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:415)
    at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:356)

Как происходит это исключение и как почини это? Большое спасибо.

...