зачем передавать аргументы конструктору класса операторной функции - это null для Flink? - PullRequest
0 голосов
/ 09 мая 2020

Я изучаю Flink, я хочу создать операторную функцию, которая расширяет ProcessWindowFunction и перегрузить новый конструктор с параметром в качестве значения поля класса, но когда этот класс создается экземпляром без этого поля, я запутался. код, как показано ниже.

import com.aliyun.datahub.client.model.Field;
import com.aliyun.datahub.client.model.FieldType;
import com.aliyun.datahub.client.model.PutRecordsResult;
import io.github.streamingwithflink.chapter8.PoJoElecMeterSource;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class DataHubSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.enableCheckpointing(10_000L);
        env.setParallelism(2);

        RecordSchemaSer schema = new RecordSchemaSer();

        schema.addField(new Field("id", FieldType.STRING));            

        DataStream<PutRecordsResult> out = env
                .addSource(new PoJoElecMeterSource())
                .keyBy( r -> r.getId())
                .window(TumblingProcessingTimeWindows.of(Time.seconds(3))) 
                .process(new PutDatahubFunction<>(schema));  // PutDatahubFunction is my build a new Operator function class

        env.execute();
    }
}

схема переменных - это параметр, который я хочу отправить конструктору, это экземпляр класса RecordSchemaSer

import com.aliyun.datahub.client.model.RecordSchema;
import java.io.Serializable;

public class RecordSchemaSer
        extends RecordSchema
        implements Serializable {

}

PutDatahubFunction - это класс, расширяющий ProcessWindowFunction, код следующим образом:

import com.aliyun.datahub.client.model.*;

import io.github.streamingwithflink.chapter8.PUDAPoJo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

public class PutDatahubFunction<IN extends PUDAPoJo, KEY>
        extends ProcessWindowFunction<IN, PutRecordsResult, KEY, TimeWindow> {

    private DataHubBase dataHubHandler;
    private List<RecordEntry> recordEntries;
    private RecordSchema schema;

    public PutDatahubFunction(RecordSchema schema) {

        this.schema = schema;
        System.out.println("field 'id' not exist ? " + this.schema.containsField("id"));  // it's true
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        .........
    }

    @Override
    public void process(KEY KEY,
                        Context context,
                        Iterable<IN> elements,
                        Collector<PutRecordsResult> out)
            throws Exception {

        RecordEntry entry = new RecordEntry();

        for (IN e : elements) {
            System.out.println("field 'id' not exist ? " + this.schema.containsField("id")); // it's false
            ......
        }

    }
}

первый system.out в конструкторе this.schema.containsField ("id") имеет значение true, но второй system.out в методе процесса this.schema.containsField ( "id") ложно! Зачем? У меня есть имя класса system.out два экземпляра, которые оба являются PutDatahubFunction.

использование ValueState не работает, потому что конструктор не вызывает getRuntimeContext (), иначе исключение в потоке «main» java .lang.IllegalStateException: контекст времени выполнения не инициализирован. код, как показано ниже:

private ValueState<RecordSchema>  schema;


public PutTupleDatahubFunction(RecordSchema schema) throws IOException {
    ValueStateDescriptor schemaDes =
            new ValueStateDescriptor("datahub schema", TypeInformation.of(RecordSchema.class));
    /*
     * error Exception in thread "main" java.lang.IllegalStateException:
     * The runtime context has not been initialized.
     */
    this.schema = getRuntimeContext().getState(schemaDes);
    this.schema.update(schema);
}

Я очень нечеткий, кто может сказать мне причину. Есть ли способ передать аргументы конструктору этого класса операторной функции? спасибо.

1 Ответ

0 голосов
/ 13 мая 2020

Я наконец понял, почему , причина в сериализации и десериализации. Я не кодирую RecordSchemaSer, причина заключается в сериализации содержимого из-за null

public class RecordSchemaSer
        extends RecordSchema
        implements Serializable
{


}
...