Using Kafka Streams with Java causes the error "Unexpected state transition from STARTING to PARTITIONS_ASSIGNED"
April 23, 2020

Hi everyone, I have a problem using kafka-connect with Java: it isn't working for me. Can anyone help?

  • kafka: localhost:9092
  • zookeeper: localhost:2181


Stack trace:

[est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Unexpected state transition from STARTING to PARTITIONS_ASSIGNED
[est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Error caught during partition assignment, will abort the current process and re-throw at the end of rebalance
org.apache.kafka.streams.errors.StreamsException: stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Unexpected state transition from STARTING to PARTITIONS_ASSIGNED
    at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:216)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:280)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:963)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:863)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
[est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] partition assignment took 2 ms.
    current active tasks: []
    current standby tasks: []
    previous active tasks: []

[est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.streams.errors.StreamsException: stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Failed to rebalance.
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:972)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:863)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] Unexpected state transition from STARTING to PARTITIONS_ASSIGNED
    at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:216)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:280)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:963)
    ... 3 more
[est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [est-fcdd4514-2d81-41d6-aead-15a8b08e6051-StreamThread-1] State transition from STARTING to PENDING_SHUTDOWN
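
The error text suggests that the stream thread validates each state change against a set of allowed transitions and rejects one it does not expect. Below is a minimal sketch of that kind of check. The class name, the transition table, and the exception type are all hypothetical, chosen for illustration and not taken from the Kafka Streams source.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class StateTransitionSketch {
    // State names mirror those seen in the log; the set itself is an assumption.
    public enum State {
        CREATED, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, RUNNING, PENDING_SHUTDOWN
    }

    // Assumed transition table for illustration only (NOT copied from Kafka).
    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);
    static {
        ALLOWED.put(State.CREATED, EnumSet.of(State.STARTING, State.PENDING_SHUTDOWN));
        ALLOWED.put(State.STARTING, EnumSet.of(State.PARTITIONS_REVOKED, State.PENDING_SHUTDOWN));
        ALLOWED.put(State.PARTITIONS_REVOKED, EnumSet.of(State.PARTITIONS_ASSIGNED, State.PENDING_SHUTDOWN));
        ALLOWED.put(State.PARTITIONS_ASSIGNED,
                EnumSet.of(State.RUNNING, State.PARTITIONS_REVOKED, State.PENDING_SHUTDOWN));
        ALLOWED.put(State.RUNNING, EnumSet.of(State.PARTITIONS_REVOKED, State.PENDING_SHUTDOWN));
        ALLOWED.put(State.PENDING_SHUTDOWN, EnumSet.noneOf(State.class));
    }

    private State state = State.CREATED;

    // Moves to the next state, or throws if the table does not allow the transition.
    public State setState(State next) {
        if (!ALLOWED.get(state).contains(next)) {
            throw new IllegalStateException(
                    "Unexpected state transition from " + state + " to " + next);
        }
        state = next;
        return state;
    }

    public State current() {
        return state;
    }

    public static void main(String[] args) {
        StateTransitionSketch thread = new StateTransitionSketch();
        thread.setState(State.STARTING);
        try {
            // With the assumed table, skipping PARTITIONS_REVOKED is rejected.
            thread.setState(State.PARTITIONS_ASSIGNED);
        } catch (IllegalStateException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

Under this assumed table, an "assigned" callback that arrives before any "revoked" callback is rejected, which is the shape of the failure in the trace above.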


Code:

public static void main(String[] args) {
        String bootstrapServer = "127.0.0.1:9092";
        String applicationId = "kafka-streams-test";

        // Creating properties
        Properties properties = new Properties();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                       Serdes.StringSerde.class.getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                       Serdes.StringSerde.class.getName());

        // Creating topology
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // input topic
        KStream<String, String> inputTopic = streamsBuilder.stream("tweets");

        KStream<String, String> filteredStream = inputTopic.filter(
                (k, tweets) -> filterTweet(extractFollowerNumberFromTweets(tweets))
        );
        filteredStream.to("deletesTweets");

        // building topology
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
        kafkaStreams.start();
}

private static BigInteger extractFollowerNumberFromTweets(String tweet) {
    try {
        JSONObject jsonObject = new JSONObject(tweet);
        return jsonObject.getJSONObject("payload").getJSONObject("User").getBigInteger("FollowersCount");
    } catch (Exception ex) {
        return BigInteger.valueOf(0);
    }
}

static boolean filterTweet(BigInteger followerCount) {
    // Keep only tweets whose author has more than 1000 followers.
    return followerCount.compareTo(BigInteger.valueOf(1000)) > 0;
}
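
The filtering logic can be checked on its own, without a running broker. Below is a minimal sketch using only the JDK. The class name `FollowerFilterSketch`, the `THRESHOLD` constant, and the `parseOrZero` helper are hypothetical and stand in for the JSON extraction above, which needs an external JSON library.

```java
import java.math.BigInteger;

public class FollowerFilterSketch {
    // Same threshold as in the question's filterTweet method.
    private static final BigInteger THRESHOLD = BigInteger.valueOf(1000);

    // True only when the count is strictly greater than the threshold.
    static boolean exceedsThreshold(BigInteger followerCount) {
        return followerCount.compareTo(THRESHOLD) > 0;
    }

    // Hypothetical stand-in for the extraction step: falls back to zero
    // on malformed or missing input, like the catch block in the question.
    static BigInteger parseOrZero(String raw) {
        try {
            return new BigInteger(raw.trim());
        } catch (NumberFormatException | NullPointerException ex) {
            return BigInteger.ZERO;
        }
    }

    public static void main(String[] args) {
        String[] samples = {"1001", "1000", "not-a-number"};
        for (String sample : samples) {
            System.out.println(sample + " -> " + exceedsThreshold(parseOrZero(sample)));
        }
    }
}
```

A tweet whose follower count cannot be read is treated as having zero followers, so it is filtered out and never reaches the output topic.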

I have already created my topic and added a few tweets to it.

These are my logs after running it:

...