Delta Lake avec Apache Spark

Delta Lake avec Apache Spark


Qu'est ce que le format de fichier Delta Lake ?

Les bénéfices d'utiliser Delta Lake

Installer et configuration Spark pour utiliser Delta Lake

Notre code

from pyspark.sql import SparkSession, DataFrame from pyspark.sql.functions import col, lit from pyspark.sql.types import IntegerType, DateType spark = SparkSession.builder.appName("Bike calculation").getOrCreate() source_file = "source/244400404_comptages-velo-nantes-metropole.csv" df = spark.read.format("csv").option("delimiter", ";").option("header", True).load(source_file) def transformation(df: DataFrame) -> DataFrame: return ( df.select( col("Numéro de boucle").alias("loop_number"), col("Libellé").alias("label"), col("Total").cast(IntegerType()).alias("total"), col("Date formatée").cast(DateType()).alias("date"), col("Vacances").alias("holiday_name"), ) .where(col("Probabilité de présence d'anomalies") == lit("")) ) transformation(df).write.format("parquet").partitionBy("date").save("datalake/count-bike-nantes.parquet")

Nous allons donner à la session Spark la configuration nécessaire. D'une part nous allons lui donner les dépendances, et d'autre part la configuration.

Télécharger les jars dans un dossier jars/

Ajoutons les jars dans un premier temps. Soit Spark va les télécharger, soit nous les fournissons. Dans nos exemples, nous allons le fournir.

spark = ( SparkSession .builder .appName("Bike calculation") .config("spark.jars", "jars/delta-spark_2.12-3.2.0.jar,jars/delta-storage-3.2.0.jar") .getOrCreate() )

Ensuite, ajoutons la configuration pour pouvoir utiliser le format Delta Lake.

spark = ( SparkSession .builder .appName("Bike calculation") .config("spark.jars", "jars/delta-spark_2.12-3.2.0.jar,jars/delta-storage-3.2.0.jar") .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") .getOrCreate() )

Votre session Spark est prêt pour utiliser le format Delta Lake.

Enregistrement de la table en delta

Lors de l'écriture de la table dans le dossier datalake/, il faut changer le format de fichier pour delta.

transformation(df).write.format("delta").partitionBy("date").save("datalake/count-bike-nantes")

Voilà, votre table est au format delta.

Mise à jour de la table

Installer le package Python delta-spark.

Attention à prendre la version correspondante. Se référer à la matrice de compatibilité.

pip install delta-spark==3.2.0
from delta import DeltaTable delta_table_path = "datalake/count-bike-nantes" if DeltaTable.isDeltaTable(spark, delta_table_path): dt = DeltaTable.forPath(spark, "datalake/count-bike-nantes") transformation(df) ( dt .alias("gold_table") .merge(transformation(df).alias("fresh_data"), condition="fresh_data.loop_number = gold_table.loop_number") .whenMatchedUpdateAll() .whenNotMatchedInsertAll() .whenNotMatchedBySourceDelete() .execute() ) else: transformation(df).write.format("delta").partitionBy("date").save(delta_table_path)

Conclusion

Références

Code complet

Auteur(s)

Thierry T.

Thierry T.

Super Data Boy

Voir le profil

Vous souhaitez en savoir plus sur le sujet ?
Organisons un échange !

Notre équipe d'experts répond à toutes vos questions.

Nous contacter

Découvrez nos autres contenus dans le même thème

Comment tester son script Apache Spark avec Pytest ?

Tester son script Apache Spark avec pytest

Dans le domaine de la data, la qualité de la donnée est primordiale. Pour s'en assurer, plusieurs moyens existent, et nous allons nous attarder dans cet article sur l'un d'entre eux : tester unitairement avec Pytest.

Un long couloir fait à partir de données

Démarrer avec Apache Spark étape par étape

Le domaine de la data est présent dans le quotidien de chacun : la majorité de nos actions peut être traduite en données. Le volume croissant de ces données exploitables a un nom : "Big Data". Dans cet article, nous verrons comment exploiter ce "Big data" à l'aide du framework Apache Spark.