Hi everyone, I have a problem using kafka-connect with Java: it does not work for me. Can anyone help?
- kafka: localhost:9092
- zookeeper: localhost:2181
Stack trace:
[est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Unexpected state transition from STARTING to PARTITIONS_ASSIGNED
[est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Error caught during partition assignment, will abort the current process and re-throw at the end of rebalance
org.apache.kafka.streams.errors.StreamsException: stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Unexpected state transition from STARTING to PARTITIONS_ASSIGNED
at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:216)
at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:280)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:963)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:863)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
[est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] partition assignment took 2 ms.
current active tasks: []
current standby tasks: []
previous active tasks: []
[est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.streams.errors.StreamsException: stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Failed to rebalance.
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:972)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:863)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Unexpected state transition from STARTING to PARTITIONS_ASSIGNED
at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:216)
at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:280)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:963)
... 3 more
[est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] State transition from STARTING to PENDING_SHUTDOWN
Code:
public static void main(String[] args) {
    String bootstrapServer = "127.0.0.1:9092";
    String applicationId = "kafka-streams-test";
    // Creating properties
    Properties properties = new Properties();
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
            Serdes.StringSerde.class.getName());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
            Serdes.StringSerde.class.getName());
    // Creating topology
    StreamsBuilder streamsBuilder = new StreamsBuilder();
    // input topic
    KStream<String, String> inputTopic = streamsBuilder.stream("tweets");
    KStream<String, String> filteredStream = inputTopic.filter(
            (k, tweets) -> filterTweet(extractFollowerNumberFromTweets(tweets))
    );
    filteredStream.to("deletesTweets");
    // building topology
    KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
    kafkaStreams.start();
}
private static BigInteger extractFollowerNumberFromTweets(String tweet) {
    try {
        JSONObject jsonObject = new JSONObject(tweet);
        return jsonObject.getJSONObject("payload").getJSONObject("User").getBigInteger("FollowersCount");
    } catch (Exception ex) {
        return BigInteger.valueOf(0);
    }
}
static boolean filterTweet(BigInteger followerCount) {
    if (followerCount.compareTo(BigInteger.valueOf(1000)) == 1) {
        return true;
    }
    return false;
}
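To rule out the filter predicate as the source of the failure, the threshold check can be run on its own, without a broker or any Kafka classes. This is only a minimal sketch: the class name `FollowerFilterCheck` and the `THRESHOLD` constant are hypothetical names introduced for illustration, and the comparison is written as `> 0` on the assumption that it should behave the same as the `== 1` check above.

```java
import java.math.BigInteger;

public class FollowerFilterCheck {
    // Hypothetical constant: the same limit of 1000 followers used in filterTweet above.
    static final BigInteger THRESHOLD = BigInteger.valueOf(1000);

    // Keeps a tweet only when the follower count is strictly greater than the threshold.
    // "> 0" depends only on the sign of compareTo, not on the exact value 1.
    static boolean filterTweet(BigInteger followerCount) {
        return followerCount.compareTo(THRESHOLD) > 0;
    }

    public static void main(String[] args) {
        // 0 is the fallback returned by extractFollowerNumberFromTweets when parsing fails.
        System.out.println(filterTweet(BigInteger.valueOf(0)));
        // Exactly at the threshold: not kept.
        System.out.println(filterTweet(BigInteger.valueOf(1000)));
        // Just above the threshold: kept.
        System.out.println(filterTweet(BigInteger.valueOf(1001)));
    }
}
```

If this prints false, false, true, the predicate behaves as intended, so the exception in the stack trace is raised before any record reaches the filter.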
I have already created my topic and added a few tweets to it.
These are my logs after running the application.