Оптимизация задач по снижению ЦП в торговом приложении - PullRequest
3 голосов
/ 05 февраля 2011

Я разработал торговое приложение, которое обрабатывает инвестиционный портфель клиентов.

Я использую два вида хранилищ данных:

  1. Акции - содержит уникальное название акции и ее ежедневное изменение процента.
  2. UserTransactions - содержит информацию, касающуюся конкретной покупки акции, совершенной пользователем: стоимость покупки вместе со ссылкой на акцию для текущей покупки.

Модули Python db.Model:

class Stocks (db.Model):
stockname = db.StringProperty(multiline=True) 
dailyPercentChange=db.FloatProperty(default=1.0) 

class UserTransactions (db.Model): 
buyer = db.UserProperty() 
value=db.FloatProperty() 
stockref = db.ReferenceProperty(Stocks) 

Раз в час мне нужно обновлять базу данных: обновлять ежедневное процентное изменение в Stocks, а затем обновлять значение всехсущности в UserTransactions, которые ссылаются на этот запас.

Следующий модуль python выполняет итерацию по всем акциям, обновляет свойство dailyPercentChange и вызывает задачу для просмотра всех UserTransactions сущностей, которые ссылаются на акцию, и обновляет их значение:

Stocks.py

# Iterate over all stocks in datastore
for stock in Stocks.all():
   # update daily percent change in datastore
   db.run_in_transaction(updateStockTxn, stock.key()) 
   # create a task to update all user transactions entities referring to this stock
   taskqueue.add(url='/task', params={'stock_key': str(stock.key(), 'value' : self.request.get ('some_val_for_stock') }) 

def updateStockTxn(stock_key):
   #fetch the stock again - necessary to avoid concurrency updates
   stock = db.get(stock_key)
   stock.dailyPercentChange= data.get('some_val_for_stock') # I get this value from outside
   ... some more calculations here ...
   stock.put()

Task.py (/ task)

# Amount of transaction per task
amountPerCall=10
stock=db.get(self.request.get("stock_key")) 
# Get all user transactions which point to current stock
user_transaction_query=stock.usertransactions_set
cursor=self.request.get("cursor") 
if cursor: 
    user_transaction_query.with_cursor(cursor) 

# Spawn another task if more than 10 transactions are in datastore
transactions = user_transaction_query.fetch(amountPerCall) 
if len(transactions)==amountPerCall: 
    taskqueue.add(url='/task', params={'stock_key': str(stock.key(), 'value' : self.request.get ('some_val_for_stock'), 'cursor': user_transaction_query.cursor()  })

# Iterate over all transaction pointing to stock and update their value
for transaction in transactions: 
   db.run_in_transaction(updateUserTransactionTxn, transaction.key()) 

def updateUserTransactionTxn(transaction_key): 
   #fetch the transaction again - necessary to avoid concurrency updates
   transaction = db.get(transaction_key)
   transaction.value= transaction.value* self.request.get ('some_val_for_stock')
   db.put(transaction) 

Проблема:

В настоящее время система работает отлично, но проблема в том, что она плохо масштабируется ... У меня около 100 акций с 300 транзакциями пользователей, и я запускаю обновление каждый час.На панели инструментов я вижу, что файл task.py занимает около 65% процессорного времени (Stock.py занимает около 20-30%), и я использую почти все 6,5 бесплатных процессорных часа, предоставленных мне механизмом приложения.У меня нет проблем, чтобы включить биллинг и оплатить дополнительный процессор, но проблема заключается в масштабировании системы ... Использование 6,5 процессорных часов на 100 акций очень плохо.

Мне было интересно, учитывая требования системы, как упомянуто выше, существует ли лучшая и более эффективная реализация (или просто небольшое изменение, которое может помочь с текущей реализацией), чем представленная здесь.

Спасибо !!

Джоэл

1 Ответ

8 голосов
/ 07 февраля 2011

Необходимо сделать несколько очевидных улучшений:

  1. Вы должны использовать запрос keys_only в первом фрагменте: поскольку вы фактически не обращаетесь к свойствам объекта stock в любой точке,нет смысла его извлекать.Вы также можете получить только ключ.
  2. Массовое добавление задач можно выполнить с помощью метода .add объекта Queue, задокументированного здесь .Это более эффективно, чем добавление задач по отдельности.
  3. Ваши задачи объединяют новые в цепочку каждые 10 транзакций, но задачи могут выполняться до 10 минут, а для 10 транзакций хранилища данных может потребоваться не более одной или двух секунд.Вместо этого установите таймер в начале вашего запроса и проверяйте его каждый раз в цикле, прерывая и связывая следующую задачу, когда вы приближаетесь к 10-минутному пределу.
  4. Если вы ожидаете выполнить итерацию в течениебольшое количество сущностей, используйте .fetch и курсоры, а не повторяющиеся;итерация выборок небольшими партиями по 20 объектов.
  5. При обновлении отдельных объектов вы снова выполняете обычный запрос, но только с помощью ключа.Вместо этого выполните запрос keys_only.
  6. Является ли задача единственной вещью, которая обновит UserTransaction сущностей после их первоначальной записи?Если это так, вы можете пропустить транзакцию и обновить их в пакетном режиме.

Наконец, я бы предложил общий рефакторинг: вместо запуска нового задания для каждого запаса запустите внешний цикл внутри задачис таймером, упомянутым выше.Когда вы соединяете следующую задачу, используйте курсоры, чтобы передать текущее состояние и выбрать, где вы остановились.

Единственное, что нужно учитывать, - это если вы каким-то образом можете реструктурировать свои данные, чтобы избежать необходимостимного обновлений.Например, можете ли вы сделать так, чтобы сущности UserTransaction ссылались на какое-то значение в сущностях Stock, чтобы вы могли вычислить их фактическое значение во время выполнения, и вам нужно только обновить одну сущность Stock с изменением?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...