Всякий раз, когда приложение Stream Streaming пытается передать смещение Кафке, оно становится нулевым.
val consumer = new KafkaConsumer[String,String](kafka_props) //create a 2nd consumer to fetch last offset
import java.util
consumer.subscribe(util.Arrays.asList("topic_1")) //Subscribe to the 2nd consumer. Without this step, the offsetAndMetadata can't be fetched.
val offsetAndMetadata = consumer.committed(topicAndPartition) //Find last committed offset for the given topicAndPartition
val endOffset = offsetAndMetadata.offset().toLong //fetch the last committed offset from offsetAndMetadata and cast it to Long data type.
val fetch_from_offset = Map(new org.apache.kafka.common.TopicPartition("topic_1", 0) -> endOffset) // create a Map with data type (TopicPartition, Long)
Всякий раз, когда он подходит к этой точке, он становится нулевым
val offsetAndMetadata = consumer.committed (topicAndPartition)