Я создаю небольшое приложение, которое должно помещать данные из API Twitter в поток aws kinesis. мой код в настоящее время выглядит следующим образом ...
package course;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import com.amazonaws.services.kinesis.producer.*;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FutureCallback;
import static com.google.common.util.concurrent.Futures.*;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import twitter4j.RawStreamListener;
import twitter4j.Status;
import twitter4j.TwitterException;
import twitter4j.TwitterObjectFactory;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
public class TwitterProducerMain {
public static void main(String... args) {
TwitterStream twitterStream = createTwitterStream();
twitterStream.addListener(createListener());
twitterStream.sample();
}
private static TwitterStream createTwitterStream() {
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setOAuthConsumerKey("xxx")
.setOAuthConsumerSecret("xxx")
.setOAuthAccessToken("xxx")
.setOAuthAccessTokenSecret("xxx");
return new TwitterStreamFactory(cb.build()).getInstance();
}
private static RawStreamListener createListener() {
KinesisProducer kinesisProducer = createKinesisProducer();
return new TweetsStatusListener(kinesisProducer);
}
private static KinesisProducer createKinesisProducer() {
KinesisProducerConfiguration config = new KinesisProducerConfiguration()
.setRequestTimeout(60000)
.setRegion("us-east-1")
.setRecordMaxBufferedTime(15000);
return new KinesisProducer(config);
}
private static class TweetsStatusListener implements RawStreamListener {
private KinesisProducer kinesisProducer;
public TweetsStatusListener(KinesisProducer kinesisProducer) {
this.kinesisProducer = kinesisProducer;
}
public void onMessage(String tweetJson) {
try {
Status status = TwitterObjectFactory.createStatus(tweetJson);
if (status.getUser() != null) {
byte[] tweetBytes = tweetJson.getBytes(StandardCharsets.UTF_8);
String partitionKey = status.getLang();
ListenableFuture<UserRecordResult> f = kinesisProducer.addUserRecord("twitter", partitionKey,
ByteBuffer.wrap(tweetBytes));
addCallback(f, new FutureCallback<UserRecordResult>() {
public void onSuccess(UserRecordResult userRecordResult) {
}
public void onFailure(Throwable throwable) {
if (throwable instanceof UserRecordFailedException) {
UserRecordFailedException e = (UserRecordFailedException) throwable;
UserRecordResult result = e.getResult();
com.amazonaws.services.kinesis.producer.Attempt last = Iterables
.getLast(result.getAttempts());
System.err.println(String.format("Put failed - %s", last.getErrorMessage()));
}
}
});
System.out.println(tweetJson);
System.out.println(status.getUser());
System.out.println(status.getText());
}
} catch (TwitterException e) {
e.printStackTrace();
}
}
private void addCallback(ListenableFuture<UserRecordResult> f, FutureCallback<UserRecordResult> userRecordResultFutureCallback) {
}
public void onException(Exception e) {
e.printStackTrace();
}
}
}
Но когда я запускаю программу, я получаю следующее сообщение об ошибке ...
Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider
at course.TwitterProducerMain.createKinesisProducer(TwitterProducerMain.java:42)
at course.TwitterProducerMain.createListener(TwitterProducerMain.java:37)
at course.TwitterProducerMain.main(TwitterProducerMain.java:22)
Caused by: java.lang.ClassNotFoundException: com.amazonaws.auth.AWSCredentialsProvider
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:602)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
... 3 more
Кажется, что мой класс не может читать библиотеки KinesisProducerConfiguration, но я указал ее в своем пути к библиотеке intelliJ, и в текстовом редакторе нет сообщений об ошибках (в нем говорится, что он импортировал класс, а также прогнозно показаны различные методы для этого класса).
Я не совсем уверен, что происходит, и был бы признателен за любую помощь в этом. Спасибо!