DB2 Express-C 9.7.2 - ошибка согласованности при повторяемом чтении - PullRequest
0 голосов
/ 03 марта 2011

Я выполняю задание по настройке БД, в котором я выполняю свопы сальдо по счетам одновременно с суммой всех сальдо.

Моя настройка заключается в том, что я запускаю, например, 50 потоков с 100 свопами.В настоящее время семантика фиксации DB2 установлена ​​на ON, как в конфигурации по умолчанию.

Если я повторяю эксперимент достаточно много раз, я получаю ошибку согласованности, т.е. изменяется общая сумма балансов.Следовательно, у меня должно было быть несколько потерянных обновлений.

Потерянные обновления не должны быть возможны при повторяемом чтении (?).

Насколько я понимаю, в настоящее время принятая семантика работает как изоляция моментального снимка, так что еслидругая транзакция обращается к данным, в то время как исходная транзакция записывает, она использует текущие зафиксированные данные, а не незафиксированные записи в одновременно выполняемой транзакции.Это может привести к потере обновлений.Но в соответствии с тем, что я могу найти в документации по db2, это относится только к стабильности курсора.

Теперь я получаю потерянные обновления с уровнем повторяемой изоляции чтения.У кого-нибудь есть идея, почему это так?

РЕДАКТИРОВАТЬ:

swap1.sql:

select balance from accounts where number = ? for update;

swap2.sql:

update accounts set balance = ? where number = ?;

sum.sql:

select sum(balance) from accounts;

init.sql

create table accounts(number integer not null primary key, branchnum integer not null, balance float not null);


#!/usr/bin/env python
# encoding: utf-8
"""
DB2/ValueOfSerializability/experiment.py

import sys
import getopt
import timeit
import multiprocessing
import random
import os
import ibm_db
import time


### Experiment parameters (default values)
NBRUNS         = 5          # Number of runs (-r:, --runs=)
NBSWAPS        = 100        # Number of swaps (-s:, --swaps=)
NBSWAPTHREADS  = 10         # Number of swap threads (-t:, --threads=)
RANGE_LOW      = 1          # Lower bound of the range for account number
RANGE_UP       = 1000000    # Upper bound of the range for account number
ISOL_LEVEL     = 'RR'

### Output parameters (default values)
OUTPUT_FILE_PATH  = '.'   # Path of the output file output.txt (append)

### Database parameters (DATABASE; HOSTNAME; PORT; USERNAME; PASSWORD)
sys.path.append("..")
from db2 import *

# Process Manager data structure
q = None

""""
Swapping of balance values.
read balance for account number X into valX and for account number Y into valY.
update account number X with balance set to valY
update account number Y with balance set to valX
X < Y
We avoid deadlocks because of the clustered index on
account number that garantees that account numbers
are accessed in acending order.
"""
def swap(q):
    swap1_str= q[0]; swap2_str = q[1]
    # Connect to DB
    conn = ibm_db.pconnect(DATABASE, USERNAME, PASSWORD)
    if conn is None: raise Usage(ibm_db.conn_errormsg())
    ibm_db.autocommit(ibm_db.SQL_AUTOCOMMIT_OFF)
    # Set isolation level
    ret = ibm_db.exec_immediate(conn, "SET CURRENT ISOLATION = "+ISOL_LEVEL)
    # Prepare Statements
    swap1_stmt = ibm_db.prepare(conn, swap1_str)
    if (swap1_stmt == False):
        raise Usage("Failed to prepare swap1 query")
    swap2_stmt = ibm_db.prepare(conn, swap2_str)
    if (swap2_stmt == False):
        raise Usage("Failed to prepare swap2 update")
    # Execute Statements
    nbrep = int(round(NBSWAPS / NBSWAPTHREADS))
    for i in range(nbrep):
        x = random.randint(RANGE_LOW, RANGE_UP/2)
        y = random.randint(x,RANGE_UP)
        if ibm_db.execute(swap1_stmt, (x,)) == False:
            raise Usage("Failed to execute the swap1 query (x)")
        valX = ibm_db.fetch_tuple(swap1_stmt)
        if valX == False:
            raise Usage("Failed to iterate over the swap1 result set (x)")
        if ibm_db.execute(swap1_stmt, (y,)) == False:
            raise Usage("Failed to execute the swap1 query (y)")
        valY = ibm_db.fetch_tuple(swap1_stmt)
        if valY == False:
            raise Usage("Failed to iterate over the swap1 result set (y)")
        time.sleep(0.1)
        if ibm_db.execute(swap2_stmt, (valY[0],x)) == False:
            raise Usage("Failed to execute the swap2 query (x, valY)")
        if ibm_db.execute(swap2_stmt, (valX[0],y)) == False:
            raise Usage("Failed to execute the swap1 query (y, valX)")
        ibm_db.commit(conn)
    # Disconnect from DB
    status = ibm_db.close(conn)
    if status == False: raise Usage("Failed to close db connection.\n")


def summation(q):
    sum_str = q[2]
    # Connect to DB
    conn = ibm_db.pconnect(DATABASE, USERNAME, PASSWORD)
    if conn is None:
        raise Usage(ibm_db.conn_errormsg())
    ibm_db.autocommit(ibm_db.SQL_AUTOCOMMIT_OFF)
    # Set isolation level
    ret = ibm_db.exec_immediate(conn, "SET CURRENT ISOLATION = "+ISOL_LEVEL)
    # Prepare statement
    sum_stmt   = ibm_db.prepare(conn, sum_str)
    if (sum_stmt == False): raise Usage("Failed to prepare sum query")
    # Execute statement
    if ibm_db.execute(sum_stmt) == False:
        raise Usage("Failed to execute the sum query")
    sum= ibm_db.fetch_tuple(sum_stmt)
    ibm_db.commit(conn)
    # Print result set to output file
    try:
      f = open(OUTPUT_FILE_PATH+'/output.txt', 'a')
      f.write(str(sum)+'\n')
      f.close()
    except IOError, e:
      raise Usage("Failed to manipulate output.txt.\n")
    finally:
      f.close()
    # Disconnect from DB
    status = ibm_db.close(conn)
    if status == False: raise Usage("Failed to close db connection.\n")

"""
Thread wrapper class
"""
class Thread(multiprocessing.Process):
    def __init__(self, target, *args):
        multiprocessing.Process.__init__(self, target=target, args=args)
        self.start()

def experiment(q):
    ThreadL = []
    # Launch swap threads
    for n in range(NBSWAPTHREADS):
        ThreadL.append(Thread(swap,q))
    # Launch Summation thread
    ThreadL.append(Thread(summation, q))
    # Barrier
    for t in ThreadL:
        t.join()

help_message = '''
python sumNswap.py [options]
options:
-h, --help       : this help message
-t, --threads=   : number of swap threads (1..59)
-s, --swaps=     : total number of swaps (< 1000)
-r, --runs=      : number of repetitions (< 100)
-i, --isol=      : isolation level ('UR', 'CS', 'RS','RR')
-o, --output=    : path to output file (result.txt)

Executes sum and swap transactions against the database described in ../db2.py
and prints timing

Example: python sumNswap.py -t10 -s1000 -r5 -iCS
'''

class Usage(Exception):
    def __init__(self, msg):
        self.msg = msg

def main(argv=None):
    global NBRUNS, NBSWAPS, NBSWAPTHREADS, RANGE_LOW, RANGE_UP, ISOL_LEVEL
    global OUTPUT_FILE_PATH
    global q
    try:
        if argv is None:
            argv = sys.argv

            try:
                opts, args = getopt.getopt(argv[1:],
                "ho:vr:s:t:g:i:",
                ["help", "output=", "runs=","swaps=", "threads=", "isol="])
            except getopt.error, msg:
                raise Usage(msg)

        # Option processing
        for option, value in opts:
            if option == "-v":
                verbose = True
            if option in ("-h", "--help"):
                raise Usage(help_message)
            if option in ("-r", "--runs"):
                v = int(value)
                if not (v < 100): raise Usage("Runs out of bounds")
                NBRUNS = v
            if option in ("-s", "--swaps"):
                v = int(value)
                if not (v < 10000): raise Usage("Swaps out of bounds")
                NBSWAPS = v
            if option in ("-t", "--threads"):
                v = int(value)
                if (v < 0 or v>60): raise Usage("Threads out of bounds")
                NBSWAPTHREADS = v
            if option in ("-i", "--isol"):
                if not value in ['UR', 'CS', 'RS', 'RR']: raise Usage("Isolation level not supported")
                ISOL_LEVEL = value
            if option in ("-o", "--output"):
                if not os.path.exists(value): raise Usage("Result file path does not exist")
                OUTPUT_FILE_PATH= value

        # Verify preconditions: required sql files exist
        try:
            f = open('sum.sql', 'r')
            sum_str = f.readline()
            f.close()
        except IOError, e:
            raise Usage("Failed to manipulate sum.sql.\n")

        try:
            f = open('swap1.sql', 'r')
            swap1_str = f.readline()
            f.close()
        except IOError, e:
            raise Usage("Failed to manipulate swap1.sql.\n")

        try:
            f = open('swap2.sql', 'r')
            swap2_str = f.readline()
            f.close()
        except IOError, e:
            raise Usage("Failed to manipulate swap2.sql.\n")

        print 'run (isol: '+ISOL_LEVEL+', threads: '+str(NBSWAPTHREADS)+', swaps:'+str(NBSWAPS)+')'
        # Queue Initialization
        manager = multiprocessing.Manager()
        q = manager.list([swap1_str, swap2_str, sum_str])

        # Timed experiment
        t = timeit.Timer("experiment(q)", "from __main__ import experiment,q")
        timings = []
        try:
            # repeat 1 experiment NBRUNS time - output is a list of timing
            timings = t.repeat(NBRUNS,1)
            # Log timing
            for timing in timings:
                s = str(timing)
                print s        
        except: 
            raise Usage(t.print_exc())

    except Usage, err:
        print >> sys.stderr, sys.argv[0].split("/")[-1] + ": " + str(err.msg)
        print >> sys.stderr, "\t for help use --help"
        return 2

if __name__ == "__main__":
    sys.exit(main())

введите кодздесь

Ответы [ 2 ]

3 голосов
/ 03 марта 2011

Фактически НАСТОЯЩЕЕ ОБЯЗАТЕЛЬСТВО является свойством изоляции СТАБИЛЬНОСТИ КУРСОРА. Все, что делает CC, - это получает текущую зафиксированную версию строки всякий раз, когда она сталкивается с блокировкой, вместо того, чтобы «ждать результата», что было традиционным поведением DB2.

REPEATABLE READ - это другой уровень изоляции.

Если есть какая-либо документация DB2, приравненная к НАСТОЯЩЕМУ ОБЯЗАТЕЛЬНОМУ ПОВТОРНОМУ ЧТЕНИЮ, пожалуйста, укажите мне, чтобы я мог исправить это.

0 голосов
/ 30 января 2012

Я думаю, что потерянные обновления возможны в следующем сценарии для повторяемого чтения:

в сценарии с уровнем изоляции, установленным в RR:

1) транзакция t1 считывает данные из строки r1,

2) транзакция t2 считывает те же данные из строки r1 (поскольку блокировка чтения используется совместно),

3) t1 изменяет данные, считанные в # 1, и передает данные в r1 (обновляет общее чтениеблокировка на эксклюзивную блокировку записи и освобождение ее после фиксации)

4) t2 изменяет данные, считанные в # 2, и фиксирует данные в r1 (последовательность блокировки аналогична описанной выше) - обновление t1 потеряно.

Я полагаю, что одним из решений этой проблемы является использование версий, обнаружение обновлений и повторная попытка транзакции.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...