Les bénéfices d'utiliser Delta Lake
Installer et configuration Spark pour utiliser Delta Lake
Notre code
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, lit
from pyspark.sql.types import IntegerType, DateType
spark = SparkSession.builder.appName("Bike calculation").getOrCreate()
source_file = "source/244400404_comptages-velo-nantes-metropole.csv"
df = spark.read.format("csv").option("delimiter", ";").option("header", True).load(source_file)
def transformation(df: DataFrame) -> DataFrame:
return (
df.select(
col("Numéro de boucle").alias("loop_number"),
col("Libellé").alias("label"),
col("Total").cast(IntegerType()).alias("total"),
col("Date formatée").cast(DateType()).alias("date"),
col("Vacances").alias("holiday_name"),
)
.where(col("Probabilité de présence d'anomalies") == lit(""))
)
transformation(df).write.format("parquet").partitionBy("date").save("datalake/count-bike-nantes.parquet")
Nous allons donner à la session Spark la configuration nécessaire. D'une part nous allons lui donner les dépendances, et d'autre part la configuration.
Télécharger les jars dans un dossier jars/
Ajoutons les jars dans un premier temps. Soit Spark va les télécharger, soit nous les fournissons. Dans nos exemples, nous allons le fournir.
spark = (
SparkSession
.builder
.appName("Bike calculation")
.config("spark.jars", "jars/delta-spark_2.12-3.2.0.jar,jars/delta-storage-3.2.0.jar")
.getOrCreate()
)
Ensuite, ajoutons la configuration pour pouvoir utiliser le format Delta Lake.
spark = (
SparkSession
.builder
.appName("Bike calculation")
.config("spark.jars", "jars/delta-spark_2.12-3.2.0.jar,jars/delta-storage-3.2.0.jar")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()
)
Votre session Spark est prêt pour utiliser le format Delta Lake.
Enregistrement de la table en delta
Lors de l'écriture de la table dans le dossier datalake/
, il faut changer le format de fichier pour delta.
transformation(df).write.format("delta").partitionBy("date").save("datalake/count-bike-nantes")
Voilà, votre table est au format delta.
Mise à jour de la table
Installer le package Python delta-spark
.
Attention à prendre la version correspondante. Se référer à la matrice de compatibilité.
pip install delta-spark==3.2.0
from delta import DeltaTable
delta_table_path = "datalake/count-bike-nantes"
if DeltaTable.isDeltaTable(spark, delta_table_path):
dt = DeltaTable.forPath(spark, "datalake/count-bike-nantes")
transformation(df)
(
dt
.alias("gold_table")
.merge(transformation(df).alias("fresh_data"), condition="fresh_data.loop_number = gold_table.loop_number")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
else:
transformation(df).write.format("delta").partitionBy("date").save(delta_table_path)
Conclusion
Références
Code complet