Эта топология сохраняет 100 твитов в блоке .txt в файловой системе HDFS. Это создает блоки, но они пусты и не показывают желаемый вывод твита. Код генерирует одно и то же исключение примерно 10 раз, пока jvm не сдается. Используя storm-core-0.9.5, storm-hdfs-2.1.0.jar, имел oop 2.7.1, имел oop -common-2.7.4.jar, имел oop -hdfs-2.7.4 .jar
STACK TRACE:
32973 [Thread-17-hdfs] FATAL com.example.HDFSBolt - IO Exception thrown while closing HDFS
java.nio.channels.ClosedChannelException
at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1715)
at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:104)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:316)
at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
at java.io.BufferedWriter.close(BufferedWriter.java:266)
at com.example.HDFSBolt.writeToHDFS(StormTwitter.java:603)
at com.example.HDFSBolt.execute(StormTwitter.java:563)
at backtype.storm.daemon.executor$fn__6647$tuple_action_fn__6649.invoke(executor.clj:633)
at backtype.storm.daemon.executor$mk_task_receiver$fn__6570.invoke(executor.clj:401)
at backtype.storm.disruptor$clojure_handler$reify__1605.onEvent(disruptor.clj:58)
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
at backtype.storm.daemon.executor$fn__6647$fn__6659$fn__6706.invoke(executor.clj:748)
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:463)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:748)
STORMTWITTER STORM CLASSTER ЛОКАЛЬНАЯ ТОПОЛОГИЯ КЛАССА:
public class StormTwitter
{
private final Logger LOGGER = Logger.getLogger(this.getClass());
public static void main(String[] args) throws Exception
{
BasicConfigurator.configure();
if (args != null && args.length > 0)
{
StormSubmitter.submitTopology(
args[0],
createConfig(false),
createTopology());
}
else
{
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(
"StormTwitter",
createConfig(true),
createTopology());
Thread.sleep(60000);
cluster.shutdown();
}
}
private static StormTopology createTopology()
{
SpoutConfig kafkaConf = new SpoutConfig(
new ZkHosts("localhost:2181"),
"Twitter-API",
"/Twitter-API",
"id");
kafkaConf.bufferSizeBytes = 1024*1024*4;
kafkaConf.fetchSizeBytes = 1024*1024*4;
kafkaConf.forceFromStart = true;
kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
TopologyBuilder topology = new TopologyBuilder();
topology.setSpout("kafka_spout", new KafkaSpout(kafkaConf), 4);
topology.setBolt("twitter_filter", new TwitterFilterBolt(), 4).shuffleGrouping("kafka_spout");
topology.setBolt("text_filter", new TextFilterBolt(), 4).shuffleGrouping("twitter_filter");
topology.setBolt("stemming", new StemmingBolt(), 4).shuffleGrouping("text_filter");
topology.setBolt("positive", new PositiveSentimentBolt(), 4).shuffleGrouping("stemming");
topology.setBolt("negative", new NegativeSentimentBolt(), 4).shuffleGrouping("stemming");
topology.setBolt("join", new JoinSentimentsBolt(), 4).fieldsGrouping("positive", new Fields("tweet_id")).fieldsGrouping("negative", new Fields("tweet_id"));
topology.setBolt("score", new SentimentScoringBolt(), 4).shuffleGrouping("join");
topology.setBolt("hdfs", new HDFSBolt(), 4).shuffleGrouping("score");
return topology.createTopology();
}
private static Config createConfig(boolean local)
{
int workers = 4;
Config conf = new Config();
conf.setDebug(true);
if (local)
conf.setMaxTaskParallelism(workers);
else
conf.setNumWorkers(workers);
return conf;
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) { }
}
КЛАСС HDFSBOLT:
class HDFSBolt extends BaseRichBolt{
private static final long serialVersionUID = 42L;
private static final Logger LOGGER = Logger.getLogger(HDFSBolt.class);
private OutputCollector collector;
private int id;
private List<String> tweet_scores;
@SuppressWarnings("rawtypes")
public void prepare(
Map stormConf,
TopologyContext context,
OutputCollector collector)
{
this.id = context.getThisTaskId();
this.collector = collector;
this.tweet_scores = new ArrayList<String>(100);
}
public void execute(Tuple input)
{
LOGGER.debug("Writing to HDFS after scoring");
Long id = input.getLong(input.fieldIndex("tweet_id"));
String tweet = input.getString(input.fieldIndex("tweet_text"));
Float pos = input.getFloat(input.fieldIndex("pos_score"));
Float neg = input.getFloat(input.fieldIndex("neg_score"));
String score = input.getString(input.fieldIndex("score"));
String tweet_score =
String.format("%s,%s,%f,%f,%s\n", id, tweet, pos, neg, score);
this.tweet_scores.add(tweet_score);
if (this.tweet_scores.size() >= 100)
{
writeToHDFS();
this.tweet_scores = new ArrayList<String>(100);
}
}
private void writeToHDFS()
{
FileSystem hdfs = null;
Path file = null;
OutputStream os = null;
BufferedWriter wd = null;
try
{
Configuration conf = new Configuration();
conf.addResource(new Path("C:/Spark/spark-3.0.0-preview2-bin-hadoop2.7/etc/hadoop/core-site.xml"));
conf.addResource(new Path("C:/Spark/spark-3.0.0-preview2-bin-hadoop2.7/etc/hadoop/hdfs-site.xml"));
hdfs = FileSystem.get(conf);
file = new Path(
"hdfs://0.0.0.0:19000/twitter_realtime_data/tweets/score/"+this.id+".txt");
if (hdfs.exists(file))
os = hdfs.append(file);
else
os = hdfs.create(file);
wd = new BufferedWriter(new OutputStreamWriter(os, "UTF-8"));
for (String tweet_score : tweet_scores)
{
wd.write(tweet_score);
}
}
catch (IOException ex)
{
LOGGER.error("Failed to write tweet score to HDFS", ex);
LOGGER.trace(null, ex);
}
finally
{
try
{
if (os != null) os.close();
if (wd != null) wd.close();
/*if (hdfs != null) hdfs.close();*/
}
catch (IOException ex)
{
LOGGER.fatal("IO Exception thrown while closing HDFS", ex);
LOGGER.trace(null, ex);
}
}
}
ЗАВИСИМОСТЬ :
commons-cli-1.4.jar
commons-codec-1.11.jar
commons-codec-1.4.jar
commons-codec-1.6.jar
commons-collections-3.2.2.jar
commons-compress-1.18.jar
commons-configuration-1.10.jar
commons-exec-1.1.jar
commons-exec-1.3.jar
commons-fileupload-1.2.1.jar
commons-fileupload-1.3.3.jar
commons-io-2.4.jar
commons-lang-2.5.jar
commons-lang-2.6.jar
commons-lang3-3.8.1.jar
commons-logging-1.1.1.jar
commons-logging-1.1.3.jar
commons-logging-1.2.jar
htrace-core-3.1.0-incubating.jar
storm-core-0.9.5.jar
accessors-smart-1.2.jar
animal-sniffer-annotations-1.17.jar
asm-4.0.jar
asm-5.0.3.jar
audience-annotations-0.5.0.jar
carbonite-1.3.2.jar
checker-qual-2.5.2.jar
chill-java-0.3.5.jar
chill-java-0.8.0.jar
clj-stacktrace-0.2.2.jar
clj-time-0.4.1.jar
clojure-1.5.0.jar
clojure-1.5.1.jar
clojure-1.6.0.jar
clout-1.0.1.jar
compojure-1.1.3.jar
core.incubator-0.1.0.jar
core.specs.alpha-0.2.44.jar
curator-client-1.0.1.jar
curator-client-2.3.0.jar
curator-framework-1.0.1.jar
curator-framework-2.2.0-incubating.jar
dependencies2.txt
disruptor-2.10.1.jar
error_prone_annotations-2.2.0.jar
failureaccess-1.0.1.jar
guava-13.0.jar
guava-27.0.1-jre.jar
hadoop-auth-2.7.4.jar
hadoop-auth-2.8.5.jar
hadoop-client-2.7.4.jar
hiccup-0.3.6.jar
httpclient-4.1.1.jar
httpclient-4.5.6.jar
httpcore-4.1.jar
httpcore-4.4.10.jar
j2objc-annotations-1.1.jar
jackson-annotations-2.5.4.jar
jackson-core-2.10.0.jar
jackson-databind-2.10.3.jar
jackson-databind-2.4.4.jar
jackson-dataformat-smile-2.9.8.jar
jackson-xml-databind-0.6.2.jar
javax.annotation-api-1.3.2.jar
javax.inject-1.jar
javax.servlet-api-3.1.0.jar
jaxb-api-2.3.0.jar
jcip-annotations-1.0-1.jar
jcl-over-slf4j-1.7.25.jar
jetty-6.1.26.jar
jetty-continuation-9.4.14.v20181114.jar
jetty-http-9.4.14.v20181114.jar
jetty-io-9.4.14.v20181114.jar
jetty-security-9.4.14.v20181114.jar
jetty-server-9.4.14.v20181114.jar
jetty-servlet-9.4.14.v20181114.jar
jetty-servlets-9.4.14.v20181114.jar
jetty-util-6.1.26.jar
jetty-util-9.4.14.v20181114.jar
jgrapht-core-0.9.0.jar
jline-0.9.94.jar
jline-2.11.jar
joda-time-2.0.jar
joda-time-2.3.jar
json-simple-1.1.1.jar
json-smart-2.3.jar
jsr305-3.0.2.jar
junit-3.8.1.jar
kafka-clients-0.8.2.1.jar
kafka_2.10-0.8.2.1.jar
kryo-2.21.jar
libthrift7-0.7.0.jar
listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
log4j-1.2.17.jar
log4j-api-2.8.2.jar
log4j-core-2.8.2.jar
log4j-over-slf4j-1.6.6.jar
log4j-slf4j-impl-2.8.2.jar
lz4-1.3.0.jar
math.numeric-tower-0.0.1.jar
maven-artifact-3.6.0.jar
maven-builder-support-3.6.0.jar
maven-model-3.6.0.jar
maven-model-builder-3.6.0.jar
maven-repository-metadata-3.6.0.jar
maven-resolver-api-1.3.3.jar
maven-resolver-connector-basic-1.3.3.jar
maven-resolver-impl-1.3.3.jar
maven-resolver-provider-3.6.0.jar
maven-resolver-spi-1.3.3.jar
maven-resolver-transport-file-1.3.3.jar
maven-resolver-transport-http-1.3.3.jar
maven-resolver-util-1.3.3.jar
meat-locker-0.3.1.jar
metrics-core-2.2.0.jar
metrics-graphite-3.2.6.jar
minlog-1.2.jar
minlog-1.3.0.jar
mockito-all-1.9.5.jar
netty-3.10.6.Final.jar
netty-3.6.3.Final.jar
nimbus-jose-jwt-4.41.1.jar
objenesis-1.2.jar
objenesis-2.1.jar
plexus-component-annotations-1.7.1.jar
plexus-interpolation-1.25.jar
plexus-utils-3.1.0.jar
reflectasm-1.07-shaded.jar
reflectasm-1.10.1.jar
ring-core-1.1.5.jar
ring-devel-0.3.11.jar
ring-jetty-adapter-0.3.11.jar
ring-servlet-0.3.11.jar
rocksdbjni-5.18.3.jar
scala-library-2.10.4.jar
servlet-api-2.5-20081211.jar
servlet-api-2.5.jar
slf4j-api-1.6.5.jar
slf4j-api-1.7.26.jar
slf4j-api-1.7.5.jar
snakeyaml-1.11.jar
snappy-java-1.1.2.6.jar
spec.alpha-0.2.176.jar
storm-clojure-2.1.0.jar
storm-hdfs-2.1.0.jar
storm-kafka-0.9.3.jar
storm-kafka-monitor-2.1.0.jar
storm-server-2.1.0.jar
storm-shaded-deps-2.1.0.jar
storm-submit-tools-2.1.0.jar
tools.cli-0.2.2.jar
tools.cli-0.2.4.jar
tools.logging-0.2.3.jar
tools.macro-0.1.0.jar
tools.nrepl-0.2.3.jar
zookeeper-3.5.4-beta.jar
activation-1.1.1.jar
aircompressor-0.10.jar
algebra_2.12-2.0.0-M2.jar
antlr-runtime-3.5.2.jar
antlr4-runtime-4.7.1.jar
aopalliance-1.0.jar
aopalliance-repackaged-2.6.1.jar
apacheds-i18n-2.0.0-M15.jar
apacheds-kerberos-codec-2.0.0-M15.jar
api-asn1-api-1.0.0-M20.jar
api-util-1.0.0-M20.jar
arpack_combined_all-0.1.jar
arrow-format-0.15.1.jar
arrow-memory-0.15.1.jar
arrow-vector-0.15.1.jar
audience-annotations-0.5.0.jar
automaton-1.11-8.jar
avro-1.8.2.jar
avro-ipc-1.8.2.jar
avro-mapred-1.8.2-hadoop2.jar
bonecp-0.8.0.RELEASE.jar
breeze-macros_2.12-1.0.jar
breeze_2.12-1.0.jar
cats-kernel_2.12-2.0.0-M4.jar
chill-java-0.9.3.jar
chill_2.12-0.9.3.jar
commons-beanutils-1.9.4.jar
commons-cli-1.2.jar
commons-codec-1.10.jar
commons-collections-3.2.2.jar
commons-compiler-3.0.15.jar
commons-compress-1.8.1.jar
commons-configuration-1.6.jar
commons-crypto-1.0.0.jar
commons-dbcp-1.4.jar
commons-digester-1.8.jar
commons-httpclient-3.1.jar
commons-io-2.4.jar
commons-lang-2.6.jar
commons-lang3-3.9.jar
commons-logging-1.1.3.jar
commons-math3-3.4.1.jar
commons-net-3.1.jar
commons-pool-1.5.4.jar
commons-text-1.6.jar
compress-lzf-1.0.3.jar
core-1.1.2.jar
curator-client-2.7.1.jar
curator-framework-2.7.1.jar
curator-recipes-2.7.1.jar
datanucleus-api-jdo-4.2.4.jar
datanucleus-core-4.1.17.jar
datanucleus-rdbms-4.1.19.jar
derby-10.12.1.1.jar
dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar
flatbuffers-java-1.9.0.jar
generex-1.0.2.jar
gson-2.2.4.jar
guava-14.0.1.jar
guice-3.0.jar
guice-servlet-3.0.jar
hadoop-annotations-2.7.4.jar
hadoop-auth-2.7.4.jar
hadoop-client-2.7.4.jar
hadoop-common-2.7.4.jar
hadoop-hdfs-2.7.4.jar
hadoop-mapreduce-client-app-2.7.4.jar
hadoop-mapreduce-client-common-2.7.4.jar
hadoop-mapreduce-client-core-2.7.4.jar
hadoop-mapreduce-client-jobclient-2.7.4.jar
hadoop-mapreduce-client-shuffle-2.7.4.jar
hadoop-yarn-api-2.7.4.jar
hadoop-yarn-client-2.7.4.jar
hadoop-yarn-common-2.7.4.jar
hadoop-yarn-server-common-2.7.4.jar
hadoop-yarn-server-web-proxy-2.7.4.jar
HikariCP-2.5.1.jar
hive-beeline-2.3.6.jar
hive-cli-2.3.6.jar
hive-common-2.3.6.jar
hive-exec-2.3.6-core.jar
hive-jdbc-2.3.6.jar
hive-llap-common-2.3.6.jar
hive-metastore-2.3.6.jar
hive-serde-2.3.6.jar
hive-service-rpc-2.3.6.jar
hive-shims-0.23-2.3.6.jar
hive-shims-2.3.6.jar
hive-shims-common-2.3.6.jar
hive-shims-scheduler-2.3.6.jar
hive-storage-api-2.6.0.jar
hive-vector-code-gen-2.3.6.jar
hk2-api-2.6.1.jar
hk2-locator-2.6.1.jar
hk2-utils-2.6.1.jar
htrace-core-3.1.0-incubating.jar
httpclient-4.5.6.jar
httpcore-4.4.12.jar
istack-commons-runtime-3.0.8.jar
ivy-2.4.0.jar
jackson-annotations-2.10.0.jar
jackson-core-2.10.0.jar
jackson-core-asl-1.9.13.jar
jackson-databind-2.10.0.jar
jackson-dataformat-yaml-2.10.0.jar
jackson-jaxrs-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
jackson-module-jaxb-annotations-2.10.0.jar
jackson-module-paranamer-2.10.0.jar
jackson-module-scala_2.12-2.10.0.jar
jackson-xc-1.9.13.jar
jakarta.activation-api-1.2.1.jar
jakarta.annotation-api-1.3.5.jar
jakarta.inject-2.6.1.jar
jakarta.validation-api-2.0.2.jar
jakarta.ws.rs-api-2.1.6.jar
jakarta.xml.bind-api-2.3.2.jar
janino-3.0.15.jar
javassist-3.22.0-CR2.jar
javax.inject-1.jar
javax.jdo-3.2.0-m3.jar
javax.servlet-api-3.1.0.jar
javolution-5.5.1.jar
jaxb-api-2.2.2.jar
jaxb-runtime-2.3.2.jar
jcl-over-slf4j-1.7.16.jar
jdo-api-3.0.1.jar
jersey-client-2.29.1.jar
jersey-common-2.29.1.jar
jersey-container-servlet-2.29.1.jar
jersey-container-servlet-core-2.29.1.jar
jersey-hk2-2.29.1.jar
jersey-media-jaxb-2.29.1.jar
jersey-server-2.29.1.jar
jetty-6.1.26.jar
jetty-sslengine-6.1.26.jar
jetty-util-6.1.26.jar
JLargeArrays-1.5.jar
jline-2.14.6.jar
joda-time-2.10.5.jar
jodd-core-3.5.2.jar
jpam-1.1.jar
json-1.8.jar
json4s-ast_2.12-3.6.6.jar
json4s-core_2.12-3.6.6.jar
json4s-jackson_2.12-3.6.6.jar
json4s-scalap_2.12-3.6.6.jar
jsp-api-2.1.jar
jsr305-3.0.0.jar
jta-1.1.jar
JTransforms-3.1.jar
jul-to-slf4j-1.7.16.jar
kryo-shaded-4.0.2.jar
kubernetes-client-4.6.4.jar
kubernetes-model-4.6.4.jar
kubernetes-model-common-4.6.4.jar
leveldbjni-all-1.8.jar
libfb303-0.9.3.jar
libthrift-0.12.0.jar
log4j-1.2.17.jar
logging-interceptor-3.12.6.jar
lz4-java-1.7.0.jar
machinist_2.12-0.6.8.jar
macro-compat_2.12-1.1.1.jar
mesos-1.4.0-shaded-protobuf.jar
metrics-core-4.1.1.jar
metrics-graphite-4.1.1.jar
metrics-jmx-4.1.1.jar
metrics-json-4.1.1.jar
metrics-jvm-4.1.1.jar
minlog-1.3.0.jar
netty-all-4.1.42.Final.jar
objenesis-2.5.1.jar
okapi-shade-0.4.2.jar
okhttp-3.12.6.jar
okio-1.15.0.jar
opencsv-2.3.jar
orc-core-1.5.8.jar
orc-mapreduce-1.5.8.jar
orc-shims-1.5.8.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.3.jar
paranamer-2.8.jar
parquet-column-1.10.1.jar
parquet-common-1.10.1.jar
parquet-encoding-1.10.1.jar
parquet-format-2.4.0.jar
parquet-hadoop-1.10.1.jar
parquet-jackson-1.10.1.jar
protobuf-java-2.5.0.jar
py4j-0.10.8.1.jar
pyrolite-4.30.jar
RoaringBitmap-0.7.45.jar
scala-collection-compat_2.12-2.1.1.jar
scala-compiler-2.12.10.jar
scala-library-2.12.10.jar
scala-parser-combinators_2.12-1.1.2.jar
scala-reflect-2.12.10.jar
scala-xml_2.12-1.2.0.jar
shapeless_2.12-2.3.3.jar
shims-0.7.45.jar
slf4j-api-1.7.16.jar
slf4j-log4j12-1.7.16.jar
snakeyaml-1.24.jar
snappy-java-1.1.7.3.jar
spark-catalyst_2.12-3.0.0-preview2.jar
spark-core_2.12-3.0.0-preview2.jar
spark-cypher_2.12-3.0.0-preview2.jar
spark-graph-api_2.12-3.0.0-preview2.jar
spark-graphx_2.12-3.0.0-preview2.jar
spark-graph_2.12-3.0.0-preview2.jar
spark-hive-thriftserver_2.12-3.0.0-preview2.jar
spark-hive_2.12-3.0.0-preview2.jar
spark-kubernetes_2.12-3.0.0-preview2.jar
spark-kvstore_2.12-3.0.0-preview2.jar
spark-launcher_2.12-3.0.0-preview2.jar
spark-mesos_2.12-3.0.0-preview2.jar
spark-mllib-local_2.12-3.0.0-preview2.jar
spark-mllib_2.12-3.0.0-preview2.jar
spark-network-common_2.12-3.0.0-preview2.jar
spark-network-shuffle_2.12-3.0.0-preview2.jar
spark-repl_2.12-3.0.0-preview2.jar
spark-sketch_2.12-3.0.0-preview2.jar
spark-sql_2.12-3.0.0-preview2.jar
spark-streaming_2.12-3.0.0-preview2.jar
spark-tags_2.12-3.0.0-preview2-tests.jar
spark-tags_2.12-3.0.0-preview2.jar
spark-unsafe_2.12-3.0.0-preview2.jar
spark-yarn_2.12-3.0.0-preview2.jar
spire-macros_2.12-0.17.0-M1.jar
spire-platform_2.12-0.17.0-M1.jar
spire-util_2.12-0.17.0-M1.jar
spire_2.12-0.17.0-M1.jar
ST4-4.0.4.jar
stax-api-1.0-2.jar
stax-api-1.0.1.jar
stream-2.9.6.jar
super-csv-2.2.0.jar
transaction-api-1.1.jar
univocity-parsers-2.8.3.jar
velocity-1.5.jar
xbean-asm7-shaded-4.15.jar
xercesImpl-2.9.1.jar
xmlenc-0.52.jar
xz-1.5.jar
zjsonpatch-0.3.0.jar
zookeeper-3.4.14.jar
zstd-jni-1.4.4-3.jar