Текущее количество рабочих в задании потока данных отображается в журналах сообщений под autoscaling
.Например, я быстро выполнил работу в качестве примера и получил следующее сообщение при отображении журналов заданий в облачной оболочке:
INFO:root:2019-01-28T16:42:33.173Z: JOB_MESSAGE_DETAILED: Autoscaling: Raised the number of workers to 0 based on the rate of progress in the currently running step(s).
INFO:root:2019-01-28T16:43:02.166Z: JOB_MESSAGE_DETAILED: Autoscaling: Raised the number of workers to 1 based on the rate of progress in the currently running step(s).
INFO:root:2019-01-28T16:43:05.385Z: JOB_MESSAGE_DETAILED: Workers have started successfully.
INFO:root:2019-01-28T16:43:05.433Z: JOB_MESSAGE_DETAILED: Workers have started successfully.
Теперь вы можете запрашивать эти сообщения с помощью projects.jobs.messages.list
, в API потока данных и настройку параметра minimumImportance
равным JOB_MESSAGE_BASIC
.
Вы получите ответ, подобный следующему:
...
"autoscalingEvents": [
{...} //other events
{
"currentNumWorkers": "1",
"eventType": "CURRENT_NUM_WORKERS_CHANGED",
"description": {
"messageText": "(fcfef6769cff802b): Worker pool started.",
"messageKey": "POOL_STARTUP_COMPLETED"
},
"time": "2019-01-28T16:43:02.130129051Z",
"workerPool": "Regular"
},
Чтобы расширить его, вы можете создать скрипт Python для анализа ответа и получить параметр currentNumWorkers
только из последнегоэлемент в списке autoscalingEvents
, чтобы узнать, какое число (следовательно, текущее) число рабочих в задании.
Обратите внимание, что если этот параметр отсутствует, это означает, что число рабочих равно нулю..
Редактировать :
Я сделал быстрый скрипт на python, который извлекает текущее количество рабочих из журналов сообщений, используя API, который я упомянул выше:
from google.oauth2 import service_account
import googleapiclient.discovery
credentials = service_account.Credentials.from_service_account_file(
filename='PATH-TO-SERVICE-ACCOUNT-KEY/key.json',
scopes=['https://www.googleapis.com/auth/cloud-platform'])
service = googleapiclient.discovery.build(
'dataflow', 'v1b3', credentials=credentials)
project_id="MY-PROJECT-ID"
job_id="DATAFLOW-JOB-ID"
messages=service.projects().jobs().messages().list(
projectId=project_id,
jobId=job_id
).execute()
try:
print("Current number of workers is "+messages['autoscalingEvents'][-1]['currentNumWorkers'])
except:
print("Current number of workers is 0")
Пара замечаний:
Области - это разрешения, необходимые для ключа служебной учетной записи, на которую вы ссылаетесь (в функции from_service_account_file
), чтобы сделатьвызов API.Эта строка необходима для аутентификации в API.Вы можете использовать любой из этого списка , чтобы упростить свою работу, я просто использовал служебный ключ учетной записи с разрешениями project/owner
.
Если выХотите узнать больше о клиентских библиотеках Python API, проверьте эту документацию и этот пример .