Dia Egg - Shugo Chara

Data/Databricks

날짜/복합 등 함수, 쿼리최적화, databricks 데이터로드 및 시각화, vacuum

별ㅇI 2025. 6. 6. 13:14
반응형

날짜/시간 함수

from pyspark.sql.functions import col

df = spark.table("events").select("user_id", col("event_timestamp").alias("timestamp"))
display(df)

내장 함수: 날짜/시간 함수

Spark에서 날짜와 시간을 조작하는 몇 가지 내장 함수는 다음과 같습니다.

메서드 설명

add_months startDate로부터 numMonths 후의 날짜를 반환합니다.
current_timestamp 쿼리 실행 시작 시 현재 타임스탬프를 타임스탬프 열로 반환합니다.
date_format 날짜/타임스탬프/문자열을 두 번째 인수로 지정된 날짜 형식의 문자열 값으로 변환합니다.
dayofweek 주어진 날짜/타임스탬프/문자열에서 일자를 정수로 추출합니다.
from_unixtime 유닉스 시대(1970-01-01 00:00:00 UTC)의 초 수를 현재 시스템 시간대의 해당 시점의 타임스탬프를 yyyy-MM-dd HH:mm:ss 형식으로 나타내는 문자열로 변환합니다.
minute 주어진 날짜/타임스탬프/문자열에서 분을 정수로 추출합니다.
unix_timestamp 주어진 패턴을 갖는 시간 문자열을 유닉스 타임스탬프(초)로 변환합니다.

타임스탬프로 변환

cast()

문자열 표현이나 DataType을 사용하여 지정된 다른 데이터 유형으로 열을 변환합니다.

timestamp_df = df.withColumn("timestamp", (col("timestamp") / 1e6).cast("timestamp"))
display(timestamp_df)
from pyspark.sql.types import TimestampType

timestamp_df = df.withColumn("timestamp", (col("timestamp") / 1e6).cast(TimestampType()))
display(timestamp_df)

날짜 형식 지정

date_format()

날짜/타임스탬프/문자열을 주어진 날짜/시간 패턴으로 형식화된 문자열로 변환합니다.

from pyspark.sql.functions import date_format

formatted_df = (timestamp_df
                .withColumn("date string", date_format("timestamp", "MMMM dd, yyyy"))
                .withColumn("time string", date_format("timestamp", "HH:mm:ss.SSSSSS"))
               )
display(formatted_df)
from pyspark.sql.functions import year, month, dayofweek, minute, second

datetime_df = (timestamp_df
               .withColumn("year", year(col("timestamp")))
               .withColumn("month", month(col("timestamp")))
               .withColumn("dayofweek", dayofweek(col("timestamp")))
               .withColumn("minute", minute(col("timestamp")))
               .withColumn("second", second(col("timestamp")))
              )
display(datetime_df)
from pyspark.sql.functions import to_date

date_df = timestamp_df.withColumn("date", to_date(col("timestamp")))
display(date_df)
from pyspark.sql.functions import date_add

plus_2_df = timestamp_df.withColumn("plus_two_days", date_add(col("timestamp"), 2))
display(plus_2_df)

복합 타입

문자열 함수

다음은 문자열을 조작하는 데 사용할 수 있는 몇 가지 내장 함수입니다.

메서드 설명

translate src의 모든 문자를 replaceString의 문자로 변환합니다.
regexp_replace 지정된 문자열 값에서 regexp와 일치하는 모든 부분 문자열을 rep로 바꿉니다.
regexp_extract 지정된 문자열 열에서 Java 정규식과 일치하는 특정 그룹을 추출합니다.
ltrim 지정된 문자열 열에서 선행 공백 문자를 제거합니다.
lower 문자열 열을 소문자로 변환합니다.
split 주어진 패턴과 일치하는 문자열을 중심으로 str을 나눕니다.
display(df.select(split(df.email, '@', 0).alias('email_handle')))

컬렉션 함수

다음은 배열 작업에 사용할 수 있는 몇 가지 내장 함수입니다.

메서드 설명

array_contains 배열이 null이면 null을 반환하고, 배열에 값이 있으면 true를 반환하고, 그렇지 않으면 false를 반환합니다.
element_at 주어진 인덱스에 있는 배열의 요소를 반환합니다. 배열 요소는 1부터 번호가 매겨집니다.
explode 주어진 배열 또는 맵 열의 각 요소에 대해 새 행을 생성합니다.
collect_set 중복 요소가 제거된 객체 집합을 반환합니다.
mattress_df = (details_df
               .filter(array_contains(col("details"), "Mattress"))
               .withColumn("size", element_at(col("details"), 2)))
display(mattress_df)

집계 함수

다음은 일반적으로 GroupedData에서 배열을 생성하는 데 사용할 수 있는 몇 가지 내장 집계 함수입니다.

메서드 설명

collect_list 그룹 내의 모든 값으로 구성된 배열을 반환합니다.
collect_set 그룹 내의 모든 고유 값으로 구성된 배열을 반환합니다.
size_df = mattress_df.groupBy("email").agg(collect_set("size").alias("size options"))

display(size_df)

Union 및 unionByName

아래는 두 데이터프레임에 **union**이 적합한 일치하는 스키마가 있는지 확인하는 코드입니다.

mattress_df.schema==size_df.schema

간단한 select 문으로 두 스키마를 일치시킬 수 있다면, **union**을 사용할 수 있습니다.

union_count = mattress_df.select("email").union(size_df.select("email")).count()

mattress_count = mattress_df.count()
size_count = size_df.count()

mattress_count + size_count == union_count

추가 함수

비집계 함수 및 기타 함수

메서드 설명

col / column 주어진 열 이름을 기반으로 열을 반환합니다.
lit 리터럴 값의 열을 생성합니다.
isnull 열이 null이면 true를 반환합니다.
rand [0.0, 1.0]에 균등하게 분포하는 독립적이고 동일 분포(i.i.d.) 샘플을 갖는 난수 열을 생성합니다.
display(gmail_accounts.select("email", lit(True).alias("gmail user")))

메서드 설명

drop 열의 선택적 하위 집합을 고려하여 null 값이 있는 행을 일부, 전체 또는 지정된 개수만큼 제외하고 새 DataFrame을 반환합니다.
fill 열의 선택적 하위 집합에 대해 null 값을 지정된 값으로 바꿉니다.
replace 열의 선택적 하위 집합을 고려하여 값을 다른 값으로 바꾸고 새 DataFrame을 반환합니다.

items.coupon과 같은 열에서 null을 찾으려면 항목을 분해해야 합니다.

sales_exploded_df = sales_df.withColumn("items", explode(col("items")))
display(sales_exploded_df.select("items.coupon"))
print(sales_exploded_df.select("items.coupon").count())
print(sales_exploded_df.select("items.coupon").na.drop().count())

누락된 쿠폰 코드는 **na.fill**을 사용하여 채울 수 있습니다.

display(sales_exploded_df.select("items.coupon").na.fill("NO COUPON"))

DataFrame 결합

DataFrame의 join 메서드는 주어진 조인 표현식을 기반으로 두 DataFrame을 결합합니다.

df1.join(df2, "name")

"name"과 "age"라는 공유 열의 값이 같은 경우를 기준으로 하는 내부 조인

df1.join(df2, ["name", "age"])

"name"이라는 공유 열의 값이 같은 경우를 기준으로 하는 전체 외부 조인

df1.join(df2, "name", "outer")

명시적 열 표현식을 기준으로 하는 왼쪽 외부 조인

df1.join(df2, df1["customer_name"] == df2["account_name"], "left_outer")

joined_df = gmail_accounts.join(other=users_df, on='email', how = "inner")
display(joined_df)

쿼리 최적화

구분 논리적 계획 물리적 계획

목적 처리할 작업의 논리적 구조 정의 클러스터에서의 구체적 실행 전략
최적화 대상 데이터 변환 절차 실행 시간, 리소스 사용량
생성 주체 Spark SQL 파서 Catalyst Optimizer + Cost Model
변환 규칙 규칙 기반 최적화 비용 기반 최적화
실행 단계 변환만 정의 (실행 X) 실제 실행 코드 생성

논리적 최적화

  • *explain(..)*은 쿼리 계획을 출력하며, 선택적으로 지정된 설명 모드에 따라 형식이 지정됩니다. 다음 논리적 계획과 물리적 계획을 비교하면서 Catalyst가 여러 filter 변환을 어떻게 처리했는지 확인하세요.
from pyspark.sql.functions import col

limit_events_df = (df
                   .filter(col("event_name") != "reviews")
                   .filter(col("event_name") != "checkout")
                   .filter(col("event_name") != "register")
                   .filter(col("event_name") != "email_coupon")
                   .filter(col("event_name") != "cc_info")
                   .filter(col("event_name") != "delivery")
                   .filter(col("event_name") != "shipping_info")
                   .filter(col("event_name") != "press")
                  )

limit_events_df.explain(True)

better_df = (df
             .filter((col("event_name").isNotNull()) &
                     (col("event_name") != "reviews") &
                     (col("event_name") != "checkout") &
                     (col("event_name") != "register") &
                     (col("event_name") != "email_coupon") &
                     (col("event_name") != "cc_info") &
                     (col("event_name") != "delivery") &
                     (col("event_name") != "shipping_info") &
                     (col("event_name") != "press"))
            )

better_df.explain(True)

캐싱

기본적으로 DataFrame의 데이터는 쿼리 처리 중에만 Spark 클러스터에 존재하며, 이후 클러스터에 자동으로 저장되지 않습니다. (Spark는 데이터 저장 시스템이 아니라 데이터 처리 엔진입니다.) Spark의 cache 메서드를 호출하여 클러스터에 DataFrame을 저장하도록 명시적으로 요청할 수 있습니다.

조건자 푸시다운

다음은 JDBC 소스에서 읽어오는 예제입니다. Catalyst는 조건자 푸시다운이 발생할 수 있다고 판단합니다.

%scala
// Ensure that the driver class is loaded
Class.forName("org.postgresql.Driver")
jdbc_url = "jdbc:postgresql://server1.training.databricks.com/training"

# Username and Password w/read-only rights
conn_properties = {
    "user" : "training",
    "password" : "training"
}

pp_df = (spark
         .read
         .jdbc(url=jdbc_url,                 # the JDBC URL
               table="training.people_1m",   # the name of the table
               column="id",                  # the name of a column of an integral type that will be used for partitioning
               lowerBound=1,                 # the minimum value of columnName used to decide partition stride
               upperBound=1000000,           # the maximum value of columnName used to decide partition stride
               numPartitions=8,              # the number of partitions/connections
               properties=conn_properties    # the connection properties
              )
         .filter(col("gender") == "M")   # Filter the data by gender
        )

pp_df.explain(True)

Scan에 Filter가 없고 PushedFilters가 있다는 점에 유의하세요. 필터 작업은 데이터베이스에 푸시되고 일치하는 레코드만 Spark로 전송됩니다. 이렇게 하면 Spark가 수집해야 하는 데이터 양을 크게 줄일 수 있습니다.

조건문 푸시다운 없음

반면에, 필터링 전에 데이터를 캐싱하면 조건문 푸시다운이 발생할 가능성이 없어집니다.

cached_df = (spark
            .read
            .jdbc(url=jdbc_url,
                  table="training.people_1m",
                  column="id",
                  lowerBound=1,
                  upperBound=1000000,
                  numPartitions=8,
                  properties=conn_properties
                 )
            )

cached_df.cache()
filtered_df = cached_df.filter(col("gender") == "M")

filtered_df.explain(True)

이전 예제에서 보았던 Scan(JDBC 읽기) 외에도, 여기서는 설명 계획에서 Filter가 뒤따르는 InMemoryTableScan도 볼 수 있습니다.

데이터, 그래프로 시각화 해보기

노트북: Explore data with Spark

# Clear the plot area
plt.clf()
# Create a figure for 2 subplots (1 row, 2 columns)
fig, ax = plt.subplots(1, 2, figsize= (10,4))
# Create a bar plot of revenue by year on the first axis
ax[0].bar(x=df_sales['OrderYear'], height=df_sales['GrossRevenue'], color='orange')
ax[0].set_title('Revenue by Year')
# Create a pie chart of yearly order counts on the second axis
yearly_counts= df_sales['OrderYear'].value_counts()
ax[1].pie(yearly_counts)
ax[1].set_title('Orders per Year')
ax[1].legend(yearly_counts.keys().tolist())
fig.suptitle('Sales Data')
plt.show()

 # Clear the plot area
 plt.clf()
 # Create a bar chart
 ax=sns.lineplot(x='OrderYear', y='GrossRevenue', data=df_sales)
 plt.show()

Databricks의 스키마 및 테이블

스키마

CREATE SCHEMA IF NOT EXISTS \\
${da.schema_name}_custom_location \\
LOCATION '${da.paths.working_dir}/${da.schema_name}_custom_location.db';
%python
print(DA.paths.working_dir, DA.schema_name)

관리형 테이블

USE ${da.schema_name}_default_location;

CREATE OR REPLACE TABLE managed_table (width INT, length INT, height INT);
INSERT INTO managed_table 
VALUES (3, 2, 1);
SELECT * FROM managed_table;

기본적으로, 지정된 위치가 없는 스키마의 관리되는 테이블은 dbfs:/user/hive/warehouse/.db/` 디렉터리에 생성됩니다.

%python 
tbl_location = spark.sql(f"DESCRIBE DETAIL managed_table").first().location
# tbl_location = dbfs:/user/hive/warehouse/santiago_ortiz_be24_da_delp_default_location.db/managed_table
print(tbl_location)

files = dbutils.fs.ls(tbl_location)
display(files)

외부 테이블

USE ${da.schema_name}_default_location;

CREATE OR REPLACE TEMPORARY VIEW temp_delays USING CSV OPTIONS (
  path = '${da.paths.datasets}/flights/departuredelays.csv',
  header = "true",
  mode = "FAILFAST" -- 잘못된 줄이 발견되면 RuntimeException으로 파일 구문 분석을 중단합니다.
);
CREATE OR REPLACE TABLE external_table LOCATION '${da.paths.working_dir}/external_table' AS
  SELECT * FROM temp_delays;

SELECT * FROM external_table;
DESCRIBE TABLE EXTENDED external_table; 

델타 테이블 설정

Create Table as Select (CTAS)

CREATE OR REPLACE TABLE sales AS
SELECT * FROM parquet.`${DA.paths.datasets}/ecommerce/raw/sales-historical`;

DESCRIBE EXTENDED sales;

CTAS 문은 쿼리 결과에서 스키마 정보를 자동으로 유추하며, 수동 스키마 선언은 지원하지 않습니다.

즉, CTAS 문은 Parquet 파일 및 테이블과 같이 스키마가 명확하게 정의된 소스에서 외부 데이터를 수집하는 데 유용합니다.

CTAS 문은 추가 파일 옵션 지정도 지원하지 않습니다.

CREATE OR REPLACE TEMP VIEW sales_tmp_vw
  (order_id LONG, email STRING, transactions_timestamp LONG, total_item_quantity INTEGER, purchase_revenue_in_usd DOUBLE, unique_items INTEGER, items STRING)
USING CSV
OPTIONS (
  path = "${da.paths.datasets}/ecommerce/raw/sales-csv",
  header = "true",
  delimiter = "|"
);

CREATE TABLE sales_delta AS
  SELECT * FROM sales_tmp_vw;
  
SELECT * FROM sales_delta

기존 테이블의 열 필터링 및 이름 바꾸기

CREATE OR REPLACE TABLE purchases AS
SELECT order_id AS id, transaction_timestamp, purchase_revenue_in_usd AS price
FROM sales;

SELECT * FROM purchases

생성된 열로 스키마 선언

CREATE OR REPLACE TABLE purchase_dates (
  id STRING, 
  transaction_timestamp STRING, 
  price STRING,
  date DATE GENERATED ALWAYS AS (
    cast(cast(transaction_timestamp/1e6 AS TIMESTAMP) AS DATE))
    COMMENT "generated based on `transactions_timestamp` column")

테이블 제약 조건 추가

ALTER TABLE purchase_dates ADD CONSTRAINT valid_date CHECK (date > '2020-01-01');

추가 옵션 및 메타데이터로 테이블 강화

SELECT 절은 파일 수집에 유용한 두 가지 내장 Spark SQL 명령을 활용합니다.

  • *current_timestamp()*는 로직이 실행될 때의 타임스탬프를 기록합니다.
  • *input_file_name()*은 테이블의 각 레코드에 대한 소스 데이터 파일을 기록합니다.

CREATE TABLE 절에는 여러 옵션이 포함되어 있습니다.

  • 테이블 내용을 더 쉽게 검색할 수 있도록 **COMMENT*가 추가되었습니다.
  • *LOCATION*이 지정되어 관리형 테이블이 아닌 외부 테이블이 생성됩니다.
  • 테이블은 날짜 열로 **PARTITIONED BY*됩니다. 즉, 각 데이터의 데이터는 대상 저장소 위치의 자체 디렉터리에 저장됩니다.
CREATE OR REPLACE TABLE users_pii
COMMENT "Contains PII"
LOCATION "${da.paths.working_dir}/tmp/users_pii"
PARTITIONED BY (first_touch_date)
AS
  SELECT *, 
    cast(cast(user_first_touch_timestamp/1e6 AS TIMESTAMP) AS DATE) first_touch_date, 
    current_timestamp() updated,
    input_file_name() source_file
  FROM parquet.`${da.paths.datasets}/ecommerce/raw/users-historical/`;
  
SELECT * FROM users_pii;

델타 레이크 테이블 복제

**DEEP CLONE**은 소스 테이블의 데이터와 메타데이터를 타겟 테이블로 완전히 복사합니다.

CREATE OR REPLACE TABLE purchases_clone
DEEP CLONE purchases

모든 데이터 파일을 복사해야 하므로 대용량 데이터 세트의 경우 시간이 꽤 오래 걸릴 수 있습니다.

현재 테이블을 수정할 위험 없이 변경 사항을 적용해 보기 위해 테이블의 복사본을 빠르게 생성하려면 **SHALLOW CLONE**가 좋은 선택이 될 수 있습니다.

CREATE OR REPLACE TABLE purchases_shallow_clone
SHALLOW CLONE purchases

Delta Lake에 데이터 로드하기

완전 덮어쓰기

  • 테이블 덮어쓰기는 디렉터리를 재귀적으로 나열하거나 파일을 삭제할 필요가 없기 때문에 훨씬 빠릅니다.
  • 이전 버전의 테이블이 여전히 존재하므로 시간 여행을 통해 이전 데이터를 쉽게 검색할 수 있습니다.
  • 원자적 연산입니다. 테이블을 삭제하는 동안에도 동시 쿼리가 테이블을 읽을 수 있습니다.
  • ACID 트랜잭션 보장으로 인해 테이블 덮어쓰기가 실패하더라도 테이블은 이전 상태로 유지됩니다.

CREATE OR REPLACE TABLE(CRAS) 문은 실행될 때마다 테이블의 내용을 완전히 바꿉니다.

CREATE OR REPLACE TABLE events AS
SELECT * FROM parquet.`${da.paths.datasets}/ecommerce/raw/events-historical`

INSERT OVERWRITE:

  • 기존 테이블만 덮어쓸 수 있으며, CRAS 문처럼 새 테이블을 만들 수는 없습니다.
  • *INSERT OVERWRITE*는 위와 거의 동일한 결과를 제공합니다. 대상 테이블의 데이터가 쿼리의 데이터로 대체됩니다.
  • 현재 테이블 스키마와 일치하는 새 레코드로만 덮어쓸 수 있습니다. 따라서 다운스트림 소비자를 방해하지 않고 기존 테이블을 덮어쓸 수 있는 "더 안전한" 기법입니다.
  • 개별 파티션을 덮어쓸 수 있습니다.
INSERT OVERWRITE sales
SELECT * FROM parquet.`${da.paths.datasets}/ecommerce/raw/sales-historical/`
DESCRIBE HISTORY sales

행 추가

  • *INSERT INTO*를 사용하여 기존 Delta 테이블에 새로운 행을 원자적으로 추가할 수 있습니다. 이렇게 하면 기존 테이블에 증분 업데이트를 적용할 수 있으므로 매번 덮어쓰는 것보다 훨씬 효율적입니다.
INSERT INTO sales
SELECT * FROM parquet.`${da.paths.datasets}/ecommerce/raw/sales-30m`

업데이트 병합

  • *MERGE*의 주요 이점:
  • 업데이트, 삽입 및 삭제가 단일 트랜잭션으로 완료됩니다.
  • 일치하는 필드 외에도 여러 조건을 추가할 수 있습니다.
  • 사용자 지정 로직을 구현하기 위한 광범위한 옵션을 제공합니다.
MERGE INTO users a
USING users_update b
ON a.user_id = b.user_id
WHEN MATCHED AND a.email IS NULL AND b.email IS NOT NULL THEN
  UPDATE SET email = b.email, updated = b.updated
WHEN NOT MATCHED THEN INSERT *

중복 제거를 위한 삽입 전용 병합

MERGE INTO events a
USING events_update b
ON a.user_id = b.user_id AND a.event_timestamp = b.event_timestamp
WHEN NOT MATCHED AND b.traffic_source = 'email' THEN 
  INSERT *

증분 로드

COPY INTO

  • "동일한 파일을 여러 번 적재해도 중복 없이 항상 같은 결과가 보장된다"
  • "이미 적재된 파일은 자동으로 건너뛰고, 새 파일만 적재된다"
  • 는 뜻입니다.
COPY INTO sales
FROM "${da.paths.datasets}/ecommerce/raw/sales-30m"
FILEFORMAT = PARQUET

Delta Lake의 버전 관리, 최적화, VACUUM

디렉토리에는 여러 개의 Parquet 데이터 파일과 **_delta_log**라는 디렉터리가 있습니다.

Delta Lake 테이블의 레코드는 Parquet 파일에 데이터로 저장됩니다.

Delta Lake 테이블에 대한 트랜잭션은 **_delta_log**에 기록됩니다.

%python
display(dbutils.fs.ls(f"{DA.paths.user_db}/students/_delta_log"))

%python
display(spark.sql(f"SELECT * FROM json.`{DA.paths.user_db}/students/_delta_log/00000000000000000007.json`"))

add 열에는 테이블에 새로 작성된 모든 파일 목록이 포함됩니다. remove` 열은 테이블에 더 이상 포함되지 않아야 할 파일을 나타냅니다.

작은 파일 압축 및 인덱싱

OPTIMIZE 명령을 사용하면 파일이 테이블 크기에 따라 크기가 조정되는 최적의 크기로 결합됩니다. **OPTIMIZE**를 실행할 때 사용자는 선택적으로 ZORDER 인덱싱을 위해 하나 또는 여러 개의 필드를 지정할 수 있습니다. Z-order의 구체적인 계산 방식은 중요하지 않지만, 제공된 필드를 기준으로 필터링할 때 데이터 파일 내에서 유사한 값을 가진 데이터를 함께 배치하여 데이터 검색 속도를 높입니다.

OPTIMIZE students
ZORDER BY id
SELECT * 
FROM students VERSION AS OF 3

롤백 버전

RESTORE TABLE students TO VERSION AS OF 8 

오래된 파일 정리

-- VACUUM students RETAIN 0 HOURS

기본적으로 **VACUUM**은 7일 미만의 파일은 삭제하지 못하도록 하여 장기 실행 작업이 삭제할 파일을 참조하지 않도록 합니다.

SET spark.databricks.delta.retentionDurationCheck.enabled = false;
SET spark.databricks.delta.vacuum.logging.enabled = true;

VACUUM students RETAIN 0 HOURS DRY RUN
VACUUM students RETAIN 0 HOURS
반응형