ClosedChannelException при запуске штормового болта HDFS для хранения данных твита в виде блоков .txt - PullRequest
0 голосов
/ 05 апреля 2020

Эта топология сохраняет 100 твитов в блоке .txt в файловой системе HDFS. Это создает блоки, но они пусты и не показывают желаемый вывод твита. Код генерирует одно и то же исключение примерно 10 раз, пока jvm не сдается. Используя storm-core-0.9.5, storm-hdfs-2.1.0.jar, имел oop 2.7.1, имел oop -common-2.7.4.jar, имел oop -hdfs-2.7.4 .jar

STACK TRACE:

32973 [Thread-17-hdfs] FATAL com.example.HDFSBolt  - IO Exception thrown while closing HDFS
java.nio.channels.ClosedChannelException
    at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1715)
    at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:104)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
    at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:316)
    at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
    at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
    at java.io.BufferedWriter.close(BufferedWriter.java:266)
    at com.example.HDFSBolt.writeToHDFS(StormTwitter.java:603)
    at com.example.HDFSBolt.execute(StormTwitter.java:563)
    at backtype.storm.daemon.executor$fn__6647$tuple_action_fn__6649.invoke(executor.clj:633)
    at backtype.storm.daemon.executor$mk_task_receiver$fn__6570.invoke(executor.clj:401)
    at backtype.storm.disruptor$clojure_handler$reify__1605.onEvent(disruptor.clj:58)
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
    at backtype.storm.daemon.executor$fn__6647$fn__6659$fn__6706.invoke(executor.clj:748)
    at backtype.storm.util$async_loop$fn__459.invoke(util.clj:463)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:748)

STORMTWITTER STORM CLASSTER ЛОКАЛЬНАЯ ТОПОЛОГИЯ КЛАССА:

public class StormTwitter
{
    private final Logger LOGGER = Logger.getLogger(this.getClass());


    public static void main(String[] args) throws Exception
    {
        BasicConfigurator.configure();

        if (args != null && args.length > 0)
        {
            StormSubmitter.submitTopology(
                    args[0],
                    createConfig(false),
                    createTopology());
        }
        else
        {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(
                    "StormTwitter",
                    createConfig(true),
                    createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }

    private static StormTopology createTopology()
    {
        SpoutConfig kafkaConf = new SpoutConfig(
                new ZkHosts("localhost:2181"),
                "Twitter-API",
                "/Twitter-API",
                "id");
        kafkaConf.bufferSizeBytes = 1024*1024*4;
        kafkaConf.fetchSizeBytes = 1024*1024*4;
        kafkaConf.forceFromStart = true;
        kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder topology = new TopologyBuilder();
        topology.setSpout("kafka_spout", new KafkaSpout(kafkaConf), 4);
        topology.setBolt("twitter_filter", new TwitterFilterBolt(), 4).shuffleGrouping("kafka_spout");
        topology.setBolt("text_filter", new TextFilterBolt(), 4).shuffleGrouping("twitter_filter");
        topology.setBolt("stemming", new StemmingBolt(), 4).shuffleGrouping("text_filter");
        topology.setBolt("positive", new PositiveSentimentBolt(), 4).shuffleGrouping("stemming");
        topology.setBolt("negative", new NegativeSentimentBolt(), 4).shuffleGrouping("stemming");
        topology.setBolt("join", new JoinSentimentsBolt(), 4).fieldsGrouping("positive", new Fields("tweet_id")).fieldsGrouping("negative", new Fields("tweet_id"));
        topology.setBolt("score", new SentimentScoringBolt(), 4).shuffleGrouping("join");
        topology.setBolt("hdfs", new HDFSBolt(), 4).shuffleGrouping("score");

        return topology.createTopology();
    }

    private static Config createConfig(boolean local)
    {
        int workers = 4;
        Config conf = new Config();
        conf.setDebug(true);
        if (local)
            conf.setMaxTaskParallelism(workers);
        else
            conf.setNumWorkers(workers);
        return conf;
    }
}

    public void declareOutputFields(OutputFieldsDeclarer declarer) { }
}

КЛАСС HDFSBOLT:

class HDFSBolt extends BaseRichBolt{
    private static final long serialVersionUID = 42L;
    private static final Logger LOGGER = Logger.getLogger(HDFSBolt.class);
    private OutputCollector collector;
    private int id;
    private List<String> tweet_scores;

    @SuppressWarnings("rawtypes")
    public void prepare(
            Map stormConf,
            TopologyContext context,
            OutputCollector collector)
    {
        this.id = context.getThisTaskId();
        this.collector = collector;
        this.tweet_scores = new ArrayList<String>(100);
    }

    public void execute(Tuple input)
    {
        LOGGER.debug("Writing to HDFS after scoring");
        Long id = input.getLong(input.fieldIndex("tweet_id"));
        String tweet = input.getString(input.fieldIndex("tweet_text"));
        Float pos = input.getFloat(input.fieldIndex("pos_score"));
        Float neg = input.getFloat(input.fieldIndex("neg_score"));
        String score = input.getString(input.fieldIndex("score"));
        String tweet_score =
                String.format("%s,%s,%f,%f,%s\n", id, tweet, pos, neg, score);
        this.tweet_scores.add(tweet_score);
        if (this.tweet_scores.size() >= 100)
        {
            writeToHDFS();
            this.tweet_scores = new ArrayList<String>(100);
        }
    }

    private void writeToHDFS()
    {
        FileSystem hdfs = null;
        Path file = null;
        OutputStream os = null;
        BufferedWriter wd = null;

        try
        {
            Configuration conf = new Configuration();
            conf.addResource(new Path("C:/Spark/spark-3.0.0-preview2-bin-hadoop2.7/etc/hadoop/core-site.xml"));
            conf.addResource(new Path("C:/Spark/spark-3.0.0-preview2-bin-hadoop2.7/etc/hadoop/hdfs-site.xml"));
            hdfs = FileSystem.get(conf);
            file = new Path(
                    "hdfs://0.0.0.0:19000/twitter_realtime_data/tweets/score/"+this.id+".txt");
            if (hdfs.exists(file))
                os = hdfs.append(file);
            else
                os = hdfs.create(file);
            wd = new BufferedWriter(new OutputStreamWriter(os, "UTF-8"));
            for (String tweet_score : tweet_scores)
            {
                wd.write(tweet_score);
            }
        }
        catch (IOException ex)
        {
            LOGGER.error("Failed to write tweet score to HDFS", ex);
            LOGGER.trace(null, ex);
        }
        finally
        {
            try
            {
                if (os != null) os.close();
                if (wd != null) wd.close();
                /*if (hdfs != null) hdfs.close();*/
            }
            catch (IOException ex)
            {
                LOGGER.fatal("IO Exception thrown while closing HDFS", ex);
                LOGGER.trace(null, ex);
            }
        }
    }

ЗАВИСИМОСТЬ :

commons-cli-1.4.jar

commons-codec-1.11.jar

commons-codec-1.4.jar

commons-codec-1.6.jar

commons-collections-3.2.2.jar

commons-compress-1.18.jar

commons-configuration-1.10.jar

commons-exec-1.1.jar

commons-exec-1.3.jar

commons-fileupload-1.2.1.jar

commons-fileupload-1.3.3.jar

commons-io-2.4.jar

commons-lang-2.5.jar

commons-lang-2.6.jar

commons-lang3-3.8.1.jar

commons-logging-1.1.1.jar

commons-logging-1.1.3.jar

commons-logging-1.2.jar

htrace-core-3.1.0-incubating.jar

storm-core-0.9.5.jar

accessors-smart-1.2.jar

animal-sniffer-annotations-1.17.jar

asm-4.0.jar

asm-5.0.3.jar

audience-annotations-0.5.0.jar

carbonite-1.3.2.jar

checker-qual-2.5.2.jar

chill-java-0.3.5.jar

chill-java-0.8.0.jar

clj-stacktrace-0.2.2.jar

clj-time-0.4.1.jar

clojure-1.5.0.jar

clojure-1.5.1.jar

clojure-1.6.0.jar

clout-1.0.1.jar

compojure-1.1.3.jar

core.incubator-0.1.0.jar

core.specs.alpha-0.2.44.jar

curator-client-1.0.1.jar

curator-client-2.3.0.jar

curator-framework-1.0.1.jar

curator-framework-2.2.0-incubating.jar

dependencies2.txt

disruptor-2.10.1.jar

error_prone_annotations-2.2.0.jar

failureaccess-1.0.1.jar

guava-13.0.jar

guava-27.0.1-jre.jar

hadoop-auth-2.7.4.jar

hadoop-auth-2.8.5.jar

hadoop-client-2.7.4.jar

hiccup-0.3.6.jar

httpclient-4.1.1.jar

httpclient-4.5.6.jar

httpcore-4.1.jar

httpcore-4.4.10.jar

j2objc-annotations-1.1.jar

jackson-annotations-2.5.4.jar

jackson-core-2.10.0.jar

jackson-databind-2.10.3.jar

jackson-databind-2.4.4.jar

jackson-dataformat-smile-2.9.8.jar

jackson-xml-databind-0.6.2.jar

javax.annotation-api-1.3.2.jar

javax.inject-1.jar

javax.servlet-api-3.1.0.jar

jaxb-api-2.3.0.jar

jcip-annotations-1.0-1.jar

jcl-over-slf4j-1.7.25.jar

jetty-6.1.26.jar

jetty-continuation-9.4.14.v20181114.jar

jetty-http-9.4.14.v20181114.jar

jetty-io-9.4.14.v20181114.jar

jetty-security-9.4.14.v20181114.jar

jetty-server-9.4.14.v20181114.jar

jetty-servlet-9.4.14.v20181114.jar

jetty-servlets-9.4.14.v20181114.jar

jetty-util-6.1.26.jar

jetty-util-9.4.14.v20181114.jar

jgrapht-core-0.9.0.jar

jline-0.9.94.jar

jline-2.11.jar

joda-time-2.0.jar

joda-time-2.3.jar

json-simple-1.1.1.jar

json-smart-2.3.jar

jsr305-3.0.2.jar

junit-3.8.1.jar

kafka-clients-0.8.2.1.jar

kafka_2.10-0.8.2.1.jar

kryo-2.21.jar

libthrift7-0.7.0.jar

listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar

log4j-1.2.17.jar

log4j-api-2.8.2.jar

log4j-core-2.8.2.jar

log4j-over-slf4j-1.6.6.jar

log4j-slf4j-impl-2.8.2.jar

lz4-1.3.0.jar

math.numeric-tower-0.0.1.jar

maven-artifact-3.6.0.jar

maven-builder-support-3.6.0.jar

maven-model-3.6.0.jar

maven-model-builder-3.6.0.jar

maven-repository-metadata-3.6.0.jar

maven-resolver-api-1.3.3.jar

maven-resolver-connector-basic-1.3.3.jar

maven-resolver-impl-1.3.3.jar

maven-resolver-provider-3.6.0.jar

maven-resolver-spi-1.3.3.jar

maven-resolver-transport-file-1.3.3.jar

maven-resolver-transport-http-1.3.3.jar

maven-resolver-util-1.3.3.jar

meat-locker-0.3.1.jar

metrics-core-2.2.0.jar

metrics-graphite-3.2.6.jar

minlog-1.2.jar

minlog-1.3.0.jar

mockito-all-1.9.5.jar

netty-3.10.6.Final.jar

netty-3.6.3.Final.jar

nimbus-jose-jwt-4.41.1.jar

objenesis-1.2.jar

objenesis-2.1.jar

plexus-component-annotations-1.7.1.jar

plexus-interpolation-1.25.jar

plexus-utils-3.1.0.jar

reflectasm-1.07-shaded.jar

reflectasm-1.10.1.jar

ring-core-1.1.5.jar

ring-devel-0.3.11.jar

ring-jetty-adapter-0.3.11.jar

ring-servlet-0.3.11.jar

rocksdbjni-5.18.3.jar

scala-library-2.10.4.jar

servlet-api-2.5-20081211.jar

servlet-api-2.5.jar

slf4j-api-1.6.5.jar

slf4j-api-1.7.26.jar

slf4j-api-1.7.5.jar

snakeyaml-1.11.jar

snappy-java-1.1.2.6.jar

spec.alpha-0.2.176.jar

storm-clojure-2.1.0.jar

storm-hdfs-2.1.0.jar

storm-kafka-0.9.3.jar

storm-kafka-monitor-2.1.0.jar

storm-server-2.1.0.jar

storm-shaded-deps-2.1.0.jar

storm-submit-tools-2.1.0.jar

tools.cli-0.2.2.jar

tools.cli-0.2.4.jar

tools.logging-0.2.3.jar

tools.macro-0.1.0.jar

tools.nrepl-0.2.3.jar

zookeeper-3.5.4-beta.jar

activation-1.1.1.jar

aircompressor-0.10.jar

algebra_2.12-2.0.0-M2.jar

antlr-runtime-3.5.2.jar

antlr4-runtime-4.7.1.jar

aopalliance-1.0.jar

aopalliance-repackaged-2.6.1.jar

apacheds-i18n-2.0.0-M15.jar

apacheds-kerberos-codec-2.0.0-M15.jar

api-asn1-api-1.0.0-M20.jar

api-util-1.0.0-M20.jar

arpack_combined_all-0.1.jar

arrow-format-0.15.1.jar

arrow-memory-0.15.1.jar

arrow-vector-0.15.1.jar

audience-annotations-0.5.0.jar

automaton-1.11-8.jar

avro-1.8.2.jar

avro-ipc-1.8.2.jar

avro-mapred-1.8.2-hadoop2.jar

bonecp-0.8.0.RELEASE.jar

breeze-macros_2.12-1.0.jar

breeze_2.12-1.0.jar

cats-kernel_2.12-2.0.0-M4.jar

chill-java-0.9.3.jar

chill_2.12-0.9.3.jar

commons-beanutils-1.9.4.jar

commons-cli-1.2.jar

commons-codec-1.10.jar

commons-collections-3.2.2.jar

commons-compiler-3.0.15.jar

commons-compress-1.8.1.jar

commons-configuration-1.6.jar

commons-crypto-1.0.0.jar

commons-dbcp-1.4.jar

commons-digester-1.8.jar

commons-httpclient-3.1.jar

commons-io-2.4.jar

commons-lang-2.6.jar

commons-lang3-3.9.jar

commons-logging-1.1.3.jar

commons-math3-3.4.1.jar

commons-net-3.1.jar

commons-pool-1.5.4.jar

commons-text-1.6.jar

compress-lzf-1.0.3.jar

core-1.1.2.jar

curator-client-2.7.1.jar

curator-framework-2.7.1.jar

curator-recipes-2.7.1.jar

datanucleus-api-jdo-4.2.4.jar

datanucleus-core-4.1.17.jar

datanucleus-rdbms-4.1.19.jar

derby-10.12.1.1.jar

dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar

flatbuffers-java-1.9.0.jar

generex-1.0.2.jar

gson-2.2.4.jar

guava-14.0.1.jar

guice-3.0.jar

guice-servlet-3.0.jar

hadoop-annotations-2.7.4.jar

hadoop-auth-2.7.4.jar

hadoop-client-2.7.4.jar

hadoop-common-2.7.4.jar

hadoop-hdfs-2.7.4.jar

hadoop-mapreduce-client-app-2.7.4.jar

hadoop-mapreduce-client-common-2.7.4.jar

hadoop-mapreduce-client-core-2.7.4.jar

hadoop-mapreduce-client-jobclient-2.7.4.jar

hadoop-mapreduce-client-shuffle-2.7.4.jar

hadoop-yarn-api-2.7.4.jar

hadoop-yarn-client-2.7.4.jar

hadoop-yarn-common-2.7.4.jar

hadoop-yarn-server-common-2.7.4.jar

hadoop-yarn-server-web-proxy-2.7.4.jar

HikariCP-2.5.1.jar

hive-beeline-2.3.6.jar

hive-cli-2.3.6.jar

hive-common-2.3.6.jar

hive-exec-2.3.6-core.jar

hive-jdbc-2.3.6.jar

hive-llap-common-2.3.6.jar

hive-metastore-2.3.6.jar

hive-serde-2.3.6.jar

hive-service-rpc-2.3.6.jar

hive-shims-0.23-2.3.6.jar

hive-shims-2.3.6.jar

hive-shims-common-2.3.6.jar

hive-shims-scheduler-2.3.6.jar

hive-storage-api-2.6.0.jar

hive-vector-code-gen-2.3.6.jar

hk2-api-2.6.1.jar

hk2-locator-2.6.1.jar

hk2-utils-2.6.1.jar

htrace-core-3.1.0-incubating.jar

httpclient-4.5.6.jar

httpcore-4.4.12.jar

istack-commons-runtime-3.0.8.jar

ivy-2.4.0.jar

jackson-annotations-2.10.0.jar

jackson-core-2.10.0.jar

jackson-core-asl-1.9.13.jar

jackson-databind-2.10.0.jar

jackson-dataformat-yaml-2.10.0.jar

jackson-jaxrs-1.9.13.jar

jackson-mapper-asl-1.9.13.jar

jackson-module-jaxb-annotations-2.10.0.jar

jackson-module-paranamer-2.10.0.jar

jackson-module-scala_2.12-2.10.0.jar

jackson-xc-1.9.13.jar

jakarta.activation-api-1.2.1.jar

jakarta.annotation-api-1.3.5.jar

jakarta.inject-2.6.1.jar

jakarta.validation-api-2.0.2.jar

jakarta.ws.rs-api-2.1.6.jar

jakarta.xml.bind-api-2.3.2.jar

janino-3.0.15.jar

javassist-3.22.0-CR2.jar

javax.inject-1.jar

javax.jdo-3.2.0-m3.jar

javax.servlet-api-3.1.0.jar

javolution-5.5.1.jar

jaxb-api-2.2.2.jar

jaxb-runtime-2.3.2.jar

jcl-over-slf4j-1.7.16.jar

jdo-api-3.0.1.jar

jersey-client-2.29.1.jar

jersey-common-2.29.1.jar

jersey-container-servlet-2.29.1.jar

jersey-container-servlet-core-2.29.1.jar

jersey-hk2-2.29.1.jar

jersey-media-jaxb-2.29.1.jar

jersey-server-2.29.1.jar

jetty-6.1.26.jar

jetty-sslengine-6.1.26.jar

jetty-util-6.1.26.jar

JLargeArrays-1.5.jar

jline-2.14.6.jar

joda-time-2.10.5.jar

jodd-core-3.5.2.jar

jpam-1.1.jar

json-1.8.jar

json4s-ast_2.12-3.6.6.jar

json4s-core_2.12-3.6.6.jar

json4s-jackson_2.12-3.6.6.jar

json4s-scalap_2.12-3.6.6.jar

jsp-api-2.1.jar

jsr305-3.0.0.jar

jta-1.1.jar

JTransforms-3.1.jar

jul-to-slf4j-1.7.16.jar

kryo-shaded-4.0.2.jar

kubernetes-client-4.6.4.jar

kubernetes-model-4.6.4.jar

kubernetes-model-common-4.6.4.jar

leveldbjni-all-1.8.jar

libfb303-0.9.3.jar

libthrift-0.12.0.jar

log4j-1.2.17.jar

logging-interceptor-3.12.6.jar

lz4-java-1.7.0.jar

machinist_2.12-0.6.8.jar

macro-compat_2.12-1.1.1.jar

mesos-1.4.0-shaded-protobuf.jar

metrics-core-4.1.1.jar

metrics-graphite-4.1.1.jar

metrics-jmx-4.1.1.jar

metrics-json-4.1.1.jar

metrics-jvm-4.1.1.jar

minlog-1.3.0.jar

netty-all-4.1.42.Final.jar

objenesis-2.5.1.jar

okapi-shade-0.4.2.jar

okhttp-3.12.6.jar

okio-1.15.0.jar

opencsv-2.3.jar

orc-core-1.5.8.jar

orc-mapreduce-1.5.8.jar

orc-shims-1.5.8.jar

oro-2.0.8.jar

osgi-resource-locator-1.0.3.jar

paranamer-2.8.jar

parquet-column-1.10.1.jar

parquet-common-1.10.1.jar

parquet-encoding-1.10.1.jar

parquet-format-2.4.0.jar

parquet-hadoop-1.10.1.jar

parquet-jackson-1.10.1.jar

protobuf-java-2.5.0.jar

py4j-0.10.8.1.jar

pyrolite-4.30.jar

RoaringBitmap-0.7.45.jar

scala-collection-compat_2.12-2.1.1.jar

scala-compiler-2.12.10.jar

scala-library-2.12.10.jar

scala-parser-combinators_2.12-1.1.2.jar

scala-reflect-2.12.10.jar

scala-xml_2.12-1.2.0.jar

shapeless_2.12-2.3.3.jar

shims-0.7.45.jar

slf4j-api-1.7.16.jar

slf4j-log4j12-1.7.16.jar

snakeyaml-1.24.jar

snappy-java-1.1.7.3.jar

spark-catalyst_2.12-3.0.0-preview2.jar

spark-core_2.12-3.0.0-preview2.jar

spark-cypher_2.12-3.0.0-preview2.jar

spark-graph-api_2.12-3.0.0-preview2.jar

spark-graphx_2.12-3.0.0-preview2.jar

spark-graph_2.12-3.0.0-preview2.jar

spark-hive-thriftserver_2.12-3.0.0-preview2.jar

spark-hive_2.12-3.0.0-preview2.jar

spark-kubernetes_2.12-3.0.0-preview2.jar

spark-kvstore_2.12-3.0.0-preview2.jar

spark-launcher_2.12-3.0.0-preview2.jar

spark-mesos_2.12-3.0.0-preview2.jar

spark-mllib-local_2.12-3.0.0-preview2.jar

spark-mllib_2.12-3.0.0-preview2.jar

spark-network-common_2.12-3.0.0-preview2.jar

spark-network-shuffle_2.12-3.0.0-preview2.jar

spark-repl_2.12-3.0.0-preview2.jar

spark-sketch_2.12-3.0.0-preview2.jar

spark-sql_2.12-3.0.0-preview2.jar

spark-streaming_2.12-3.0.0-preview2.jar

spark-tags_2.12-3.0.0-preview2-tests.jar

spark-tags_2.12-3.0.0-preview2.jar

spark-unsafe_2.12-3.0.0-preview2.jar

spark-yarn_2.12-3.0.0-preview2.jar

spire-macros_2.12-0.17.0-M1.jar

spire-platform_2.12-0.17.0-M1.jar

spire-util_2.12-0.17.0-M1.jar

spire_2.12-0.17.0-M1.jar

ST4-4.0.4.jar

stax-api-1.0-2.jar

stax-api-1.0.1.jar

stream-2.9.6.jar

super-csv-2.2.0.jar

transaction-api-1.1.jar

univocity-parsers-2.8.3.jar

velocity-1.5.jar

xbean-asm7-shaded-4.15.jar

xercesImpl-2.9.1.jar

xmlenc-0.52.jar

xz-1.5.jar

zjsonpatch-0.3.0.jar

zookeeper-3.4.14.jar

zstd-jni-1.4.4-3.jar

...