Automatiser la création de version d'une application avec semantic-release
Dans cet article, découvrez comment automatiser une création de version de votre application grâce à Semantic-Release : nommage des commits et configurations
Sommaire
Initié par les créateur du moteur Apache Spark, et également de la solution SaaS Databricks, ce format est une surcouche au format parquet. Il apporte le concept ACID (Atomicité, Cohérence, Isolation et Durabilité) sur les fichiers parquet dans du stockage de type objet (tel que Google Cloud Storage, AWS S3). Ansi, nous pouvons bénéficier d'un stockage à très bas coût et les bénéfices d'une table dans une base de données (en particulier la notion ACID).
Comme vu précédemment, il y a la notion de transaction ACID, à cela s'ajoute les avantages suivants :
Le format Delta Lake se veut être les fondations d'une architecture de type Lakehouse. L'industrie de la data évolue vers cette architecture afin de réduire drastriquement les coûts, et cela permet également de réduire la barrière entre les différents utilisateurs. Avec l'avènement de l'intelligence artificielle, les équipes Data Scientiest ont besoin d'accéder à de la données fraîche.
Reprenons le code de notre précédent article Démarrer avec Apache Spark étape par étape.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DateType
spark = SparkSession.builder.appName("Bike calculation").getOrCreate()
source_file = "source/244400404_comptages-velo-nantes-metropole.csv"
df = spark.read.format("csv").option("delimiter", ";").option("header", True).load(source_file)
df_clean = (
df.select(
col("Numéro de boucle").alias("loop_number"),
col("Libellé").alias("label"),
col("Total").cast(IntegerType()).alias("total"),
col("Date formatée").cast(DateType()).alias("date"),
col("Vacances").alias("holiday_name"),
)
.where(col("Probabilité de présence d'anomalies").isNull())
)
df_clean.write.format("parquet").partitionBy("date").save("datalake/count-bike-nantes.parquet")
Nous allons donner à la session Spark la configuration nécessaire. D'une part, nous allons lui donner les dépendances, et d'autre part la configuration.
Télécharger les jars dans un dossier jars/
Ajoutons les jars dans un premier temps. Soit Spark va les télécharger, soit nous les fournissons. Dans nos exemples, nous allons le lui fournir.
spark = ( SparkSession .builder .appName("Bike calculation") .config("spark.jars", "jars/delta-spark_2.12-3.2.0.jar,jars/delta-storage-3.2.0.jar") .getOrCreate() )
Ensuite, ajoutons la configuration pour pouvoir utiliser le format Delta Lake.
spark = ( SparkSession .builder .appName("Bike calculation") .config("spark.jars", "jars/delta-spark_2.12-3.2.0.jar,jars/delta-storage-3.2.0.jar") .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") .getOrCreate() )
Votre session Spark est prêt pour utiliser le format Delta Lake.
Lors de l'écriture de la table dans le dossier datalake/
, il faut changer le format de fichier pour delta.
df_clean.write.format("delta").partitionBy("date").save("datalake/count-bike-nantes.parquet")
Voilà, votre table est maintenant enregistré format delta.
Note
Il y a eu une mise à jour de la source de données. Il faut donc les intégrer. Pour cela, nous allons utiliser la fonction merge()
de la lib Python Delta Lake.
Cette fonction va automatiquement faire la mise à jour de la table en fonction des conditions. Si la ligne est nouvelle dans la source, alors elle sera ajouté. Si elle existe déjà et qu'elle a changé, alors la ligne dans la table des destinations elle sera mise à jour.
Voyons en détail son utilisation.
Pour cela, installer le package Python delta-spark
.
Attention à prendre la version correspondante. Se référer à la matrice de compatibilité https://docs.delta.io/latest/releases.html#compatibility-with-apache-spark.
Dans notre cas, nous avons besoin de la version 3.2.0 car nous utilisons Spark 3.2.0.
pip install delta-spark==3.2.0
Note
Nous effectuons toujours notre calcul avec notre nouveau fichier source. Ensuite, nous avons besoin de lire notre table de destination. Généralement, cette table est qualifié de Gold (or) car c'est une table avec des données aggrégés et à forte valeur.
# Calcul avec les nouvelles données
df_clean = (
df.select(
col("Numéro de boucle").alias("loop_number"),
col("Libellé").alias("label"),
col("Total").cast(IntegerType()).alias("total"),
col("Date formatée").cast(DateType()).alias("date"),
col("Vacances").alias("holiday_name"),
)
.where(col("Probabilité de présence d'anomalies").isNull())
)
# Lecture de la table gold
from delta import DeltaTable
delta_table = DeltaTable.forPath(spark, "datalake/count-bike-nantes.parquet")
Appliquons la fonction merge()
pour fusionner les deux Dataframes.
(
delta_table
.alias("gold_table")
.merge(
df_clean.alias("fresh_data"),
condition="fresh_data.loop_number = gold_table.loop_number and fresh_data.date = gold_table.date"
)
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
La fonction merge()
prend en entrée un dataframe avec lequel faire la comparaison. Ensuite, nous avons une condition pour effectuer la comparaison. Avec fresh_data.loop_number = gold_table.loop_number and fresh_data.date = gold_table.date
, la comparaison entre les deux dataframes est effectué sur la colonne date
et loop_number
.
Ensuite, des conditions de merge sont appliqués :
whenMatchedUpdateAll()
, s'il existe une correspondance entre les deux dataframes sur ces clefs, alors la ligne dans le dataframe de destination (la gold) est mise à jour.whenNotMatchedInsertAll()
, s'il n'existe pas de correspondance entre les deux dataframes sur ces clefs, alors la ligne dans le dataframe de destination (la gold) est ajoutée.whenNotMatchedBySourceDelete()
, si une ligne existe dans le dataframe de destination mais n'a plus de correspondance dans le dataframe source, alors la ligne sera supprimé dans le dataframe de destination.Enfin, la fonction execute()
va appliquer les modifications.
En quelques lignes, votre table sera facilement mis à jour. En fonction de votre besoin métier, ajustez les conditons de merge.
Code complet
Auteur(s)
Thierry T.
Super Data Boy
Vous souhaitez en savoir plus sur le sujet ?
Organisons un échange !
Notre équipe d'experts répond à toutes vos questions.
Nous contacterDécouvrez nos autres contenus dans le même thème
Dans cet article, découvrez comment automatiser une création de version de votre application grâce à Semantic-Release : nommage des commits et configurations
Dans le domaine de la data, la qualité de la donnée est primordiale. Pour s'en assurer, plusieurs moyens existent, et nous allons nous attarder dans cet article sur l'un d'entre eux : tester unitairement avec Pytest.
Le domaine de la data est présent dans le quotidien de chacun : la majorité de nos actions peut être traduite en données. Le volume croissant de ces données exploitables a un nom : "Big Data". Dans cet article, nous verrons comment exploiter ce "Big data" à l'aide du framework Apache Spark.