Получение NotSerializableException - При использовании Spark Streaming с Kafka - PullRequest
0 голосов
/ 24 апреля 2019

Я использую SparkStreaming для чтения данных из темы. Я сталкиваюсь с исключением в этом.

java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord Стек сериализации: - объект не сериализуем (класс: org.apache.kafka.clients.consumer.ConsumerRecord, значение: ConsumerRecord (topic = rawEventTopic, раздел = 0, смещение = 14098, CreateTime = 1556113016951, размер сериализованного ключа = -1, сериализованное значение size = 2916, headers = RecordHeaders (headers = [], isReadOnly = false), ключ = ноль, значение = { "ID": нулевой, "сообщение": нулевая, "EVENTDATE": "", "группа": нулевой, "категория": "AD", "Username": нулевой, "inboundDataSource": "AD", "источник ":" 192.168.1.14" , "место назначения": "192.168.1.15", "BytesSent": "200KB", "RAWDATA": "{имя пользователя: Vinit}», "account_name": нулевой, "security_id": нулевой, "ACCOUNT_DOMAIN": нулевой, "logon_id": нулевой, "process_id": нулевой, "PROCESS_INFORMATION": нулевой, "process_name": нулевой, "target_server_name": нуль "source_network_address": нулевой, "logon_process": нулевой, "authentication_Package": нулевой, "network_address": нулевой, "failure_reason": нулевой, "workstation_name": нулевой, "target_server": нулевой, "network_information": нулевой, "object_type": нулевой, "object_name": нулевой, "source_port": нулевой, "logon_type": нулевой, "group_name": нулевой, "source_dra": нулевой, "destination_dra": нулевой, "group_admin": нулевой, "sam_account_name ": нулевой," new_logon ": нулевой," destination_address ": нулевой," destination_port ": нулевой," source_address ": нулевой," logon_account ": нулевой," SUB_STATUS ": нулевой," EVENTDATE ": нулевой," TIME_TAKEN ": нуль "s_computername": нулевой, "cs_method": нулевой, "cs_uri_stem": нулевой, "cs_uri_query": нулевой, "c_ip": нулевой, "s_ip": нулевой, "s_supplier_name": нулевой, "s_sitename": нулевой, "cs_username": нулевой, "cs_auth_group": нулевой, "cs_categories": нулевой, "s_action": нулевой, "cs_host": нулевой, "cs_uri": нулевой, "cs_uri_scheme": нулевой, "cs_uri_port": п Улла, "cs_uri_path": нулевой, "cs_uri_extension": нулевой, "cs_referer": нулевой, "cs_user_agent": нулевой, "cs_bytes": нулевой, "sc_status": нулевой, "sc_bytes": нулевой, "sc_filter_result": нулевой, "sc_filter_category": нулевой, "x_virus_id": нулевой, "x_exception_id": нулевой, "rs_content_type": нулевой, "s_supplier_ip": нулевой, "cs_cookie": нулевой, "s_port": нулевой, "cs_version": нулевой, "CreationTime ": нулевой," операция ": нулевая," нагрузка ": нулевая," ClientIP ": нулевой," идентификатор пользователя ": нулевой," EventSource ": нулевой," ItemType ": нулевой," UserAgent ": нулевой," данныеСобытие ": нуль "sourceFileName": нулевой, "SiteURL": нулевой, "targetUserOrGroupType": нулевой, "targetUserOrGroupName": нулевой, "sourceFileExtension": нулевой, "sourceRelativeUrl": нулевой, "resultStatus": нулевой, "клиент": нулевой, "LoginStatus": нулевой, "UserDomain": нулевой, "clientIPAddress": нулевой, "clientProcessName": нулевой, "clientVersion": нулевой, "externalAccess": нулевой, "Тип входа": нулевой, "mailboxOwnerUPN": нулевой, "OrganizationName ": нулевой," originatingServer ": нулевой," субъект ": нулевой," sendAsUserSmtp ": нулевой," deviceexternalid ": нулевой," deviceeventcategory ": нулевой," devicecustomstring1 ": нулевой," customnumber2" : п Улла, "customnumber1": нулевой, "EmailSender": нулевой, "sourceusername": нулевой, "sourceaddress": нулевой, "emailrecipient": нулевой, "DestinationAddress": нулевой, "DestinationPort": нулевой, "requestclientapplication": нулевой, "oldfilepath": нулевой, "Путь к файлу": нулевой, "additionaldetails11": нулевой, "applicationprotocol": нулевой, "emailrecipienttype": нулевой, "почтового сообщения": нулевой, "transactionstring1": нулевой, "deviceaction": нулевой, "devicecustomdate2 ": нулевой," devicecustomdate1 ": нулевой," sourcehostname ": нулевой," additionaldetails10 ": нулевой," имя файла ": нулевой," bytesout ": нулевой," additionaldetails13 ": нулевой," additionaldetails14 ": нулевой," AccountName ": нуль "destinationhostname": нулевой, "DataSourceID": 2, "дата": "", "нарушили" ложь "oobjectId": нулевой, "eventCategoryName": "AD", "sourceDataType": "AD"}) ) - элемент массива (индекс: 0) - массив (класс [Lorg.apache.kafka.clients.consumer.ConsumerRecord ;, размер 1) в org.apache.spark.serializer.SerializationDebugger $ .improveException (SerializationDebugger.scala: 40) ~ [spark-core_2.11-2.3.0.jar: 2.3.0] в org.apache.spark.serializer.JavaSerializationStream.writeObject (JavaSerializer.scala: 46) ~ [spark-core_2.11-2.3.0.jar: 2.3.0] в org.apache.spark.serializer.JavaSerializerInstance.serialize (JavaSerializer.scala: 100)~ [spark-core_2.11-2.3.0.jar: 2.3.0] в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 393) ~ [spark-core_2.11-2.3.0.jar: 2.3.0] на java.util.concurrent.ThreadPoolExecutor.runWorker (неизвестный источник) [na: 1.8.0_151] на java.util.concurrent.ThreadPoolExecutor $ Worker.run (неизвестный источник) [na: 1.8.0_151] at java.lang.Thread.run (неизвестный источник) [na: 1.8.0_151]

2019-04-24 19: 07: 00.025 ОШИБКА 21144 --- [result-getter-1] o.apache.spark.scheduler.TaskSetManager: Задача 1.0 на этапе 48.0 (TID 97) привела к не сериализуемому результату: org.apache.kafka.clients.consumer.ConsumerRecord

Код для чтения данных темы приведен ниже-

 @Service
public class RawEventSparkConsumer {
    private final Logger logger = LoggerFactory.getLogger(RawEventSparkConsumer.class);

    @Autowired
    private DataModelServiceImpl dataModelServiceImpl;

    @Autowired
    private JavaStreamingContext streamingContext;

    @Autowired
    private JavaInputDStream<ConsumerRecord<String, String>> messages;

    @Autowired
    private EnrichEventKafkaProducer enrichEventKafkaProd;

    @PostConstruct
    private void sparkRawEventConsumer() {

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {

            messages.foreachRDD((rdd) -> {

                List<ConsumerRecord<String, String>> rddList = rdd.collect();
                Iterator<ConsumerRecord<String, String>> rddIterator = rddList.iterator();
                while (rddIterator.hasNext()) {
                    ConsumerRecord<String, String> rddRecord = rddIterator.next();

                    if (rddRecord.topic().toString().equalsIgnoreCase("rawEventTopic")) {
                        ObjectMapper mapper = new ObjectMapper();
                        BaseDataModel csvDataModel = mapper.readValue(rddRecord.value(), BaseDataModel.class);
                        EnrichEventDataModel enrichEventDataModel = (EnrichEventDataModel) csvDataModel;
                        enrichEventKafkaProd.sendEnrichEvent(enrichEventDataModel);

                    } else if (rddRecord.topic().toString().equalsIgnoreCase("enrichEventTopic")) {
                        System.out.println("************getting enrichEventTopic data ************************");
                    }

                }

            });

            streamingContext.start();

            try {
                streamingContext.awaitTermination();
            } catch (InterruptedException e) { // TODO Auto-generated catch block
                e.printStackTrace();
            }
        });

    }

Это код конфигурации.

@Bean
public JavaInputDStream<ConsumerRecord<String, String>> getKafkaParam(JavaStreamingContext streamingContext) {
            Map<String, Object> kafkaParams = new HashedMap();
            kafkaParams.put("bootstrap.servers", "localhost:9092");
            kafkaParams.put("key.deserializer", StringDeserializer.class);
            kafkaParams.put("value.deserializer", StringDeserializer.class);
            kafkaParams.put("group.id", "group1");
            kafkaParams.put("auto.offset.reset", "latest");
            kafkaParams.put("enable.auto.commit", false);
            Collection<String> topics = Arrays.asList(rawEventTopic,enrichEventTopic);

            return KafkaUtils.createDirectStream(
                    streamingContext,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
            );


        }

Пожалуйста, помогите.Я застрял в этой точке.

1 Ответ

0 голосов
/ 12 мая 2019

Нашел решение моей проблемы в следующей ссылке -

org.apache.spark.SparkException: задача не сериализуема

объявить внутренний класс как статическую переменную:

static Function<Tuple2<String, String>, String> mapFunc=new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
}
...