Does Google Dataflow support stateful pipelines developed with the Python SDK?
29 March 2019

According to the capability matrix and an earlier post on Quora, Google Dataflow appears to support stateful pipelines to some extent.

However, although I have a stateful pipeline that runs fine with the DirectRunner, I get this error with the DataflowRunner:

...
 File "pipelines/StatefulPipeline.py", line 197, in process
    states = list(location_state.read())
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 318, in __iter__
    for elem in self.first:
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 187, in __iter__
    data, continuation_token = self._state_handler.blocking_get(self._state_key)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 489, in blocking_get
    continuation_token=continuation_token)))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 518, in _blocking_request
    raise RuntimeError(response.error)
RuntimeError: java.lang.IllegalStateException: Tried to access state for stateless step: NameContext{stageName=s02, originalName=s12, systemName=s12, userName=AlertEngine}
    at org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
    at org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext$StepContext.stateInternals(StreamingModeExecutionContext.java:685)
    at org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext$UserStepContext.stateInternals(StreamingModeExecutionContext.java:737)
    at org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.lambda$handleBagUserState$5(RegisterAndProcessBundleOperation.java:509)
    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
    at org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.handleBagUserState(RegisterAndProcessBundleOperation.java:505)
    at org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.delegateByStateKeyType(RegisterAndProcessBundleOperation.java:384)
    at org.apache.beam.runners.fnexecution.state.GrpcStateService$Inbound.onNext(GrpcStateService.java:130)
    at org.apache.beam.runners.fnexecution.state.GrpcStateService$Inbound.onNext(GrpcStateService.java:118)
    at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
    at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
    at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
    at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
    at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
    at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
 [while running 'generatedPtransform-327']
...

My code looks roughly like this:

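    from apache_beam import DoFn
    from apache_beam.coders import StrUtf8Coder
    from apache_beam.transforms.userstate import BagStateSpec

    class AlertEngine(DoFn):
        # Bag state holding the last seen location for the current key
        LOCATION_STATE = BagStateSpec('location', StrUtf8Coder())

        def process(self, element, location_state=DoFn.StateParam(LOCATION_STATE)):
            # Stateful DoFns receive keyed (key, value) elements
            _, msg = element
            location = msg.get('location')
            states = list(location_state.read())
            previous_location = states[0] if states else None
            if previous_location != location:
                yield location
                location_state.clear()
                location_state.add(location)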
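To make the intended behaviour of AlertEngine concrete without needing a runner, here is a minimal pure-Python sketch of what per-key bag state is supposed to do. The helper name `simulate_alert_engine` and the sample keys are hypothetical; the only Beam semantics assumed are that user state is scoped per key (and window), which is why a stateful DoFn needs keyed input. Beam does not guarantee processing order within a key, so this model simply uses list order.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Optional, Tuple


def simulate_alert_engine(
    elements: Iterable[Tuple[str, dict]],
) -> Iterator[Tuple[str, Optional[str]]]:
    """Hypothetical model of AlertEngine's BagState logic, not Beam code.

    Each key gets its own independent list, standing in for the
    per-key BagState that a runner would provide.
    """
    bags: Dict[str, List[str]] = defaultdict(list)
    for key, msg in elements:
        location = msg.get('location')
        bag = bags[key]  # state visible only to this key
        previous_location = bag[0] if bag else None
        if previous_location != location:
            yield key, location
            bag.clear()           # mirrors location_state.clear()
            bag.append(location)  # mirrors location_state.add(location)


if __name__ == '__main__':
    events = [
        ('device-1', {'location': 'A'}),
        ('device-2', {'location': 'A'}),
        ('device-1', {'location': 'A'}),
        ('device-1', {'location': 'B'}),
        ('device-2', {'location': 'A'}),
    ]
    print(list(simulate_alert_engine(events)))
```

With the sample events, only the first location per key and each subsequent change are emitted: `[('device-1', 'A'), ('device-2', 'A'), ('device-1', 'B')]`.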

So I started to wonder whether stateful pipelines built with the Python SDK are simply not supported by Dataflow. I found little information about this in the Python SDK release notes.

...