py4j - использовать объект члена java в многопроцессорной среде - PullRequest
0 голосов
/ 03 октября 2019

У меня есть два Java-объекта, которые принимают входные данные и возвращают выходные данные. Чтобы сделать их доступными в Python, я запускаю JVM с помощью subprocess.popen(java -Xmx3g -Xms3g -jar <fullpath\test.jar>)

, а затем строю шлюз с помощью:

gateway = JavaGateway(
        gateway_parameters=GatewayParameters(port=11001),
        callback_server_parameters=CallbackServerParameters(port=21001))

# load functions
modelCallCard = gateway.entry_point.modelCallCard
modelCallFlexi = gateway.entry_point.modelCallFlexi

modelCallCard & modelCallFlexi - это некоторые функции Java (я не знаю,много о тех, которые я нигде не мог найти), которые работают нормально, если я не использую многопроцессорность:

def modelCall(args):
    PRODUCT = args['PRODUCT']
    derivedVarsJson = args['derivedVarsJson']

    if PRODUCT.upper() == 'CARD':
        scoreInfoJson = modelCallCard(derivedVarsJson)
    else:
        scoreInfoJson = modelCallFlexi(derivedVarsJson)
    return scoreInfoJson

def main():
    df = pd.read_csv(inputFileName, sep=dlm_out, encoding='utf-8', header=0, index_col='_index_', usecols=['_index_', 'ACC_ID', 'PRODUCT', 'derivedVarsJson'])

    df['_source_'] = df[['PRODUCT', 'derivedVarsJson']].to_dict(orient='records')
    df['scoreInfoJson'] = df['_source_'].map(modelCall)


if __name__ == '__main__':
    main()

Я хочу использовать многопроцессорность и параллельно вызывать вышеупомянутую функцию. Для этого я изменил функцию на:

def outer_func(args2):
    temp_df = args2[0]
    modelCallCard = args2[1]
    modelCallFlexi = args2[2]

    def modelCall(args):
        PRODUCT = args['PRODUCT']
        derivedVarsJson = args['derivedVarsJson']

        if PRODUCT.upper() == 'CARD':
            scoreInfoJson = modelCallCard(derivedVarsJson)
        else:
            scoreInfoJson = modelCallFlexi(derivedVarsJson)
        return scoreInfoJson

    temp_df['scoreInfoJson'] = temp_df['_source_'].map(modelCall)

    return temp_df

Чтобы вызвать модифицированную функцию с использованием многопроцессорной обработки, я внес следующие изменения:

def main():
    cpuCount = 2
    pool = multiprocessing.Pool(processes=cpuCount)
    reader = pd.read_csv(inputFileName, sep=dlm_out, encoding='utf-8', header=0, chunksize=chunkSize, index_col='_index_', usecols=['_index_', 'ACC_ID', 'PRODUCT', 'derivedVarsJson'])

    for c, df in enumerate(reader):
        logger.info(f'iterating ({c}) ...')

        df['_source_'] = df[['PRODUCT', 'derivedVarsJson']].to_dict(orient='records')

        dfs = np.array_split(df, cpuCount)
        args = [(dfs[i], modelCallCard, modelCallFlexi) for i in range(cpuCount)]

        dfResults = pool.map(outer_func, args)
        df = pd.concat(dfResults)
        pool.close()
        pool.join()

if __name__ == '__main__':
    main()

Но этот код выдает ошибку:

py4j.protocol.Py4JError: An error occurred while calling t.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
        at py4j.Gateway.invoke(Gateway.java:274)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Unknown Source)
...