기존 트랜잭션 인터페이스에서는 시작, 커밋 혹은 어보트하는 인터페이스를 제공합니다. 여러 수행의 결과를 유기적으로 사용할 수 있는 장점이 있지만, 한 트랜잭션이 길어질 수 있는 문제가 있습니다. 길어진 트랜잭션은 시스템 리소스에 대한 잠금을 잡기 때문에 대규모 트래픽에서는 문제를 일으킬 수 있습니다. 따라서 DynamoDB에서는 여러 연산을 한 번에 요청하고, 모두 성공 혹은 모두 실패로 반환합니다.
기존 트랜잭션 인터페이스들은 새 버전을 생성하는 동안 읽기 전용 트랜잭션은 이전 버전의 데이터에 접근할 수 있는 MVCC 구현체를 많이 사용합니다. MVCC를 지원하기 위해서는 PSQL처럼 row에 append 하거나, MySQL에서 undo 영역을 관리해야 합니다. 이런 작업들은 기존의 DynamoDB 서버를 크게 변경해야 하고 추가 데이터 적재를 위해 많은 저장 공간이 필요합니다. 따라서 DynamoDB에서는 MVCC를 지원하지 않고, 한 row는 동시에 하나의 버전으로만 업데이트됩니다.
2단계 잠금(2PL)은 일반적으로 동시 트랜잭션이 동일한 데이터 항목을 읽고 쓰는 것을 방지하기 위해 사용되지만, 단점이 있습니다. 잠금은 동시성을 제한하고 교착 상태를 초래할 수 있습니다. 또한 트랜잭션의 일부로 잠금을 획득한 후 해당 트랜잭션이 커밋되기 전에 애플리케이션이 실패할 경우 잠금을 해제하기 위한 복구 메커니즘이 필요합니다. 설계를 간소화하고 운영을 최소화 하기 위해 DynamoDB는 잠금을 완전히 피하는 낙관적인 동시성 제어 체계를 사용합니다.
1. 트랜잭션 요청이 들어오면, 요청 라우터가 요청에 필요한 인증과 권한 부여를 수행한 후 트랜잭션 코디네이터에게 요청을 전달합니다.
2. 트랜잭션 코디네이터는 트랜잭션을 key 기준으로 연산을 나누고, key에 해당하는 스토리지 노드들과 통신하여 트랜잭션을 완료 혹은 실패 처리합니다.
3. 그 결과는 다시 라우터를 통해 클라이언트로 전달됩니다.
트랜잭션 코디네이터(TC)는 각 스토리지 노드(SN)들과 2단계 커밋 프로토콜(2PC)로 소통합니다.
수도 코드로 표현하면 다음과 같습니다.
TransactWriteItem(TransactWriteItems input):
# Prepare all items
TransactionState = 'PREPARING'
for operation in input:
sendPrepareAsyncToSN(operation)
waitForAllPreparesToComplete()
# Evaluate whether to commit or cancel the transaction
if all prepares succeeded:
TransactionState = 'COMMITTING'
for operation in input:
sendCommitAsyncToSN(operation)
waitForAllCommitsToComplete()
TransactionState = 'COMPLETED'
return 'SUCCESS'
else:
TransactionState = 'CANCELLING'
for operation in input:
sendCancellationAsyncToSN(operation)
waitForAllCancellationsToComplete()
TransactionState = 'COMPLETED'
return 'ReasonForCancellation'
2PC의 문제 중 하나인 코디네이터가 죽어서 응답 처리가 안되는 문제를 해결하기 위해 Ledger(장부)를 이용합니다.
def processPrepare(PrepareInput input):
item = readItem(input)
if item != NONE:
if (evaluateConditionsOnItem(item, input.conditions) and
evaluateSystemRestrictions(item, input) and
item.timestamp < input.timestamp and
item.ongoingTransactions == NONE):
item.ongoingTransaction = input.transactionId
return SUCCESS
else:
return FAILED
else: # item does not exist
item = new Item(input.item)
if (evaluateConditionsOnItem(item, input.conditions) and
evaluateSystemRestrictions(input) and
partition.maxDeleteTimestamp < input.timestamp):
item.ongoingTransaction = input.transactionId
return SUCCESS
return FAILED
def processCommit(CommitInput input):
item = readItem(input)
if item == NONE or item.ongoingTransaction != input.transactionId:
return COMMIT_FAILED
applyChangeForCommit(item, input.writeOperation)
item.ongoingTransaction = NONE
item.timestamp = input.timestamp
return SUCCESS
def processCancel(CancellationInput input):
item = readItem(input)
if item == NONE or item.ongoingTransaction != input.transactionId:
return CANCELLATION_FAILED
item.ongoingTransaction = NONE
# item was only created as part of this transaction
if item was created during prepare:
deleteItem(item)
return SUCCESS