Concurrency control

유상기·2023년 2월 21일
0

Delta Lake

목록 보기
12/16
post-thumbnail

본 글은 Delta Lake 2.2.0 Concurrency control 을 번역 및 정리하였습니다.

Delta Lake는 읽기와 쓰기 사이에서 ACID 트랜잭션 보장을 제공합니다. 이것은 다음을 의미합니다:

  • 지원되는 스토리지 시스템의 경우, 여러 클러스터에서 여러 작성자가 테이블 파티션을 동시에 수정하고 일련의 쓰기에 대한 일관된 스냅샷 뷰를 볼 수 있습니다.
  • 작업 중에 테이블이 수정되어도 Spark 작업이 시작된 시점에서의 일관된 스냅샷 뷰를 계속 볼 수 있습니다.

Optimistic concurrency control

Delta Lake는 쓰기 간의 트랜잭션 보장을 위해 낙관적 동시성 제어(Optimistic Concurrency Control)를 사용합니다. 이 메커니즘 아래에서 쓰기 작업은 세 단계에서 수행됩니다:

  1. 읽기: (필요한 경우) 테이블의 가장 최근 버전을 읽어 어떤 파일을 수정해야 하는지 식별합니다.
  2. 쓰기: 새로운 데이터 파일을 쓰기 작업을 위해 모두 변경합니다.
  3. 유효성 검사 및 커밋: 변경 사항을 커밋하기 전에, 읽은 스냅샷 이후에 동시에 커밋된 다른 변경 사항과 충돌하는지 확인합니다. 충돌이 없다면, 모든 변경 사항이 새로운 버전화된 스냅샷으로 커밋되고 쓰기 작업이 성공합니다. 하지만 충돌이 있으면 쓰기 작업은 Parquet 테이블에서와 같이 테이블을 손상시키지 않도록 동시 수정 예외를 발생시킵니다.

Write conflicts

다음 표는 서로 충돌 가능한 쓰기 작업 쌍을 나열합니다. Compaction은 dataChange 옵션을 false로 설정하여 작성된 파일 압축 작업을 의미합니다.

Avoid conflicts using partitioning and disjoint command conditions

모든 "충돌 가능"으로 표시된 경우, 두 작업이 충돌하는지 여부는 두 작업이 동일한 파일 세트에 대해 작동하는지 여부에 따라 다릅니다. 작업의 조건에 사용된 열과 동일한 열을 기준으로 테이블을 파티셔닝하여 두 파일 세트를 분리할 수 있습니다. 예를 들어, UPDATE table WHERE date > '2010-01-01' ... 및 DELETE table WHERE date < '2010-01-01' 두 명령은 테이블이 날짜로 파티셔닝되지 않은 경우 동일한 파일 세트를 수정하려고 시도하여 충돌합니다. 날짜별로 테이블을 파티셔닝하면 충돌을 피할 수 있습니다. 따라서 명령에서 일반적으로 사용되는 조건에 따라 테이블을 파티셔닝하면 충돌을 크게 줄일 수 있습니다. 그러나 높은 카디널리티(cardinality) 열을 기준으로 테이블을 파티셔닝하면 대규모 하위 디렉토리로 인한 다른 성능 문제가 발생할 수 있습니다.

Conflict exceptions

이 예외는 동시에 수행되는 작업이 읽는 작업의 동일한 파티션(또는 비파티션 테이블의 어디든지)에 파일을 추가 할 때 발생합니다. 파일 추가는 INSERT, DELETE, UPDATE 또는 MERGE 작업에 의해 발생할 수 있습니다.

이 예외는 종종 동시 DELETE, UPDATE 또는 MERGE 작업 중에 발생합니다. 동시 작업은 물리적으로 서로 다른 파티션 디렉토리를 업데이트 할 수 있지만, 그 중 하나는 다른 작업이 동시에 업데이트하는 동일한 파티션을 읽을 수 있으므로 충돌이 발생합니다. 이를 조건문에서 분명하게 나누어 작업하면 이를 회피 할 수 있습니다. 다음은 예입니다.

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

위의 코드를 서로 다른 날짜나 국가에 대해 동시에 실행한다고 가정해 보겠습니다. 각 작업은 대상 Delta 테이블의 독립적인 파티션에서 작업하므로 어떠한 충돌도 예상하지 않습니다. 그러나 조건이 명확하지 않으며 전체 테이블을 스캔하며 다른 파티션을 업데이트하는 동시 작업과 충돌할 수 있습니다. 대신 다음 예제와 같이 특정 날짜와 국가를 합병 조건에 추가하여 문을 다시 작성할 수 있습니다.

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

이 작업은 이제 다른 날짜 및 국가에서 동시에 안전하게 실행할 수 있습니다.

ConcurrentDeleteReadException

이 예외는 동시 작업이 파일을 삭제하면서 해당 파일을 읽는 작업이 발생할 때 발생합니다. 이런 경우 DELETE, UPDATE, 또는 MERGE 작업이 파일을 다시 작성할 수 있습니다.

ConcurrentDeleteDeleteException

동시에 실행되는 작업에서 같은 파일을 삭제하려는 경우 발생하는 예외입니다. 이는 두 개의 동시 컴팩션 작업이 동일한 파일을 재작성하면 발생할 수 있습니다.

MetadataChangedException

이 예외는 Delta 테이블의 메타데이터를 동시에 업데이트하는 동시 트랜잭션이 있는 경우 발생합니다. ALTER TABLE 작업 또는 테이블 스키마를 업데이트하는 Delta 테이블에 대한 쓰기 작업이 일반적인 원인입니다.

ConcurrentTransactionException

만약 동일한 체크포인트 위치를 사용하는 스트리밍 쿼리가 동시에 여러번 시작되고 동시에 Delta 테이블에 쓰기를 시도하면 이 예외가 발생합니다. 두 개의 스트리밍 쿼리가 동일한 체크포인트 위치를 사용하고 동시에 실행되지 않도록해야 합니다.

ProtocolChangedException

Delta Lake 에서 이 예외는 다음과 같은 경우에 발생할 수 있습니다:

  • Delta table 이 새 버전으로 업그레이드 되었을 때. 이후 작업이 성공하려면 Delta Lake 버전을 업그레이드해야 할 수 있습니다.
  • 여러 작성기가 동시에 테이블을 만들거나 대체하는 경우.
  • 여러 작성기가 동시에 빈 경로에 작성하는 경우.
profile
Data/AI Solution Architect

0개의 댓글