
참고: Scalar는 java 기반이다
Delta Lake는 Parquet 기반이며, 로그는 json
DESCRIBE EXTENDED 테이블명

DESCRIBE DETAIL 테이블명

location 필드: Delta Lake 테이블이 실제로 클라우드 객체 스토리지에 저장된 파일 모음으로 백업됨을 알 수 있음
Parquet 데이터 파일 + _delta_log 디렉토리 존재

OPTIMIZE 테이블명 ZORDER BY 필드
ZORDER 인덱싱을 위해 하나 또는 여러 개의 필드를 지정

DESCRIBE HISTORY

VERSION AS OF

DELETE FROM절을 사용했을 때, num_affected_rows 가 -1이라면 전체 데이터 디렉터리가 삭제되었음을 의미
RESTORE TABLE 테이블명 TO VERSION AS OF 버전

RESTORE 명령은 트랜잭션으로 기록됨SET spark.databricks.delta.retentionDurationCheck.enabled = false;SET spark.databricks.delta.vacuum.logging.enabled = true;VACUUM 테이블 RETAIN 시간 hours [DRY RUN]

델타 캐시는 현재 세션에서 쿼리된 파일의 복사본을 현재 활성 클러스터에 배포된 스토리지 볼륨에 저장하므로 이전 테이블 버전에 일시적으로 액세스할 수 있습니다(단, 시스템은 이러한 동작을 예상하도록 설계되어서는 안 됩니다).
클러스터를 다시 시작하면 캐시된 데이터 파일이 영구적으로 삭제됩니다.
Databricks에서 Spark SQL을 사용하여 파일에서 직접 데이터를 추출하는 방법
- Spark SQL을 사용하여 데이터 파일을 직접 쿼리
- 레이어 뷰 및 CTE를 사용하여 데이터 파일을 더 쉽게 참조
- text 및 binaryFile 메서드를 사용하여 원시 파일 내용을 검토
이 예제에서는 JSON 파일로 작성된 원시 Kafka 데이터 샘플을 사용
각 파일에는 5초 간격 동안 사용된 모든 레코드가 포함되어 있으며, 전체 Kafka 스키마와 함께 다중 레코드 JSON 파일로 저장
| field | type | description |
|---|---|---|
| key | BINARY | user_id 필드는 키로 사용됩니다. 세션/쿠키 정보에 해당하는 고유한 영숫자 필드입니다. |
| value | BINARY | JSON으로 전송되는 전체 데이터 페이로드(나중에 설명)입니다. |
| topic | STRING | Kafka 서비스는 여러 토픽을 호스팅하지만, clickstream 토픽의 레코드만 여기에 포함됩니다. |
| partition | INTEGER | 현재 Kafka 구현에서는 2개의 파티션(0과 1)만 사용합니다. |
| offset | LONG | 각 파티션에 대해 단조롭게 증가하는 고유한 값입니다. |
| 타임스탬프 | LONG | 이 타임스탬프는 에포크 이후 밀리초 단위로 기록되며, 프로듀서가 파티션에 레코드를 추가하는 시간을 나타냅니다. |

SELECT * FROM file_format.`/path/to/file`

SELECT * FROM file_format.`${DA.paths.kafka_events}`

- Spark SQL을 사용하여 외부 소스에서 데이터를 추출하는 옵션을 구성
- 다양한 파일 형식에 대해 외부 데이터 소스에 대한 테이블을 생성
- 외부 소스에 대해 정의된 테이블을 쿼리할 때의 기본 동작을 설명

컬럼 분리가 되어있지 않음
CREATE TABLE 테이블 (col_name1 col_type1, ...)
USING 파일형식
OPTIONS (
header = "val1",
delimiter = "val2",
key = "value",
...
)
LOCATION = path


참고: CSV를 데이터 소스로 사용하는 경우, 소스 디렉터리에 추가 데이터 파일이 추가되더라도 열 순서가 변경되지 않도록 해야 함. 이 데이터 형식은 스키마를 엄격하게 적용하지 않으므로 Spark는 테이블 선언 시 지정된 순서대로 열을 로드하고 열 이름과 데이터 유형을 적용


REFRESH TABLE 테이블명

CREATE TABLE
USING JDBC
OPTIONS (
url = "jdbc:{databaseServerType}://{jdbcHostname}:{jdbcPort}",
dbtable = "{jdbcDatabase}.table",
user = "{jdbcUsername}",
password = "{jdbcPassword}"
)




데이터웨어하우스와 같은 일부 SQL 시스템에는 사용자 지정 드라이버가 존재
Spark는 다양한 외부 데이터베이스와 서로 다르게 상호 작용하지만, 두 가지 기본적인 접근 방식은 다음과 같음
1. 전체 소스 테이블을 Databricks로 이동한 다음 현재 활성 클러스터에서 로직을 실행
2. 쿼리를 외부 SQL 데이터베이스로 푸시하고 결과만 Databricks로 다시 전송
두 경우 모두 외부 SQL 데이터베이스에서 매우 큰 데이터 세트를 사용하면 다음과 같은 이유로 상당한 오버헤드가 발생
1. 공용 인터넷을 통해 모든 데이터를 이동하는 데 따른 네트워크 전송 지연 시간
2. 빅데이터 쿼리에 최적화되지 않은 소스 시스템에서 쿼리 로직을 실행
- CTAS 문을 사용하여 Delta Lake 테이블 생성
- 기존 뷰 또는 테이블에서 새 테이블 생성
- 추가 메타데이터로 로드된 데이터 보강
- 생성된 열과 설명 주석을 사용하여 테이블 스키마 선언
- 데이터 위치, 품질 적용 및 파티셔닝을 제어하는 고급 옵션 설정
CREATE TABLE AS SELECT 문은 입력 쿼리에서 검색된 데이터를 사용하여 델타 테이블을 생성하고 채움



열 이름을 변경하거나 대상 테이블에서 열을 제외하는 것과 같은 간단한 변환은 테이블 생성 중에 쉽게 수행

뷰도 동일하게 사용 가능


date는 생성된 열이므로, date 열에 값을 제공하지 않고 purchase_dates에 값을 입력하면 Delta Lake에서 자동으로 계산

테이블에 삽입할 때 생성될 필드가 포함된 경우, 제공된 값이 생성된 열을 정의하는 데 사용된 로직에서 도출되는 값과 정확히 일치하지 않으면 삽입 작업이 실패



여러 추가 구성 및 메타데이터를 포함하도록 CTAS 문을 개선하는 방법





두 경우 모두 복제된 테이블에 적용된 데이터 수정 사항은 원본과 별도로 추적 및 저장
델타 레이크 테이블은 클라우드 객체 스토리지의 데이터 파일로 지원되는 테이블에 ACID 호환 업데이트를 제공
INSERT OVERWRITE를 사용하여 데이터 테이블 덮어쓰기INSERT INTO를 사용하여 테이블에 추가MERGE INTO를 사용하여 테이블에 추가, 업데이트 및 삭제COPY INTO를 사용하여 테이블에 데이터를 증분 방식으로 수집


(오류가 발생한 줄 알고 CRAS를 두 번 실행함을 유의)


대상 테이블의 데이터가 쿼리의 데이터로 대체
INSERT OVERWRITE:

INSERT INTO를 사용하여 기존 델타 테이블에 새로운 행을 원자적으로 추가
MERGE INTO target a
USING source b
ON {merge_condition}
WHEN MATCHED THEN {matched_action}
WHEN NOT MATCHED THEN {not_matched_action}


COPY INTO는 SQL 엔지니어에게 외부 시스템에서 데이터를 증분 방식으로 수집할 수 있는 멱등 옵션을 제공

JSON 파일로 작성된 원시 Kafka 데이터 샘플을 사용합니다.
각 파일에는 5초 간격 동안 사용된 모든 레코드가 포함되어 있으며, 전체 Kafka 스키마와 함께 다중 레코드 JSON 파일로 저장됩니다.
테이블 스키마는 다음과 같습니다.
| field | type | description |
|---|---|---|
| key | BINARY | user_id 필드는 키로 사용됩니다. 세션/쿠키 정보에 해당하는 고유한 영숫자 필드입니다. |
| offset | LONG | 각 파티션에 대해 단조롭게 증가하는 고유한 값입니다. |
| partition | INTEGER | 현재 Kafka 구현에서는 2개의 파티션(0과 1)만 사용합니다. |
| timestamp | LONG | 이 타임스탬프는 에포크 이후 밀리초 단위로 기록되며, 프로듀서가 파티션에 레코드를 추가하는 시간을 나타냅니다. |
| topic | STRING | Kafka 서비스는 여러 토픽을 호스팅하지만, clickstream 토픽의 레코드만 여기에 포함됩니다. |
| value | BINARY | 전체 데이터 페이로드(나중에 설명)이며, JSON 형식으로 전송됩니다. |
이 데이터를 Delta에 제대로 로드하려면 먼저 올바른 스키마를 사용하여 JSON 데이터를 추출해야 합니다.
아래 제공된 파일 경로에 있는 JSON 파일에 대한 외부 테이블을 생성합니다. 이 테이블의 이름을 events_json으로 지정하고 위의 스키마를 선언합니다.
CREATE TABLE IF NOT EXISTS events_json
(key BINARY, offset LONG, partition INTEGER, timestamp LONG, topic STRING, value BINARY)
USING JSON
OPTIONS (
path = '${da.paths.datasets}/ecommerce/raw/events-kafka/'
)
동일한 스키마를 사용하여 events_raw라는 이름의 빈 관리형 델타 테이블을 생성합니다.
CREATE TABLE events_raw
(key BINARY, offset LONG, partition INTEGER, timestamp LONG, topic STRING, value BINARY)
추출된 데이터와 Delta 테이블이 준비되면 events_json 테이블의 JSON 레코드를 새 events_raw Delta 테이블에 삽입합니다.
INSERT INTO events_raw
select * from events_json
새로운 이벤트 데이터 외에도, 나중에 사용할 제품 세부 정보를 제공하는 작은 조회 테이블도 로드해 보겠습니다.
CTAS 문을 사용하여 아래에 제공된 Parquet 디렉터리에서 데이터를 추출하는 item_lookup이라는 관리형 델타 테이블을 생성합니다.
CREATE TABLE item_lookup AS
SELECT * FROM parquet.`${da.paths.datasets}/ecommerce/raw/item-lookup`
- 데이터세트 요약 및 null 동작 설명
- 중복 항목 검색 및 제거
- 예상 개수, 누락된 값 및 중복 레코드에 대한 데이터세트 검증
- 일반적인 변환을 적용하여 데이터 정리 및 변환
count(col)은 NULL 값을 건너뜀count(*)는 총 행 수(NULL 값만 있는 행 포함)를 계산하는 특별한 경우count_if 함수 또는 WHERE 절을 사용하여 값이 IS NULL인 레코드를 필터링하는 조건을 제공





아래 코드는 GROUP BY를 사용하여 user_id와 user_first_touch_timestamp를 기준으로 중복 레코드를 제거합니다.
max() 집계 함수는 email 열에 사용되어 여러 레코드가 있는 경우 null이 아닌 이메일을 캡처하는 방편으로 사용됩니다. 이 배치에서는 모든 updated 값이 동일했지만, 이 값을 그룹화 결과에 유지하려면 집계 함수를 사용해야 합니다.



아래 코드:
user_first_touch_timestamp를 올바른 크기 조정 및 유효한 타임스탬프로 변환합니다.regexp_extract를 사용하여 정규 표현식을 사용하여 이메일 열에서 도메인을 추출합니다.
.및:구문을 사용하여 중첩된 데이터를 쿼리- JSON 작업
- 배열 및 구조체를 평면화하고 압축 해제
- 조인 및 집합 연산자를 사용하여 데이터 세트를 결합
- 피벗 테이블을 사용하여 데이터 형태를 변경
- 고차 함수를 사용하여 배열 작업을 수행
아래에서 key와 value를 문자열로 변환하여 사람이 읽을 수 있는 형식으로 표시
CREATE OR REPLACE TEMP VIEW events_strings AS
SELECT string(key), string(value)
FROM events_raw;
SELECT * FROM events_strings

: 구문을 사용SELECT value:device, value:geo:city
FROM events_strings
from_json 함수에는 스키마가 필요SELECT value
FROM events_strings
WHERE value:event_name = "finalize"
ORDER BY key
LIMIT 1
schema_of_json 함수도 존재from_json 함수에 연결하여 value 필드를 구조체 타입으로 변환CREATE OR REPLACE TEMP VIEW parsed_events AS
SELECT from_json(value, schema_of_json('{"device":"Linux","ecommerce":{"purchase_revenue_in_usd":1075.5,"total_item_quantity":1,"unique_items":1},"event_name":"finalize","event_previous_timestamp":1593879231210816,"event_timestamp":1593879335779563,"geo":{"city":"Houston","state":"TX"},"items":[{"coupon":"NEWBED10","item_id":"M_STAN_K","item_name":"Standard King Mattress","item_revenue_in_usd":1075.5,"price_in_usd":1195.0,"quantity":1}],"traffic_source":"email","user_first_touch_timestamp":1593454417513109,"user_id":"UA000000106116176"}')) AS json
FROM events_strings;
SELECT * FROM parsed_events
* (별표) 압축 해제를 지원CREATE OR REPLACE TEMP VIEW new_events_final AS
SELECT json.*
FROM parsed_events;
SELECT * FROM new_events_final
구조체는 .으로 접근
SELECT ecommerce.purchase_revenue_in_usd
FROM events
WHERE ecommerce.purchase_revenue_in_usd IS NOT NULL
size로 각 행의 배열 요소 개수를 계산
SELECT user_id, event_timestamp, event_name, items
FROM events
WHERE size(items) > 2
explode 함수를 사용하면 배열의 각 요소를 별도의 행에 배치 가능
SELECT user_id, event_timestamp, event_name, explode(items) AS item
FROM events
WHERE size(items) > 2
collect_set 함수는 배열 내의 필드를 포함하여 필드에 대한 고유한 값을 수집할 수 있습니다.
flatten 함수는 여러 배열을 하나의 배열로 결합할 수 있도록 합니다.
array_distinct 함수는 배열에서 중복 요소를 제거합니다.
SELECT user_id,
collect_set(event_name) AS event_history,
array_distinct(flatten(collect_set(items.item_id))) AS cart_history
FROM events
GROUP BY user_id
Spark SQL은 표준 조인 연산(inner, outer, left, right, anti, cross, semi)을 지원
CREATE OR REPLACE VIEW sales_enriched AS
SELECT *
FROM (
SELECT *, explode(items) AS item
FROM sales) a
INNER JOIN item_lookup b
ON a.item.item_id = b.item_id;
SELECT * FROM sales_enriched
Spark SQL은 UNION, MINUS, INTERSECT 집합 연산자를 지원합니다.
UNION은 두 쿼리의 집합을 반환합니다.
SELECT * FROM events
UNION
SELECT * FROM new_events_final
INTERSECT는 두 관계에서 모두 발견된 모든 행을 반환합니다.
SELECT * FROM events
INTERSECT
SELECT * FROM new_events_final
MINUS는 한 데이터세트에서는 발견되지만 다른 데이터세트에서는 발견되지 않은 모든 행을 반환
PIVOT 절은 데이터 관점에서 사용됩니다. 특정 열 값을 기준으로 집계된 값을 얻을 수 있으며, 이 값은 SELECT` 절에서 사용되는 여러 열로 변환됩니다. PIVOT` 절은 테이블 이름이나 하위 쿼리 뒤에 지정할 수 있습니다.
SELECT * FROM (): 괄호 안의 SELECT` 문은 이 테이블의 입력입니다.
PIVOT: 절의 첫 번째 인수는 집계 함수와 집계할 열입니다. 그런 다음 FOR 하위 절에서 피벗 열을 지정합니다. IN` 연산자는 피벗 열 값을 포함합니다.
평면화된 데이터 형식은 대시보드 작성에 유용할 뿐만 아니라, 추론이나 예측을 위한 머신 러닝 알고리즘 적용에도 유용합니다.
CREATE OR REPLACE TABLE transactions AS
SELECT * FROM (
SELECT
email,
order_id,
transaction_timestamp,
total_item_quantity,
purchase_revenue_in_usd,
unique_items,
item.item_id AS item_id,
item.quantity AS quantity
FROM sales_enriched
) PIVOT (
sum(quantity) FOR item_id in (
'P_FOAM_K',
'M_STAN_Q',
'P_FOAM_S',
'M_PREM_Q',
'M_STAN_F',
'M_STAN_T',
'M_PREM_K',
'M_PREM_F',
'M_STAN_K',
'M_PREM_T',
'P_DOWN_S',
'P_DOWN_K'
)
);
SELECT * FROM transactions
Spark SQL의 고차 함수를 사용하면 복잡한 데이터 유형을 직접 처리할 수 있습니다. 계층적 데이터를 처리할 때 레코드는 배열 또는 맵 유형 객체로 저장되는 경우가 많습니다. 고차 함수를 사용하면 원래 구조를 유지하면서 데이터를 변환할 수 있습니다.
고차 함수에는 다음이 포함됩니다.
FILTER는 주어진 람다 함수를 사용하여 배열을 필터링합니다.EXISTS는 배열의 하나 이상의 요소에 대해 명령문이 참인지 여부를 테스트합니다.TRANSFORM은 주어진 람다 함수를 사용하여 배열의 모든 요소를 변환합니다.REDUCE는 두 개의 람다 함수를 사용하여 배열의 요소를 버퍼에 병합하여 단일 값으로 줄이고, 최종 버퍼에 마무리 함수를 적용합니다.items 열의 모든 레코드에서 킹 사이즈가 아닌 항목을 제거합니다. FILTER` 함수를 사용하여 각 배열에서 해당 값을 제외하는 새 열을 만들 수 있습니다.
FILTER (items, i -> i.item_id LIKE "%K") AS king_items
위 명령문에서:
FILTER : 고차 함수의 이름 items : 입력 배열의 이름 i : 반복자 변수의 이름. 이 이름을 선택한 다음 람다 함수에서 사용합니다. 배열을 반복하며 각 값을 한 번에 하나씩 함수에 순환합니다.-> : 함수의 시작을 나타냅니다. i.item_id LIKE "%K" : 이것이 함수입니다. 각 값은 대문자 K로 끝나는지 확인합니다. 대문자 K로 끝나면 새 열인 king_items로 필터링됩니다.생성된 열에 빈 배열이 많이 생성되는 필터를 작성할 수 있습니다. 이 경우, 반환된 열에 비어 있지 않은 배열 값만 표시하기 위해 WHERE 절을 사용하는 것이 유용할 수 있습니다.
SELECT
order_id,
items,
FILTER (items, i -> i.item_id LIKE "%K") AS king_items
FROM sales
CREATE OR REPLACE TEMP VIEW king_size_sales AS
SELECT order_id, king_items
FROM (
SELECT
order_id,
FILTER (items, i -> i.item_id LIKE "%K") AS king_items
FROM sales)
WHERE size(king_items) > 0;
SELECT * FROM king_size_sales
내장 함수는 셀 내의 단일 단순 데이터 유형에 대해 작동하도록 설계되었으며, 배열 값을 처리할 수 없습니다. TRANSFORM은 배열의 각 요소에 기존 함수를 적용하려는 경우 특히 유용합니다.
-- get total revenue from king items per order
CREATE OR REPLACE TEMP VIEW king_item_revenues AS
SELECT
order_id,
king_items,
TRANSFORM (
king_items,
k -> CAST(k.item_revenue_in_usd * 100 AS INT)
) AS item_revenues
FROM king_size_sales;
SELECT * FROM king_item_revenues
- Databricks는 DBR 9.1부터 SQL에 기본적으로 등록된 사용자 정의 함수(UDF)에 대한 지원을 추가
- 사용자 정의 SQL 로직 조합을 데이터베이스에 함수로 등록하여 Databricks에서 SQL을 실행할 수 있는 모든 곳에서 이러한 메서드를 재사용
- SQL UDF 정의 및 등록
- SQL UDF 공유에 사용되는 보안 모델 설명
- SQL 코드에서
CASE/WHEN문 사용- 사용자 지정 제어 흐름을 위해 SQL UDF에서
CASE/WHEN문 활용
CREATE OR REPLACE FUNCTION 이름(변수이름 변수타입)
RETURNS 변수타입
RETURN 반환값
CREATE OR REPLACE FUNCTION yelling(text STRING)
RETURNS STRING
RETURN concat(upper(text), "!!!")
SELECT yelling(food) FROM foods

병렬 실행에 최적화
CREATE FUNCTION foods_i_like(food STRING)
RETURNS STRING
RETURN CASE
WHEN food = "beans" THEN "I love beans"
WHEN food = "potatoes" THEN "My favorite vegetable is potatoes"
WHEN food <> "beef" THEN concat("Do you have any good recipes for ", food ,"?")
ELSE concat("I don't eat ", food)
END;
events를 피벗하여 각 사용자의 작업 수를 계산합니다.event_name 열에 지정된 각 사용자가 특정 이벤트를 수행한 횟수를 집계하려고 합니다. 이를 위해 user_id로 그룹화하고 event_name을 피벗하여 각 이벤트 유형의 개수를 각 열에 표시합니다.
-- TODO
CREATE OR REPLACE VIEW events_pivot AS
SELECT *
FROM (
SELECT user_id as user, event_name
FROM events
)
PIVOT (
COUNT(event_name)
FOR event_name IN ('cart', 'pillows', 'login', 'main', 'careers', 'guest', 'faq', 'down', 'warranty', 'finalize',
'register', 'shipping_info', 'checkout', 'mattresses', 'add_item', 'press', 'email_coupon',
'cc_info', 'foam', 'reviews', 'original', 'delivery', 'premium')
);
SELECT * FROM events_pivot;
다음으로, events_pivot과 transactions를 조인하여 clickpaths 테이블을 만듭니다. 이 테이블에는 위에서 만든 events_pivot 테이블과 동일한 이벤트 이름 열이 있어야 하며, 그 뒤에 transactions 테이블의 열이 있어야 합니다
-- TODO
CREATE OR REPLACE VIEW clickpaths AS
SELECT
ep.*,
t.*
FROM events_pivot ep
INNER JOIN transactions t
ON ep.user = t.user_id
여기서는 고차 함수 EXISTS를 sales 테이블의 데이터와 함께 사용하여 구매한 품목이 매트리스인지 베개인지를 나타내는 부울 열 mattress와 pillow를 생성합니다.
예를 들어, items 열의 item_name이 문자열 "Mattress"로 끝나는 경우, mattress의 열 값은 true이고 pillow의 값은 false여야 합니다. 다음은 품목과 결과 값의 몇 가지 예입니다.
| items | mattress | pillow |
|---|---|---|
[{..., "item_id": "M_PREM_K", "item_name": "Premium King Mattress", ...}] | true | false |
[{..., "item_id": "P_FOAM_S", "item_name": "Standard Foam Pillow", ...}] | false | true |
[{..., "item_id": "M_STAN_F", "item_name": "Standard Full Mattress", ...}] | true | false |
-- TODO
CREATE OR REPLACE TABLE sales_product_flags AS
SELECT
items,
EXISTS(items, item -> item.item_name LIKE '%Mattress') AS mattress,
EXISTS(items, item -> item.item_name LIKE '%Pillow') AS pillow
FROM sales