Data/Databricks

cleaning, 중첩데이터, udfs, 함수s, read&write

별ㅇI 2025. 6. 6. 13:04
반응형

Cleaning Data

누락된 데이터 검사

SELECT count_if(email IS NULL) FROM users_dirty;
SELECT count(*) FROM users_dirty WHERE email IS NULL;]
%python 
from pyspark.sql.functions import col
usersDF = spark.read.table("users_dirty")

usersDF.selectExpr("count_if(email IS NULL)")
usersDF.where(col("email").isNull()).count()

중복 행 제거

SELECT DISTINCT(*) FROM users_dirty
%python
usersDF.distinct().display()

특정 열 기준 중복 행 제거

CREATE OR REPLACE TEMP VIEW deduped_users AS 
SELECT user_id, user_first_touch_timestamp, max(email) AS email, max(updated) AS updated
FROM users_dirty
WHERE user_id IS NOT NULL
GROUP BY user_id, user_first_touch_timestamp;

SELECT count(*) FROM deduped_users
%python
from pyspark.sql.functions import max
dedupedDF = (usersDF
    .where(col("user_id").isNotNull())
    .groupBy("user_id", "user_first_touch_timestamp")
    .agg(max("email").alias("email"), 
         max("updated").alias("updated"))
    )

dedupedDF.count()

데이터 셋 검증

고유한 user_id를 가지고 있는지 확인합니다.

CREATE OR REPLACE TEMP VIEW deduped_users AS 
SELECT user_id, user_first_touch_timestamp, max(email) AS email, max(updated) AS updated
FROM users_dirty
WHERE user_id IS NOT NULL
GROUP BY user_id, user_first_touch_timestamp;

SELECT count(*) FROM deduped_users
%python
from pyspark.sql.functions import max
dedupedDF = (usersDF
    .where(col("user_id").isNotNull())
    .groupBy("user_id", "user_first_touch_timestamp")
    .agg(max("email").alias("email"), 
         max("updated").alias("updated"))
    )

dedupedDF.count()

각 이메일이 최대 하나의 user_id와 연결되어 있는지 확인합니다.

SELECT max(user_id_count) <= 1 at_most_one_id FROM (
  SELECT email, count(user_id) AS user_id_count
  FROM deduped_users
  WHERE email IS NOT NULL
  GROUP BY email)

%python

display(dedupedDF
    .where(col("email").isNotNull())
    .groupby("email")
    .agg(count("user_id").alias("user_id_count"))
    .select((max("user_id_count") <= 1).alias("at_most_one_id")))

날짜 형식 및 정규 표현식

SELECT *, 
  date_format(first_touch, "MMM d, yyyy") AS first_touch_date,
  date_format(first_touch, "HH:mm:ss") AS first_touch_time,
  regexp_extract(email, "(?<=@).+", 0) AS email_domain
FROM (
  SELECT *,
    CAST(user_first_touch_timestamp / 1e6 AS timestamp) AS first_touch 
  FROM deduped_users
)
%python
from pyspark.sql.functions import date_format, regexp_extract

display(dedupedDF
    .withColumn("first_touch", (col("user_first_touch_timestamp") / 1e6).cast("timestamp"))
    .withColumn("first_touch_date", date_format("first_touch", "MMM d, yyyy"))
    .withColumn("first_touch_time", date_format("first_touch", "HH:mm:ss"))
    .withColumn("email_domain", regexp_extract("email", "(?<=@).+", 0))
)

Complex Transformations

데이터 읽어오기

CREATE OR REPLACE TEMP VIEW events_strings AS 
SELECT string(key), string(value) FROM events_raw;

SELECT * FROM events_strings
%python
from pyspark.sql.functions import col

events_stringsDF = (spark
    .table("events_raw")
    .select(col("key").cast("string"), 
            col("value").cast("string"))
    )
display(events_stringsDF)

중첩 데이터 작업

  • JSON 문자열의 하위 필드에 액세스하려면 쿼리에서 : 구문을 사용하세요.
  • 구조체 타입의 하위 필드에 액세스하려면 쿼리에서 . 구문을 사용하세요.

(중첩데이터 예)

key value

UA00000010645… {"device":"Linux","ecommerce":{"purchase_revenue_in_usd":1047.6,"total_item_quantity":2,"unique_items":2},"event_name":"finalize","event_previous_timestamp…..
SELECT * FROM events_strings 
WHERE value:event_name = "finalize" 
ORDER BY key LIMIT 1
%python
display(events_stringsDF
    .where("value:event_name = 'finalize'")
    .orderBy("key")
    .limit(1)
)

JSON 문자열을 구조체 유형으로 압축 해제한 후, 모든 구조체 필드를 열로 압축 해제하고 평면화

  • *schema_of_json()*은 예제 JSON 문자열에서 도출된 스키마를 반환합니다.
  • *from_json()*은 지정된 스키마를 사용하여 JSON 문자열이 포함된 열을 구조체 유형으로 구문 분석합니다.
SELECT schema_of_json('{"device":"Linux","ecommerce":{"purchase_revenue_in_usd":1075.5,"total_item_quantity":1,"unique_items":1},"event_name":"finalize","event_previous_timestamp":1593879231210816,"event_timestamp":1593879335779563,"geo":{"city":"Houston","state":"TX"},"items":[{"coupon":"NEWBED10","item_id":"M_STAN_K","item_name":"Standard King Mattress","item_revenue_in_usd":1075.5,"price_in_usd":1195.0,"quantity":1}],"traffic_source":"email","user_first_touch_timestamp":1593454417513109,"user_id":"UA000000106116176"}') 
AS schema

CREATE OR REPLACE TEMP VIEW parsed_events AS SELECT json.* FROM (
SELECT from_json(value, 'STRUCT<device: STRING, ecommerce: STRUCT<purchase_revenue_in_usd: DOUBLE, total_item_quantity: BIGINT, unique_items: BIGINT>, event_name: STRING, event_previous_timestamp: BIGINT, event_timestamp: BIGINT, geo: STRUCT<city: STRING, state: STRING>, items: ARRAY<STRUCT<coupon: STRING, item_id: STRING, item_name: STRING, item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>, traffic_source: STRING, user_first_touch_timestamp: BIGINT, user_id: STRING>') AS json 
FROM events_strings);

SELECT * FROM parsed_events
%python
from pyspark.sql.functions import from_json, schema_of_json

json_string = """
{"device":"Linux","ecommerce":{"purchase_revenue_in_usd":1047.6,"total_item_quantity":2,"unique_items":2},"event_name":"finalize","event_previous_timestamp":1593879787820475,"event_timestamp":1593879948830076,"geo":{"city":"Huntington Park","state":"CA"},"items":[{"coupon":"NEWBED10","item_id":"M_STAN_Q","item_name":"Standard Queen Mattress","item_revenue_in_usd":940.5,"price_in_usd":1045.0,"quantity":1},{"coupon":"NEWBED10","item_id":"P_DOWN_S","item_name":"Standard Down Pillow","item_revenue_in_usd":107.10000000000001,"price_in_usd":119.0,"quantity":1}],"traffic_source":"email","user_first_touch_timestamp":1593583891412316,"user_id":"UA000000106459577"}
"""
parsed_eventsDF = (events_stringsDF
    .select(from_json("value", schema_of_json(json_string)).alias("json"))
    .select("json.*")
)

display(parsed_eventsDF)

배열 조작

  • *explode()*는 배열의 요소를 여러 행으로 분리합니다. 각 요소에 대해 새 행을 생성합니다.
  • *size()*는 각 행에 대한 배열의 요소 개수를 계산합니다.
CREATE OR REPLACE TEMP VIEW exploded_events AS
SELECT *, explode(items) AS item
FROM parsed_events;

SELECT * FROM exploded_events WHERE size(items) > 2
%python
from pyspark.sql.functions import explode, size

exploded_eventsDF = (parsed_eventsDF
    .withColumn("item", explode("items"))
)

display(exploded_eventsDF.where(size("items") > 2))

  • *collect_set()*은 배열 내의 필드를 포함하여 필드에 대한 고유한 값을 수집합니다.
  • *flatten()*은 여러 배열을 하나의 배열로 결합합니다.
  • *array_distinct()*는 배열에서 중복 요소를 제거합니다.
SELECT user_id,
  collect_set(event_name) AS event_history,
  array_distinct(flatten(collect_set(items.item_id))) AS cart_history
FROM exploded_events
GROUP BY user_id
%python

from pyspark.sql.functions import array_distinct, collect_set, flatten

display(exploded_eventsDF
    .groupby("user_id")
    .agg(collect_set("event_name").alias("event_history"),
            array_distinct(flatten(collect_set("items.item_id"))).alias("cart_history"))
)

데이터 결합 및 재구성

테이블 조인

CREATE OR REPLACE TEMP VIEW item_purchases AS

SELECT * 
FROM (SELECT *, explode(items) AS item FROM sales) a
INNER JOIN item_lookup b
ON a.item.item_id = b.item_id;

SELECT * FROM item_purchases
%python
exploded_salesDF = (spark
    .table("sales")
    .withColumn("item", explode("items"))
)

itemsDF = spark.table("item_lookup")

item_purchasesDF = (exploded_salesDF
    .join(itemsDF, exploded_salesDF.item.item_id == itemsDF.item_id)
)

display(item_purchasesDF)

피봇 테이블

다음 코드 셀은 PIVOT을 사용하여 sales 데이터 세트에서 파생된 여러 필드에 포함된 품목 구매 정보를 평면화합니다.

SELECT *
FROM item_purchases
PIVOT (
  sum(item.quantity) FOR item_id IN (
    'P_FOAM_K',
    'M_STAN_Q',
    'P_FOAM_S',
    'M_PREM_Q',
    'M_STAN_F',
    'M_STAN_T',
    'M_PREM_K',
    'M_PREM_F',
    'M_STAN_K',
    'M_PREM_T',
    'P_DOWN_S',
    'P_DOWN_K')
)
%python
transactionsDF = (item_purchasesDF
    .groupBy("order_id", 
        "email",
        "transaction_timestamp", 
        "total_item_quantity", 
        "purchase_revenue_in_usd", 
        "unique_items",
        "items",
        "item",
        "name",
        "price")
    .pivot("item_id")
    .sum("item.quantity")
)
display(transactionsDF)

SQL UDFs

Spark SQL의 사용자 정의 함수(UDF)를 사용하면 사용자 지정 SQL 로직을 데이터베이스에 함수로 등록하여 Databricks에서 SQL을 실행할 수 있는 모든 곳에서 재사용할 수 있습니다.

CREATE OR REPLACE FUNCTION sale_announcement(item_name STRING, item_price INT)
RETURNS STRING
RETURN concat("The ", item_name, " is on sale for $", round(item_price * 0.8, 0));

SELECT *, sale_announcement(name, price) AS message FROM item_lookup
DESCRIBE FUNCTION sale_announcement
DESCRIBE FUNCTION EXTENDED sale_announcement

SQL UDF를 CASE / WHEN 절 형태의 제어 흐름과 결합하면 SQL 워크로드 내에서 제어 흐름 실행을 최적화할 수 있습니다.

CREATE OR REPLACE FUNCTION item_preference(name STRING, price INT)
RETURNS STRING
RETURN CASE 
  WHEN name = "Standard Queen Mattress" THEN "This is my default mattress"
  WHEN name = "Premium Queen Mattress" THEN "This is my favorite mattress"
  WHEN price > 100 THEN concat("I'd wait until the ", name, " is on sale for $", round(price * 0.8, 0))
  ELSE concat("I don't need a ", name)
END;

SELECT *, item_preference(name, price) FROM item_lookup

Python UDFs

def first_letter_function(email):
    return email[0]

first_letter_function("annagray@kaufman.com")
first_letter_udf = udf(first_letter_function)

데코레이터 구문 사용(Python 전용)

Python 데코레이터 구문을 사용하여 UDF를 정의하고 등록할 수 있습니다.

@udf 데코레이터 매개변수는 함수가 반환하는 Column 데이터 유형입니다.

# Our input/output is a string
@udf("string")
def first_letter_udf(email: str) -> str:
    return email[0]
from pyspark.sql.functions import col

sales_df = spark.table("sales")
display(sales_df.select(first_letter_udf(col("email"))))

Pandas/벡터화된 UDF

Pandas UDF는 UDF의 효율성을 향상시키기 위해 Python에서 제공됩니다. Pandas UDF는 Apache Arrow를 활용하여 계산 속도를 높입니다.

import pandas as pd
from pyspark.sql.functions import pandas_udf

# We have a string input/output
@pandas_udf("string")
def vectorized_udf(email: pd.Series) -> pd.Series:
    return email.str[0]

# AlternativelyA
# def vectorized_udf(email: pd.Series) -> pd.Series:
#     return email.str[0]
# vectorized_udf = pandas_udf(vectorized_udf, "string")

이러한 Pandas UDF를 SQL 네임스페이스에 등록할 수 있습니다.

spark.udf.register("sql_vectorized_udf", vectorized_udf)
%sql
-- Use the Pandas UDF from SQL
SELECT sql_vectorized_udf(email) AS firstLetter FROM sales

Spark SQL의 고차함수

Spark SQL의 고차 함수를 사용하면 배열이나 맵 타입 객체와 같은 복잡한 데이터 타입을 원래 구조를 유지하면서 변환할 수 있습니다. 다음은 그 예입니다.

  • FILTER()는 주어진 람다 함수를 사용하여 배열을 필터링합니다.
  • EXIST()는 배열의 하나 이상의 요소에 대해 특정 명령문이 참인지 여부를 테스트합니다.
  • TRANSFORM()은 주어진 람다 함수를 사용하여 배열의 모든 요소를 변환합니다.
  • REDUCE()는 두 개의 람다 함수를 사용하여 배열의 요소를 버퍼에 병합하여 단일 값으로 축소하고, 최종 버퍼에 마무리 함수를 적용합니다.

FILTER()

SELECT * FROM (
  SELECT
    order_id,
    FILTER (items, i -> i.item_id LIKE "%K") AS king_items
  FROM sales)
WHERE size(king_items) > 0
  • FILTER: 고차 함수의 이름
  • items: 입력 배열의 이름
  • i: 반복자 변수의 이름. 이 이름을 선택한 다음 람다 함수에서 사용합니다. 배열을 반복하며 각 값을 한 번에 하나씩 함수로 순환합니다.
  • > : 함수의 시작을 나타냅니다.
  • i.item_id LIKE "%K" : 이것이 함수입니다. 각 값은 대문자 K로 끝나는지 확인합니다. 대문자 K로 끝나면 새 열인 **king_items*로 필터링됩니다.

TRANSFORM()

SELECT *,
  TRANSFORM (
    items, i -> CAST(i.item_revenue_in_usd * 100 AS INT)
  ) AS item_revenues
FROM sales

EXIST()

CREATE OR REPLACE TABLE sales_product_flags AS
SELECT
    items,
    EXISTS(items, i -> i.item_name LIKE '%Mattress') AS mattress,
    EXISTS(items, i -> i.item_name LIKE '%Pillow') AS pillow
FROM sales;

createOrReplaceTempView

createOrReplaceTempView는 DataFrame을 기반으로 임시 뷰를 생성합니다. 임시 뷰의 수명은 DataFrame을 생성하는 데 사용된 SparkSession에 연결됩니다.

budget_df.createOrReplaceTempView("budget")
display(spark.sql("SELECT * FROM budget"))

열 표현식

from pyspark.sql.functions import col

print(events_df.device)
print(events_df["device"])
print(col("device"))

Scala는 DataFrame의 기존 열을 기반으로 새 열을 생성하는 추가 구문을 지원합니다.

%scala
$"device"

열 연산자 및 메서드

메서드 설명

*, + , <, >= 수학 및 비교 연산자
==, != 같음 및 같지 않음 테스트(Scala 연산자는 === 및 **=!=**입니다)
alias 열에 별칭을 지정합니다
cast, astype 열을 다른 데이터 유형으로 변환합니다
isNull, isNotNull, isNan null인지, null이 아닌지, NaN인지
asc, desc 열의 오름차순/내림차순 정렬 표현식을 반환합니다
rev_df = (events_df
         .filter(col("ecommerce.purchase_revenue_in_usd").isNotNull())
         .withColumn("purchase_revenue", (col("ecommerce.purchase_revenue_in_usd") * 100).cast("int"))
         .withColumn("avg_purchase_revenue", col("ecommerce.purchase_revenue_in_usd") / col("ecommerce.total_item_quantity"))
         .sort(col("avg_purchase_revenue").desc())
        )

display(rev_df)

DataFrame 변환 메서드

메서드 설명

select 각 요소에 대해 주어진 표현식을 계산하여 새 DataFrame을 반환합니다.
drop 열을 삭제한 새 DataFrame을 반환합니다.
withColumnRenamed 열 이름이 변경된 새 DataFrame을 반환합니다.
withColumn 열을 추가하거나 이름이 같은 기존 열을 대체하여 새 DataFrame을 반환합니다.
filterwhere 주어진 조건을 사용하여 행을 필터링합니다.
sortorderBy 주어진 표현식으로 정렬된 새 DataFrame을 반환합니다.
dropDuplicatesdistinct 중복 행을 제거한 새 DataFrame을 반환합니다.
limit 처음 n개 행을 가져와 새 DataFrame을 반환합니다.
groupBy 지정된 열을 사용하여 DataFrame을 그룹화하여 해당 열에 대한 집계를 실행할 수 있습니다.
apple_df = events_df.selectExpr("user_id", "device in ('macOS', 'iOS') as apple_user")
display(apple_df)

anonymous_df = events_df.drop("user_id", "geo", "device")
display(anonymous_df)
no_sales_df = events_df.drop(col("ecommerce"))
display(no_sales_df)
mobile_df = events_df.withColumn("mobile", col("device").isin("iOS", "Android"))
display(mobile_df)

dropDuplicates()

중복 행을 제거한 새 DataFrame을 반환합니다. 선택적으로 일부 열만 고려합니다.

별칭: distinct

display(events_df.distinct())
distinct_users_df = events_df.dropDuplicates(["user_id"])
display(distinct_users_df)

sort()

주어진 열 또는 표현식을 기준으로 정렬된 새 DataFrame을 반환합니다.

별칭: orderBy

increase_timestamps_df = events_df.sort("event_timestamp")
display(increase_timestamps_df)
decrease_timestamp_df = events_df.sort(col("event_timestamp").desc())
display(decrease_timestamp_df)
increase_sessions_df = events_df.orderBy(["user_first_touch_timestamp", "event_timestamp"])
display(increase_sessions_df)
decrease_sessions_df = events_df.sort(col("user_first_touch_timestamp").desc(), col("event_timestamp"))
display(decrease_sessions_df)

내장 함수

DataFrame 및 Column 변환 메서드 외에도 Spark의 내장 SQL 함수 모듈에는 유용한 함수가 많이 있습니다.

Scala에서는 org.apache.spark.sql.functions이고, Python에서는 pyspark.sql.functions입니다. 이 모듈의 함수는 코드로 가져와야 합니다.

집계 함수

다음은 집계에 사용할 수 있는 몇 가지 기본 제공 함수입니다.

메서드 설명

approx_count_distinct 그룹 내 고유 항목의 대략적인 개수를 반환합니다.
avg 그룹 내 값의 평균을 반환합니다.
collect_list 중복된 객체 목록을 반환합니다.
corr 두 숫자형 열의 상관계수를 반환합니다.
max 각 그룹의 각 숫자 열에 대한 최댓값을 계산합니다.
mean 각 그룹의 각 숫자 열에 대한 평균값을 계산합니다.
stddev_samp 그룹 내 표현식의 표본 표준 편차를 반환합니다.
sumDistinct 표현식 내 고유 값의 합계를 반환합니다.
var_pop 그룹 내 값의 모분산을 반환합니다.

그룹화된 데이터 메서드 agg를 사용하여 내장 집계 함수를 적용합니다.

이렇게 하면 결과 열에 alias와 같은 다른 변환을 적용할 수 있습니다.

from pyspark.sql.functions import avg, approx_count_distinct

state_aggregates_df = (df
                       .groupBy("geo.state")
                       .agg(avg("ecommerce.total_item_quantity").alias("avg_quantity"),
                            approx_count_distinct("user_id").alias("distinct_users"))
                      )

display(state_aggregates_df)

수학 함수

다음은 수학 연산을 위한 내장 함수입니다.

메서드 설명

ceil 주어진 열의 상한값을 계산합니다.
cos 주어진 값의 코사인을 계산합니다.
log 주어진 값의 자연 로그를 계산합니다.
round HALF_UP 반올림 모드를 사용하여 e 열의 값을 소수점 이하 0자리로 반올림하여 반환합니다.
sqrt 지정된 부동 소수점 값의 제곱근을 계산합니다.
from pyspark.sql.functions import cos, sqrt

display(spark.range(10)  # Create a DataFrame with a single column called "id" with a range of integer values
        .withColumn("sqrt", sqrt("id"))
        .withColumn("cos", cos("id"))
       )

Reader & Writer

CSV 파일에서 읽기

%sql
users_df = (spark
           .read
           .csv(DA.paths.users_csv, sep="\\t", header=True, inferSchema=True)
          )

users_df.printSchema()
%sql
ddl_schema = "user_id string, user_first_touch_timestamp long, email string"

users_df = (spark
           .read
           .option("sep", "\\t")
           .option("header", True)
           .schema(ddl_schema)
           .csv(DA.paths.users_csv)
          )
from pyspark.sql.types import LongType, StringType, StructType, StructField

user_defined_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("user_first_touch_timestamp", LongType(), True),
    StructField("email", StringType(), True)
])

JSON 파일에서 읽기

%sql
events_df = (spark
            .read
            .option("inferSchema", True)
            .json(DA.paths.events_json)
           )

events_df.printSchema()
from pyspark.sql.types import ArrayType, DoubleType, IntegerType, LongType, StringType, StructType, StructField

user_defined_schema = StructType([
    StructField("device", StringType(), True),
    StructField("ecommerce", StructType([
        StructField("purchaseRevenue", DoubleType(), True),
        StructField("total_item_quantity", LongType(), True),
        StructField("unique_items", LongType(), True)
    ]), True),
    StructField("event_name", StringType(), True),
    StructField("event_previous_timestamp", LongType(), True),
    StructField("event_timestamp", LongType(), True),
    StructField("geo", StructType([
        StructField("city", StringType(), True),
        StructField("state", StringType(), True)
    ]), True),
    StructField("items", ArrayType(
        StructType([
            StructField("coupon", StringType(), True),
            StructField("item_id", StringType(), True),
            StructField("item_name", StringType(), True),
            StructField("item_revenue_in_usd", DoubleType(), True),
            StructField("price_in_usd", DoubleType(), True),
            StructField("quantity", LongType(), True)
        ])
    ), True),
    StructField("traffic_source", StringType(), True),
    StructField("user_first_touch_timestamp", LongType(), True),
    StructField("user_id", StringType(), True)
])

events_df = (spark
            .read
            .schema(user_defined_schema)
            .json(DA.paths.events_json)
           )

스칼라의 StructType 메서드인 **toDDL**을 사용하면 DDL 형식의 문자열을 자동으로 생성할 수 있습니다.

이 기능은 CSV 및 JSON 데이터를 처리하기 위해 DDL 형식의 문자열을 가져와야 하지만 문자열을 직접 작성하거나 스키마의 StructType 변형을 원하지 않을 때 편리합니다.

# Step 1 - 공유된 spark-config를 사용하여 Python과 Scala 간에 값(데이터 세트 경로)을 전송하려면 이 트릭을 사용하세요.
spark.conf.set("com.whatever.your_scope.events_path", DA.paths.events_json)
%scala
// Step 2 - config에서 값을 끌어오거나 복사하여 붙여넣습니다.
val eventsJsonPath = spark.conf.get("com.whatever.your_scope.events_path")

// Step 3 - JSON을 읽지만 스키마를 추론하게 합니다.
val eventsSchema = spark.read
                        .option("inferSchema", true)
                        .json(eventsJsonPath)
                        .schema.toDDL

// Step 4 - 스키마를 print하고, 선택한 후 복사합니다.
println("="*80)
println(eventsSchema)
println("="*80)
# Step 5 - 위의 스키마를 붙여넣고 여기에서 볼 수 있듯이 변수에 할당합니다.
events_schema = "`device` STRING,`ecommerce` STRUCT<`purchase_revenue_in_usd`: DOUBLE, `total_item_quantity`: BIGINT, `unique_items`: BIGINT>,`event_name` STRING,`event_previous_timestamp` BIGINT,`event_timestamp` BIGINT,`geo` STRUCT<`city`: STRING, `state`: STRING>,`items` ARRAY<STRUCT<`coupon`: STRING, `item_id`: STRING, `item_name`: STRING, `item_revenue_in_usd`: DOUBLE, `price_in_usd`: DOUBLE, `quantity`: BIGINT>>,`traffic_source` STRING,`user_first_touch_timestamp` BIGINT,`user_id` STRING"

# Step 6 - 새로운 DDL 형식 문자열을 사용하여 JSON 데이터를 읽습니다.
events_df = (spark.read
                 .schema(events_schema)
                 .json(DA.paths.events_json))

display(events_df)

DataFrameWriter

파일에 DataFrame 쓰기

users_output_dir = DA.paths.working_dir + "/users.parquet"

(users_df
 .write
 .option("compression", "snappy")
 .mode("overwrite")
 .parquet(users_output_dir)
)
display(
    dbutils.fs.ls(users_output_dir)
)
(users_df
 .write
 .parquet(users_output_dir, compression="snappy", mode="overwrite")
)

테이블에 DataFrame 쓰기

events_df.write.mode("overwrite").saveAsTable("events")
print(DA.schema_name)

Delta Lake

Delta Lake의 주요 기능

  • ACID 트랜잭션
  • 확장 가능한 메타데이터 처리
  • 통합 스트리밍 및 일괄 처리
  • 시간 이동(데이터 버전 관리)
  • 스키마 적용 및 진화
  • 감사 기록
  • Parquet 형식
  • Apache Spark API와 호환

델타 테이블에 결과 쓰기

events_output_path = DA.paths.working_dir + "/delta/events"

(events_df
 .write
 .format("delta")
 .mode("overwrite")
 .save(events_output_path)
)
반응형