У меня есть два Java-объекта, которые принимают входные данные и возвращают выходные данные. Чтобы сделать их доступными в Python, я запускаю JVM с помощью subprocess.popen(java -Xmx3g -Xms3g -jar <fullpath\test.jar>)
, а затем строю шлюз с помощью:
gateway = JavaGateway(
gateway_parameters=GatewayParameters(port=11001),
callback_server_parameters=CallbackServerParameters(port=21001))
# load functions
modelCallCard = gateway.entry_point.modelCallCard
modelCallFlexi = gateway.entry_point.modelCallFlexi
modelCallCard & modelCallFlexi - это некоторые функции Java (я не знаю,много о тех, которые я нигде не мог найти), которые работают нормально, если я не использую многопроцессорность:
def modelCall(args):
PRODUCT = args['PRODUCT']
derivedVarsJson = args['derivedVarsJson']
if PRODUCT.upper() == 'CARD':
scoreInfoJson = modelCallCard(derivedVarsJson)
else:
scoreInfoJson = modelCallFlexi(derivedVarsJson)
return scoreInfoJson
def main():
df = pd.read_csv(inputFileName, sep=dlm_out, encoding='utf-8', header=0, index_col='_index_', usecols=['_index_', 'ACC_ID', 'PRODUCT', 'derivedVarsJson'])
df['_source_'] = df[['PRODUCT', 'derivedVarsJson']].to_dict(orient='records')
df['scoreInfoJson'] = df['_source_'].map(modelCall)
if __name__ == '__main__':
main()
Я хочу использовать многопроцессорность и параллельно вызывать вышеупомянутую функцию. Для этого я изменил функцию на:
def outer_func(args2):
temp_df = args2[0]
modelCallCard = args2[1]
modelCallFlexi = args2[2]
def modelCall(args):
PRODUCT = args['PRODUCT']
derivedVarsJson = args['derivedVarsJson']
if PRODUCT.upper() == 'CARD':
scoreInfoJson = modelCallCard(derivedVarsJson)
else:
scoreInfoJson = modelCallFlexi(derivedVarsJson)
return scoreInfoJson
temp_df['scoreInfoJson'] = temp_df['_source_'].map(modelCall)
return temp_df
Чтобы вызвать модифицированную функцию с использованием многопроцессорной обработки, я внес следующие изменения:
def main():
cpuCount = 2
pool = multiprocessing.Pool(processes=cpuCount)
reader = pd.read_csv(inputFileName, sep=dlm_out, encoding='utf-8', header=0, chunksize=chunkSize, index_col='_index_', usecols=['_index_', 'ACC_ID', 'PRODUCT', 'derivedVarsJson'])
for c, df in enumerate(reader):
logger.info(f'iterating ({c}) ...')
df['_source_'] = df[['PRODUCT', 'derivedVarsJson']].to_dict(orient='records')
dfs = np.array_split(df, cpuCount)
args = [(dfs[i], modelCallCard, modelCallFlexi) for i in range(cpuCount)]
dfResults = pool.map(outer_func, args)
df = pd.concat(dfResults)
pool.close()
pool.join()
if __name__ == '__main__':
main()
Но этот код выдает ошибку:
py4j.protocol.Py4JError: An error occurred while calling t.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Unknown Source)