Я пытаюсь получить данные из Twitter с помощью Apache Flume и сохранить их в HDFS, но у меня возникла проблема.
Это мой flume-env.sh:
# Environment settings for the Flume agent.

# JDK used to run the agent.
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64

# JVM options: initial/maximum heap size and JMX.
# The name on the left-hand side of an assignment must not carry a leading `$`.
# Written as `$JAVA_OPTS="..."`, the shell expands the variable and tries to
# execute the result as a command, so these options are never applied and the
# JVM keeps whatever heap settings were in effect before (a likely cause of
# "OutOfMemoryError: Java heap space").
export JAVA_OPTS="-Xms500m -Xmx1000m -Dcom.sun.management.jmxremote"

# Extra jar added to the agent's classpath.
FLUME_CLASSPATH="/home/vineasouza/apache-flume-1.9.0-bin/lib/flume-sources-1.0-SNAPSHOT.jar"
Это мой twitter.conf:
# Flume agent "TwitterAgent": one source (Twitter), one memory channel, one HDFS sink.

# Component names declared on this agent.
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

# Source: Twitter stream. The four credential values are placeholders here.
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = #
TwitterAgent.sources.Twitter.consumerSecret = #
TwitterAgent.sources.Twitter.accessToken = #
TwitterAgent.sources.Twitter.accessTokenSecret = #
# NOTE(review): confirm that this source type honors `keywords`; it may only be
# supported by a different TwitterSource implementation.
TwitterAgent.sources.Twitter.keywords = brasil

# Sink: write events to HDFS as plain text.
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://0.0.0.0:9000/user/Hadoop/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
# File rolling: size-based rolling disabled (0); roll after 10000 events or 600 seconds.
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 600

# Channel: in-memory buffer between source and sink.
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 10000

# Bindings. The sink's channel is set exactly once, here (it was previously
# also set in the sink section with the same value).
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel
И я запускаю эту команду:
# Start the agent named TwitterAgent with the twitter.conf configuration,
# logging at DEBUG level to the console. Expansions are quoted so that a
# FLUME_HOME containing spaces does not get split into several arguments.
"$FLUME_HOME/bin/flume-ng" agent \
  --conf "$FLUME_HOME/conf/" \
  -f "$FLUME_HOME/conf/twitter.conf" \
  -Dflume.root.logger=DEBUG,console \
  -n TwitterAgent
Но у меня такое исключение:
2020-06-18 16:22:55,992 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] User-Agent: twitter4j http://twitter4j.org/ /3.0.3
2020-06-18 16:22:55,992 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] Connection: close
2020-06-18 16:22:55,992 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] X-Twitter-Client-Version: 3.0.3
2020-06-18 16:22:55,992 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] X-Twitter-Client-URL: http://twitter4j.org/en/twitter4j-3.0.3.xml
2020-06-18 16:22:55,992 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] Accept-Encoding: gzip
2020-06-18 16:22:55,992 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] X-Twitter-Client: Twitter4J
Exception in thread "Twitter Stream consumer-1[Establishing connection]" java.lang.OutOfMemoryError: Java heap space
at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:239)
at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:292)
at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
at java.base/java.io.DataInputStream.readFully(DataInputStream.java:200)
at java.base/java.io.DataInputStream.readUTF(DataInputStream.java:614)
at java.base/java.io.DataInputStream.readUTF(DataInputStream.java:569)
at java.base/sun.security.provider.JavaKeyStore.engineLoad(JavaKeyStore.java:740)
at java.base/sun.security.util.KeyStoreDelegator.engineLoad(KeyStoreDelegator.java:222)
at java.base/java.security.KeyStore.load(KeyStore.java:1479)
at java.base/sun.security.util.AnchorCertificates$1.run(AnchorCertificates.java:62)
at java.base/sun.security.util.AnchorCertificates$1.run(AnchorCertificates.java:53)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/sun.security.util.AnchorCertificates.<clinit>(AnchorCertificates.java:53)
at java.base/sun.security.provider.certpath.AlgorithmChecker.checkFingerprint(AlgorithmChecker.java:214)
at java.base/sun.security.provider.certpath.AlgorithmChecker.<init>(AlgorithmChecker.java:164)
at java.base/sun.security.provider.certpath.PKIXCertPathValidator.validate(PKIXCertPathValidator.java:181)
at java.base/sun.security.provider.certpath.PKIXCertPathValidator.validate(PKIXCertPathValidator.java:145)
at java.base/sun.security.provider.certpath.PKIXCertPathValidator.engineValidate(PKIXCertPathValidator.java:84)
at java.base/java.security.cert.CertPathValidator.validate(CertPathValidator.java:309)
at java.base/sun.security.validator.PKIXValidator.doValidate(PKIXValidator.java:364)
at java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:275)
at java.base/sun.security.validator.Validator.validate(Validator.java:264)
at java.base/sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:313)
at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:222)
at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:129)
at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:629)
at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.onCertificate(CertificateMessage.java:464)
at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.consume(CertificateMessage.java:360)
at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:444)
at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:422)
at java.base/sun.security.ssl.TransportContext.dispatch(TransportContext.java:183)
Кто-нибудь может мне помочь? Я пробовал искать решения, но ничто из найденного не решило мою проблему.