cleaning, 중첩데이터, udfs, 함수s, read&write
Cleaning Data
누락된 데이터 검사
SELECT count_if(email IS NULL) FROM users_dirty;
SELECT count(*) FROM users_dirty WHERE email IS NULL;]
%python
from pyspark.sql.functions import col
usersDF = spark.read.table("users_dirty")
usersDF.selectExpr("count_if(email IS NULL)")
usersDF.where(col("email").isNull()).count()
중복 행 제거
SELECT DISTINCT(*) FROM users_dirty
%python
usersDF.distinct().display()
특정 열 기준 중복 행 제거
CREATE OR REPLACE TEMP VIEW deduped_users AS
SELECT user_id, user_first_touch_timestamp, max(email) AS email, max(updated) AS updated
FROM users_dirty
WHERE user_id IS NOT NULL
GROUP BY user_id, user_first_touch_timestamp;
SELECT count(*) FROM deduped_users
%python
from pyspark.sql.functions import max
dedupedDF = (usersDF
.where(col("user_id").isNotNull())
.groupBy("user_id", "user_first_touch_timestamp")
.agg(max("email").alias("email"),
max("updated").alias("updated"))
)
dedupedDF.count()
데이터 셋 검증
고유한 user_id를 가지고 있는지 확인합니다.
CREATE OR REPLACE TEMP VIEW deduped_users AS
SELECT user_id, user_first_touch_timestamp, max(email) AS email, max(updated) AS updated
FROM users_dirty
WHERE user_id IS NOT NULL
GROUP BY user_id, user_first_touch_timestamp;
SELECT count(*) FROM deduped_users
%python
from pyspark.sql.functions import max
dedupedDF = (usersDF
.where(col("user_id").isNotNull())
.groupBy("user_id", "user_first_touch_timestamp")
.agg(max("email").alias("email"),
max("updated").alias("updated"))
)
dedupedDF.count()
각 이메일이 최대 하나의 user_id와 연결되어 있는지 확인합니다.
SELECT max(user_id_count) <= 1 at_most_one_id FROM (
SELECT email, count(user_id) AS user_id_count
FROM deduped_users
WHERE email IS NOT NULL
GROUP BY email)
%python
display(dedupedDF
.where(col("email").isNotNull())
.groupby("email")
.agg(count("user_id").alias("user_id_count"))
.select((max("user_id_count") <= 1).alias("at_most_one_id")))
날짜 형식 및 정규 표현식
SELECT *,
date_format(first_touch, "MMM d, yyyy") AS first_touch_date,
date_format(first_touch, "HH:mm:ss") AS first_touch_time,
regexp_extract(email, "(?<=@).+", 0) AS email_domain
FROM (
SELECT *,
CAST(user_first_touch_timestamp / 1e6 AS timestamp) AS first_touch
FROM deduped_users
)
%python
from pyspark.sql.functions import date_format, regexp_extract
display(dedupedDF
.withColumn("first_touch", (col("user_first_touch_timestamp") / 1e6).cast("timestamp"))
.withColumn("first_touch_date", date_format("first_touch", "MMM d, yyyy"))
.withColumn("first_touch_time", date_format("first_touch", "HH:mm:ss"))
.withColumn("email_domain", regexp_extract("email", "(?<=@).+", 0))
)
Complex Transformations
데이터 읽어오기
CREATE OR REPLACE TEMP VIEW events_strings AS
SELECT string(key), string(value) FROM events_raw;
SELECT * FROM events_strings
%python
from pyspark.sql.functions import col
events_stringsDF = (spark
.table("events_raw")
.select(col("key").cast("string"),
col("value").cast("string"))
)
display(events_stringsDF)
중첩 데이터 작업
- JSON 문자열의 하위 필드에 액세스하려면 쿼리에서 : 구문을 사용하세요.
- 구조체 타입의 하위 필드에 액세스하려면 쿼리에서 . 구문을 사용하세요.
(중첩데이터 예)
key value
UA00000010645… | {"device":"Linux","ecommerce":{"purchase_revenue_in_usd":1047.6,"total_item_quantity":2,"unique_items":2},"event_name":"finalize","event_previous_timestamp….. |
SELECT * FROM events_strings
WHERE value:event_name = "finalize"
ORDER BY key LIMIT 1
%python
display(events_stringsDF
.where("value:event_name = 'finalize'")
.orderBy("key")
.limit(1)
)
JSON 문자열을 구조체 유형으로 압축 해제한 후, 모든 구조체 필드를 열로 압축 해제하고 평면화
- *schema_of_json()*은 예제 JSON 문자열에서 도출된 스키마를 반환합니다.
- *from_json()*은 지정된 스키마를 사용하여 JSON 문자열이 포함된 열을 구조체 유형으로 구문 분석합니다.
SELECT schema_of_json('{"device":"Linux","ecommerce":{"purchase_revenue_in_usd":1075.5,"total_item_quantity":1,"unique_items":1},"event_name":"finalize","event_previous_timestamp":1593879231210816,"event_timestamp":1593879335779563,"geo":{"city":"Houston","state":"TX"},"items":[{"coupon":"NEWBED10","item_id":"M_STAN_K","item_name":"Standard King Mattress","item_revenue_in_usd":1075.5,"price_in_usd":1195.0,"quantity":1}],"traffic_source":"email","user_first_touch_timestamp":1593454417513109,"user_id":"UA000000106116176"}')
AS schema
CREATE OR REPLACE TEMP VIEW parsed_events AS SELECT json.* FROM (
SELECT from_json(value, 'STRUCT<device: STRING, ecommerce: STRUCT<purchase_revenue_in_usd: DOUBLE, total_item_quantity: BIGINT, unique_items: BIGINT>, event_name: STRING, event_previous_timestamp: BIGINT, event_timestamp: BIGINT, geo: STRUCT<city: STRING, state: STRING>, items: ARRAY<STRUCT<coupon: STRING, item_id: STRING, item_name: STRING, item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>, traffic_source: STRING, user_first_touch_timestamp: BIGINT, user_id: STRING>') AS json
FROM events_strings);
SELECT * FROM parsed_events
%python
from pyspark.sql.functions import from_json, schema_of_json
json_string = """
{"device":"Linux","ecommerce":{"purchase_revenue_in_usd":1047.6,"total_item_quantity":2,"unique_items":2},"event_name":"finalize","event_previous_timestamp":1593879787820475,"event_timestamp":1593879948830076,"geo":{"city":"Huntington Park","state":"CA"},"items":[{"coupon":"NEWBED10","item_id":"M_STAN_Q","item_name":"Standard Queen Mattress","item_revenue_in_usd":940.5,"price_in_usd":1045.0,"quantity":1},{"coupon":"NEWBED10","item_id":"P_DOWN_S","item_name":"Standard Down Pillow","item_revenue_in_usd":107.10000000000001,"price_in_usd":119.0,"quantity":1}],"traffic_source":"email","user_first_touch_timestamp":1593583891412316,"user_id":"UA000000106459577"}
"""
parsed_eventsDF = (events_stringsDF
.select(from_json("value", schema_of_json(json_string)).alias("json"))
.select("json.*")
)
display(parsed_eventsDF)
배열 조작
- *explode()*는 배열의 요소를 여러 행으로 분리합니다. 각 요소에 대해 새 행을 생성합니다.
- *size()*는 각 행에 대한 배열의 요소 개수를 계산합니다.
CREATE OR REPLACE TEMP VIEW exploded_events AS
SELECT *, explode(items) AS item
FROM parsed_events;
SELECT * FROM exploded_events WHERE size(items) > 2
%python
from pyspark.sql.functions import explode, size
exploded_eventsDF = (parsed_eventsDF
.withColumn("item", explode("items"))
)
display(exploded_eventsDF.where(size("items") > 2))
- *collect_set()*은 배열 내의 필드를 포함하여 필드에 대한 고유한 값을 수집합니다.
- *flatten()*은 여러 배열을 하나의 배열로 결합합니다.
- *array_distinct()*는 배열에서 중복 요소를 제거합니다.
SELECT user_id,
collect_set(event_name) AS event_history,
array_distinct(flatten(collect_set(items.item_id))) AS cart_history
FROM exploded_events
GROUP BY user_id
%python
from pyspark.sql.functions import array_distinct, collect_set, flatten
display(exploded_eventsDF
.groupby("user_id")
.agg(collect_set("event_name").alias("event_history"),
array_distinct(flatten(collect_set("items.item_id"))).alias("cart_history"))
)
데이터 결합 및 재구성
테이블 조인
CREATE OR REPLACE TEMP VIEW item_purchases AS
SELECT *
FROM (SELECT *, explode(items) AS item FROM sales) a
INNER JOIN item_lookup b
ON a.item.item_id = b.item_id;
SELECT * FROM item_purchases
%python
exploded_salesDF = (spark
.table("sales")
.withColumn("item", explode("items"))
)
itemsDF = spark.table("item_lookup")
item_purchasesDF = (exploded_salesDF
.join(itemsDF, exploded_salesDF.item.item_id == itemsDF.item_id)
)
display(item_purchasesDF)
피봇 테이블
다음 코드 셀은 PIVOT을 사용하여 sales 데이터 세트에서 파생된 여러 필드에 포함된 품목 구매 정보를 평면화합니다.
SELECT *
FROM item_purchases
PIVOT (
sum(item.quantity) FOR item_id IN (
'P_FOAM_K',
'M_STAN_Q',
'P_FOAM_S',
'M_PREM_Q',
'M_STAN_F',
'M_STAN_T',
'M_PREM_K',
'M_PREM_F',
'M_STAN_K',
'M_PREM_T',
'P_DOWN_S',
'P_DOWN_K')
)
%python
transactionsDF = (item_purchasesDF
.groupBy("order_id",
"email",
"transaction_timestamp",
"total_item_quantity",
"purchase_revenue_in_usd",
"unique_items",
"items",
"item",
"name",
"price")
.pivot("item_id")
.sum("item.quantity")
)
display(transactionsDF)
SQL UDFs
Spark SQL의 사용자 정의 함수(UDF)를 사용하면 사용자 지정 SQL 로직을 데이터베이스에 함수로 등록하여 Databricks에서 SQL을 실행할 수 있는 모든 곳에서 재사용할 수 있습니다.
CREATE OR REPLACE FUNCTION sale_announcement(item_name STRING, item_price INT)
RETURNS STRING
RETURN concat("The ", item_name, " is on sale for $", round(item_price * 0.8, 0));
SELECT *, sale_announcement(name, price) AS message FROM item_lookup
DESCRIBE FUNCTION sale_announcement
DESCRIBE FUNCTION EXTENDED sale_announcement
SQL UDF를 CASE / WHEN 절 형태의 제어 흐름과 결합하면 SQL 워크로드 내에서 제어 흐름 실행을 최적화할 수 있습니다.
CREATE OR REPLACE FUNCTION item_preference(name STRING, price INT)
RETURNS STRING
RETURN CASE
WHEN name = "Standard Queen Mattress" THEN "This is my default mattress"
WHEN name = "Premium Queen Mattress" THEN "This is my favorite mattress"
WHEN price > 100 THEN concat("I'd wait until the ", name, " is on sale for $", round(price * 0.8, 0))
ELSE concat("I don't need a ", name)
END;
SELECT *, item_preference(name, price) FROM item_lookup
Python UDFs
def first_letter_function(email):
return email[0]
first_letter_function("annagray@kaufman.com")
first_letter_udf = udf(first_letter_function)
데코레이터 구문 사용(Python 전용)
Python 데코레이터 구문을 사용하여 UDF를 정의하고 등록할 수 있습니다.
@udf 데코레이터 매개변수는 함수가 반환하는 Column 데이터 유형입니다.
# Our input/output is a string
@udf("string")
def first_letter_udf(email: str) -> str:
return email[0]
from pyspark.sql.functions import col
sales_df = spark.table("sales")
display(sales_df.select(first_letter_udf(col("email"))))
Pandas/벡터화된 UDF
Pandas UDF는 UDF의 효율성을 향상시키기 위해 Python에서 제공됩니다. Pandas UDF는 Apache Arrow를 활용하여 계산 속도를 높입니다.
import pandas as pd
from pyspark.sql.functions import pandas_udf
# We have a string input/output
@pandas_udf("string")
def vectorized_udf(email: pd.Series) -> pd.Series:
return email.str[0]
# AlternativelyA
# def vectorized_udf(email: pd.Series) -> pd.Series:
# return email.str[0]
# vectorized_udf = pandas_udf(vectorized_udf, "string")
이러한 Pandas UDF를 SQL 네임스페이스에 등록할 수 있습니다.
spark.udf.register("sql_vectorized_udf", vectorized_udf)
%sql
-- Use the Pandas UDF from SQL
SELECT sql_vectorized_udf(email) AS firstLetter FROM sales
Spark SQL의 고차함수
Spark SQL의 고차 함수를 사용하면 배열이나 맵 타입 객체와 같은 복잡한 데이터 타입을 원래 구조를 유지하면서 변환할 수 있습니다. 다음은 그 예입니다.
- FILTER()는 주어진 람다 함수를 사용하여 배열을 필터링합니다.
- EXIST()는 배열의 하나 이상의 요소에 대해 특정 명령문이 참인지 여부를 테스트합니다.
- TRANSFORM()은 주어진 람다 함수를 사용하여 배열의 모든 요소를 변환합니다.
- REDUCE()는 두 개의 람다 함수를 사용하여 배열의 요소를 버퍼에 병합하여 단일 값으로 축소하고, 최종 버퍼에 마무리 함수를 적용합니다.
FILTER()
SELECT * FROM (
SELECT
order_id,
FILTER (items, i -> i.item_id LIKE "%K") AS king_items
FROM sales)
WHERE size(king_items) > 0
- FILTER: 고차 함수의 이름
- items: 입력 배열의 이름
- i: 반복자 변수의 이름. 이 이름을 선택한 다음 람다 함수에서 사용합니다. 배열을 반복하며 각 값을 한 번에 하나씩 함수로 순환합니다.
- > : 함수의 시작을 나타냅니다.
- i.item_id LIKE "%K" : 이것이 함수입니다. 각 값은 대문자 K로 끝나는지 확인합니다. 대문자 K로 끝나면 새 열인 **king_items*로 필터링됩니다.
TRANSFORM()
SELECT *,
TRANSFORM (
items, i -> CAST(i.item_revenue_in_usd * 100 AS INT)
) AS item_revenues
FROM sales
EXIST()
CREATE OR REPLACE TABLE sales_product_flags AS
SELECT
items,
EXISTS(items, i -> i.item_name LIKE '%Mattress') AS mattress,
EXISTS(items, i -> i.item_name LIKE '%Pillow') AS pillow
FROM sales;
createOrReplaceTempView
createOrReplaceTempView는 DataFrame을 기반으로 임시 뷰를 생성합니다. 임시 뷰의 수명은 DataFrame을 생성하는 데 사용된 SparkSession에 연결됩니다.
budget_df.createOrReplaceTempView("budget")
display(spark.sql("SELECT * FROM budget"))
열 표현식
from pyspark.sql.functions import col
print(events_df.device)
print(events_df["device"])
print(col("device"))
Scala는 DataFrame의 기존 열을 기반으로 새 열을 생성하는 추가 구문을 지원합니다.
%scala
$"device"
열 연산자 및 메서드
메서드 설명
*, + , <, >= | 수학 및 비교 연산자 |
==, != | 같음 및 같지 않음 테스트(Scala 연산자는 === 및 **=!=**입니다) |
alias | 열에 별칭을 지정합니다 |
cast, astype | 열을 다른 데이터 유형으로 변환합니다 |
isNull, isNotNull, isNan | null인지, null이 아닌지, NaN인지 |
asc, desc | 열의 오름차순/내림차순 정렬 표현식을 반환합니다 |
rev_df = (events_df
.filter(col("ecommerce.purchase_revenue_in_usd").isNotNull())
.withColumn("purchase_revenue", (col("ecommerce.purchase_revenue_in_usd") * 100).cast("int"))
.withColumn("avg_purchase_revenue", col("ecommerce.purchase_revenue_in_usd") / col("ecommerce.total_item_quantity"))
.sort(col("avg_purchase_revenue").desc())
)
display(rev_df)
DataFrame 변환 메서드
메서드 설명
select | 각 요소에 대해 주어진 표현식을 계산하여 새 DataFrame을 반환합니다. |
drop | 열을 삭제한 새 DataFrame을 반환합니다. |
withColumnRenamed | 열 이름이 변경된 새 DataFrame을 반환합니다. |
withColumn | 열을 추가하거나 이름이 같은 기존 열을 대체하여 새 DataFrame을 반환합니다. |
filter, where | 주어진 조건을 사용하여 행을 필터링합니다. |
sort, orderBy | 주어진 표현식으로 정렬된 새 DataFrame을 반환합니다. |
dropDuplicates, distinct | 중복 행을 제거한 새 DataFrame을 반환합니다. |
limit | 처음 n개 행을 가져와 새 DataFrame을 반환합니다. |
groupBy | 지정된 열을 사용하여 DataFrame을 그룹화하여 해당 열에 대한 집계를 실행할 수 있습니다. |
apple_df = events_df.selectExpr("user_id", "device in ('macOS', 'iOS') as apple_user")
display(apple_df)
anonymous_df = events_df.drop("user_id", "geo", "device")
display(anonymous_df)
no_sales_df = events_df.drop(col("ecommerce"))
display(no_sales_df)
mobile_df = events_df.withColumn("mobile", col("device").isin("iOS", "Android"))
display(mobile_df)
dropDuplicates()
중복 행을 제거한 새 DataFrame을 반환합니다. 선택적으로 일부 열만 고려합니다.
별칭: distinct
display(events_df.distinct())
distinct_users_df = events_df.dropDuplicates(["user_id"])
display(distinct_users_df)
sort()
주어진 열 또는 표현식을 기준으로 정렬된 새 DataFrame을 반환합니다.
별칭: orderBy
increase_timestamps_df = events_df.sort("event_timestamp")
display(increase_timestamps_df)
decrease_timestamp_df = events_df.sort(col("event_timestamp").desc())
display(decrease_timestamp_df)
increase_sessions_df = events_df.orderBy(["user_first_touch_timestamp", "event_timestamp"])
display(increase_sessions_df)
decrease_sessions_df = events_df.sort(col("user_first_touch_timestamp").desc(), col("event_timestamp"))
display(decrease_sessions_df)
내장 함수
DataFrame 및 Column 변환 메서드 외에도 Spark의 내장 SQL 함수 모듈에는 유용한 함수가 많이 있습니다.
Scala에서는 org.apache.spark.sql.functions이고, Python에서는 pyspark.sql.functions입니다. 이 모듈의 함수는 코드로 가져와야 합니다.
집계 함수
다음은 집계에 사용할 수 있는 몇 가지 기본 제공 함수입니다.
메서드 설명
approx_count_distinct | 그룹 내 고유 항목의 대략적인 개수를 반환합니다. |
avg | 그룹 내 값의 평균을 반환합니다. |
collect_list | 중복된 객체 목록을 반환합니다. |
corr | 두 숫자형 열의 상관계수를 반환합니다. |
max | 각 그룹의 각 숫자 열에 대한 최댓값을 계산합니다. |
mean | 각 그룹의 각 숫자 열에 대한 평균값을 계산합니다. |
stddev_samp | 그룹 내 표현식의 표본 표준 편차를 반환합니다. |
sumDistinct | 표현식 내 고유 값의 합계를 반환합니다. |
var_pop | 그룹 내 값의 모분산을 반환합니다. |
그룹화된 데이터 메서드 agg를 사용하여 내장 집계 함수를 적용합니다.
이렇게 하면 결과 열에 alias와 같은 다른 변환을 적용할 수 있습니다.
from pyspark.sql.functions import avg, approx_count_distinct
state_aggregates_df = (df
.groupBy("geo.state")
.agg(avg("ecommerce.total_item_quantity").alias("avg_quantity"),
approx_count_distinct("user_id").alias("distinct_users"))
)
display(state_aggregates_df)
수학 함수
다음은 수학 연산을 위한 내장 함수입니다.
메서드 설명
ceil | 주어진 열의 상한값을 계산합니다. |
cos | 주어진 값의 코사인을 계산합니다. |
log | 주어진 값의 자연 로그를 계산합니다. |
round | HALF_UP 반올림 모드를 사용하여 e 열의 값을 소수점 이하 0자리로 반올림하여 반환합니다. |
sqrt | 지정된 부동 소수점 값의 제곱근을 계산합니다. |
from pyspark.sql.functions import cos, sqrt
display(spark.range(10) # Create a DataFrame with a single column called "id" with a range of integer values
.withColumn("sqrt", sqrt("id"))
.withColumn("cos", cos("id"))
)
Reader & Writer
CSV 파일에서 읽기
%sql
users_df = (spark
.read
.csv(DA.paths.users_csv, sep="\\t", header=True, inferSchema=True)
)
users_df.printSchema()
%sql
ddl_schema = "user_id string, user_first_touch_timestamp long, email string"
users_df = (spark
.read
.option("sep", "\\t")
.option("header", True)
.schema(ddl_schema)
.csv(DA.paths.users_csv)
)
from pyspark.sql.types import LongType, StringType, StructType, StructField
user_defined_schema = StructType([
StructField("user_id", StringType(), True),
StructField("user_first_touch_timestamp", LongType(), True),
StructField("email", StringType(), True)
])
JSON 파일에서 읽기
%sql
events_df = (spark
.read
.option("inferSchema", True)
.json(DA.paths.events_json)
)
events_df.printSchema()
from pyspark.sql.types import ArrayType, DoubleType, IntegerType, LongType, StringType, StructType, StructField
user_defined_schema = StructType([
StructField("device", StringType(), True),
StructField("ecommerce", StructType([
StructField("purchaseRevenue", DoubleType(), True),
StructField("total_item_quantity", LongType(), True),
StructField("unique_items", LongType(), True)
]), True),
StructField("event_name", StringType(), True),
StructField("event_previous_timestamp", LongType(), True),
StructField("event_timestamp", LongType(), True),
StructField("geo", StructType([
StructField("city", StringType(), True),
StructField("state", StringType(), True)
]), True),
StructField("items", ArrayType(
StructType([
StructField("coupon", StringType(), True),
StructField("item_id", StringType(), True),
StructField("item_name", StringType(), True),
StructField("item_revenue_in_usd", DoubleType(), True),
StructField("price_in_usd", DoubleType(), True),
StructField("quantity", LongType(), True)
])
), True),
StructField("traffic_source", StringType(), True),
StructField("user_first_touch_timestamp", LongType(), True),
StructField("user_id", StringType(), True)
])
events_df = (spark
.read
.schema(user_defined_schema)
.json(DA.paths.events_json)
)
스칼라의 StructType 메서드인 **toDDL**을 사용하면 DDL 형식의 문자열을 자동으로 생성할 수 있습니다.
이 기능은 CSV 및 JSON 데이터를 처리하기 위해 DDL 형식의 문자열을 가져와야 하지만 문자열을 직접 작성하거나 스키마의 StructType 변형을 원하지 않을 때 편리합니다.
# Step 1 - 공유된 spark-config를 사용하여 Python과 Scala 간에 값(데이터 세트 경로)을 전송하려면 이 트릭을 사용하세요.
spark.conf.set("com.whatever.your_scope.events_path", DA.paths.events_json)
%scala
// Step 2 - config에서 값을 끌어오거나 복사하여 붙여넣습니다.
val eventsJsonPath = spark.conf.get("com.whatever.your_scope.events_path")
// Step 3 - JSON을 읽지만 스키마를 추론하게 합니다.
val eventsSchema = spark.read
.option("inferSchema", true)
.json(eventsJsonPath)
.schema.toDDL
// Step 4 - 스키마를 print하고, 선택한 후 복사합니다.
println("="*80)
println(eventsSchema)
println("="*80)
# Step 5 - 위의 스키마를 붙여넣고 여기에서 볼 수 있듯이 변수에 할당합니다.
events_schema = "`device` STRING,`ecommerce` STRUCT<`purchase_revenue_in_usd`: DOUBLE, `total_item_quantity`: BIGINT, `unique_items`: BIGINT>,`event_name` STRING,`event_previous_timestamp` BIGINT,`event_timestamp` BIGINT,`geo` STRUCT<`city`: STRING, `state`: STRING>,`items` ARRAY<STRUCT<`coupon`: STRING, `item_id`: STRING, `item_name`: STRING, `item_revenue_in_usd`: DOUBLE, `price_in_usd`: DOUBLE, `quantity`: BIGINT>>,`traffic_source` STRING,`user_first_touch_timestamp` BIGINT,`user_id` STRING"
# Step 6 - 새로운 DDL 형식 문자열을 사용하여 JSON 데이터를 읽습니다.
events_df = (spark.read
.schema(events_schema)
.json(DA.paths.events_json))
display(events_df)
DataFrameWriter
파일에 DataFrame 쓰기
users_output_dir = DA.paths.working_dir + "/users.parquet"
(users_df
.write
.option("compression", "snappy")
.mode("overwrite")
.parquet(users_output_dir)
)
display(
dbutils.fs.ls(users_output_dir)
)
(users_df
.write
.parquet(users_output_dir, compression="snappy", mode="overwrite")
)
테이블에 DataFrame 쓰기
events_df.write.mode("overwrite").saveAsTable("events")
print(DA.schema_name)
Delta Lake
Delta Lake의 주요 기능
- ACID 트랜잭션
- 확장 가능한 메타데이터 처리
- 통합 스트리밍 및 일괄 처리
- 시간 이동(데이터 버전 관리)
- 스키마 적용 및 진화
- 감사 기록
- Parquet 형식
- Apache Spark API와 호환
델타 테이블에 결과 쓰기
events_output_path = DA.paths.working_dir + "/delta/events"
(events_df
.write
.format("delta")
.mode("overwrite")
.save(events_output_path)
)