Amazon 에서 개발 및 사용되는 오픈 소스 도구
Deequ 는 Apache Spark 을 기반으로 구축된 라이브러리
main component 들을 살펴보자
maven
<dependency>
<groupId>com.amazon.deequ</groupId>
<artifactId>deequ</artifactId>
<version>2.0.0-spark-3.1</version>
</dependency>
sbt
libraryDependencies += "com.amazon.deequ" % "deequ" % "2.0.0-spark-3.1"
대규모 데이터셋의 이해가 어려운 경우, Column 단위 프로파일링 지원
자동으로 제약 조건 제안
data quality metric을 파일 시스템에 저장
특정 metric을 MetricRepository 에 저장한 후, 현재와 과거 값을 비교하여 비정상적인 변경을 감지
spark 에서 데이터를 읽는 것과 동일
val rawData = spark.read.parquet(path)
rawData.printSchema()
를 통해 데이터 스키마를 살펴보자
root
|-- marketplace: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- review_id: string (nullable = true)
|-- product_title: string (nullable = true)
|-- star_rating: integer (nullable = true)
|-- helpful_votes: integer (nullable = true)
|-- total_votes: integer (nullable = true)
|-- vine: string (nullable = true)
|-- year: integer (nullable = true)
아래와 같이 룰을 정하고 Data가 이 조건을 만족하는지 Test를 진행해보자.
review_id
가 절대 NULL이 될 수 없다review_id
는 Unique 해야한다star_rating
는 1.0 ~ 5.0 사이의 값만 가질 수 있다marketplace
오직 “US”, “UK”, “DE”, “JP”, “FR” 만 포함 가능하다year
는 절대 음수가 될 수 없다import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
val verificationResult: VerificationResult = { VerificationSuite()
// data to run the verification on
.onData(dataset)
// define a data quality check
.addCheck(
Check(CheckLevel.Error, "Review Check")
.hasSize(_ >= 3000000) // at least 3 million rows
.hasMin("star_rating", _ == 1.0) // min is 1.0
.hasMax("star_rating", _ == 5.0) // max is 5.0
.isComplete("review_id") // should never be NULL
.isUnique("review_id") // should not contain duplicates
.isComplete("marketplace") // should never be NULL
// contains only the listed values
.isContainedIn("marketplace", Array("US", "UK", "DE", "JP", "FR"))
.isNonNegative("year")) // should not contain negative values
// compute metrics and verify check conditions
.run()
}
// convert check results to a Spark data frame
val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
resultDataFrame.show(truncate=false)
UniquenessConstraint(Uniqueness(List(review_id)))
를 제외하고 모든 제약 조건을 만족하는 것을 확인할 수 있다VerificationResult.successMetricsAsDataFrame(spark, verificationResult).show(truncate=False)
데이터의 열이 너무 많거나 수동으로 제약 조건을 정의하기 어려운 경우 사용합니다.
import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
import spark.implicits._ // for toDS method
// We ask deequ to compute constraint suggestions for us on the data
val suggestionResult = { ConstraintSuggestionRunner()
// data to suggest constraints for
.onData(dataset)
// default set of rules for constraint suggestion
.addConstraintRules(Rules.DEFAULT)
// run data profiling and constraint suggestion
.run()
}
// We can now investigate the constraints that Deequ suggested.
val suggestionDataFrame = suggestionResult.constraintSuggestions.flatMap {
case (column, suggestions) =>
suggestions.map { constraint =>
(column, constraint.description, constraint.codeForConstraint)
}
}.toSeq.toDS()
suggestionDataFrame.show(truncate=false)
val allConstraints = suggestionResult.constraintSuggestions
.flatMap { case (_, suggestions) => suggestions.map { _.constraint }}
.toSeq
val generatedCheck = Check(CheckLevel.Error, "generated constraints", allConstraints) //passing the generated checks to verificationSuite
val verificationWithSuggestionResult = VerificationSuite()
.onData(testData)
.addChecks(Seq(generatedCheck))
.run()
val verificationWithSuggestionResultDataFrame = checkResultsAsDataFrame(spark, verificationWithSuggestionResult)
verificationWithSuggestionResultDataFrame.show()