Automatiser la vérification du commit
Automatiser la vérification du commit avec commitlint
Sommaire
Lorsque l'on travaille dans l'univers de la data, nous effectuons principalement trois grands types de tâches :
Ces trois étapes constituent ce que l'on appelle un "pipeline ETL", pour : Extract, Transform, Load (Extraire, Transformer, Charger).
Il existe une multitude de façons d'effectuer ces tâches, mais dans cet article, nous allons nous concentrer sur comment le faire avec Apache Spark.
Apache Spark est un framework qui permet de manipuler et transformer la données, et qui s'appuie sur le framework Hadoop pour distribuer les calculs sur les différents noeuds du cluster.
Par simplicité, dans la suite de cet article nous utiliserons le nom "Spark" pour désigner Apache Spark.
Rentrons dans le vif du sujet avec un cas concret. Je veux importer les données sur le nombre de passages de vélo à un point géographique donné afin d'effectuer une étude sur l'usage du vélo en ville.
Prenons par exemple la ville de Nantes qui met à disposition un jeu de données https://data.nantesmetropole.fr/explore/dataset/244400404_comptages-velo-nantes-metropole/information.
Nous prenons ce fichier et nous le déposons ensuite dans le dossier source/244400404_comptages-velo-nantes-metropole.csv
.
Voici un échantillon du fichier en question :
Numéro de boucle;Libellé;Total;Probabilité de présence d'anomalies;Jour de la semaine;Boucle de comptage;Date formatée;Vacances 0674;Pont Haudaudine vers Sud;657;;2;0674 - Pont Haudaudine vers Sud;2021-03-16;Hors Vacances 0674;Pont Haudaudine vers Sud;689;;4;0674 - Pont Haudaudine vers Sud;2021-03-18;Hors Vacances 0674;Pont Haudaudine vers Sud;589;;5;0674 - Pont Haudaudine vers Sud;2021-03-26;Hors Vacances
Il existe plusieurs façons d'installer Apache Spark : soit en prenant le binaire, soit avec Docker avec Jupyter Docker Stacks.
Nous allons effectuer l'installation avec le binaire Spark. Biern que ce processus soit plus long et complexe, il est plus intéressant car il nous permet de mieux comprendre les différents éléments qui le constituent.
Notez que pour l'installation, il sera nécessaire d'avoir au moins Python 3.8 et Java 17.
Voici la commande pour installer Java Runtime sous Ubuntu :
sudo apt install openjdk-17-jre-headless
Allez sur la page de téléchargement https://spark.apache.org/downloads.html et téléchargez le package en tgz. Prenez la dernière version disponible (il s'agit de la 3.5.1 au moment de l'écriture de l'article).
Une fois téléchargé, décomppressez le tout dans un dossier, par exemple ~/Apps/spark
.
Dans le fichier .bashrc
, ajoutez une variable d'environnement qui pointera vers le dossier du binaire Spark.
# ~/.bashrc export SPARK_HOME=~/Apps/spark/spark-3.5.1-bin-hadoop3
Vous êtes prêt ! Il ne restera qu'à installer le package Python qui permet de manipuler Spark.
PySpark est une bibliothèque en Python qui fait la correspondance vers les appels Java Spark.
Dans notre dossier qui contient le projet, nous avons au préalable créé un environnement virtuel.
virtualenv venv . venv/bin/activate
Une fois l'environnement virtuel activé, nous pouvons installer pyspark.
pip install pyspark==3.5.1
Ça y est, PySpark est installé !
Nous allons effectuer les 3 étapes de notre pipeline
Pour lire la donnée source avec pyspark, il faut tout d'abord créer une session Spark.
Cette session Spark va piloter les différents travaux à effectuer.
from pyspark.sql import SparkSession, DataFrame spark = SparkSession.builder.appName("Bike calculation").getOrCreate()
Ensuite, nous allons instruire le chargement du fichier CSV.
Le fichier contient un en-tête et le caractère de séparation est un point-virgule. Il sera nécessaire de le spécifier lors du chargement.
source_file = "source/244400404_comptages-velo-nantes-metropole.csv" df = spark.read.format("csv").option("delimiter", ";").option("header", True).load(source_file)
Note
On peut ajouter les instructions .show()
et .printSchema()
pour afficher les premiers éléments.
df.printSchema() df.show()
Vous pouvez dès à présent exécuter le fichier pour voir le résultat.
(venv) [thierry@travail:~/eleven/data] % python main.py (...) root |-- Numéro de boucle: string (nullable = true) |-- Libellé: string (nullable = true) |-- Total: string (nullable = true) |-- Probabilité de présence d'anomalies: string (nullable = true) |-- Jour de la semaine: string (nullable = true) |-- Boucle de comptage: string (nullable = true) |-- Date formatée: string (nullable = true) |-- Vacances: string (nullable = true) +----------------+--------------------+-----+-----------------------------------+------------------+--------------------+-------------+--------------------+ |Numéro de boucle| Libellé|Total|Probabilité de présence d'anomalies|Jour de la semaine| Boucle de comptage|Date formatée| Vacances| +----------------+--------------------+-----+-----------------------------------+------------------+--------------------+-------------+--------------------+ | 0674|Pont Haudaudine v...| 657| null| 2|0674 - Pont Hauda...| 2021-03-16| Hors Vacances| | 0674|Pont Haudaudine v...| 689| null| 4|0674 - Pont Hauda...| 2021-03-18| Hors Vacances| | 0674|Pont Haudaudine v...| 589| null| 5|0674 - Pont Hauda...| 2021-03-26| Hors Vacances| | 0674|Pont Haudaudine v...| 591| null| 4|0674 - Pont Hauda...| 2021-04-15| Hors Vacances| | 0674|Pont Haudaudine v...| 481| null| 2|0674 - Pont Hauda...| 2021-05-04|Vacances de print...| | 0674|Pont Haudaudine v...| 583| null| 1|0674 - Pont Hauda...| 2021-05-10| Hors Vacances| | 0674|Pont Haudaudine v...| 421| null| 6|0674 - Pont Hauda...| 2021-05-22| Hors Vacances| | 0674|Pont Haudaudine v...| 279| null| 7|0674 - Pont Hauda...| 2021-05-23| Hors Vacances| | 0674|Pont Haudaudine v...| 512| null| 6|0674 - Pont Hauda...| 2021-06-12| Hors Vacances| | 0674|Pont Haudaudine v...| 338| null| 7|0674 - Pont Hauda...| 2021-06-13| Hors Vacances| | 0674|Pont Haudaudine v...| 948| null| 2|0674 - Pont Hauda...| 2021-06-15| Hors Vacances| | 0674|Pont Haudaudine v...| 688| null| 1|0674 - Pont Hauda...| 2021-06-21| Hors Vacances| | 0674|Pont Haudaudine v...| 381| null| 6|0674 - Pont Hauda...| 2021-07-10| Vacances d'été| | 0674|Pont Haudaudine v...| 413| null| 6|0674 - Pont Hauda...| 2021-07-17| Vacances d'été| | 0674|Pont Haudaudine v...| 648| null| 5|0674 - Pont Hauda...| 2021-07-23| Vacances d'été| | 0675|Pont Haudaudine v...| 541| null| 3|0675 - Pont Hauda...| 2021-01-06| Hors Vacances| | 0675|Pont Haudaudine v...| 549| null| 4|0675 - Pont Hauda...| 2021-01-07| Hors Vacances| | 0675|Pont Haudaudine v...| 520| null| 1|0675 - Pont Hauda...| 2021-01-18| Hors Vacances| | 0675|Pont Haudaudine v...| 184| null| 7|0675 - Pont Hauda...| 2021-01-24| Hors Vacances| | 0675|Pont Haudaudine v...| 418| null| 1|0675 - Pont Hauda...| 2021-02-01| Hors Vacances| +----------------+--------------------+-----+-----------------------------------+------------------+--------------------+-------------+--------------------+ only showing top 20 rows
La lecture s'est bien passée. Nous retrouvons toutes les colonnes attendues ! Passons maintenant à la transformation.
Avant de stocker notre donnée à un endroit, nous pouvons faire quelques transformations élémentaires. Dans un paradigme d'architecture orienté Datalake, la donnée est très peu transformée, car ce sont les consommateurs qui vont transformer cette donnée pour lui donner de la valeur. Par exemple : effectuer une aggrégation, puis ensuite la consommer sur un outil de Data Visualisation (aka DataViz).
Dnas notre exemple, d'après les premières observations :
Nous allons conserver les colonnes qui nous intéressent et typer les colonnes. Nous allons prévoir de les stocker au format parquet car cela permet de conserver le typage des colonnes, et d'effectuer un partitionnement par jour.
Important
Voici la transformation
df_clean = (
df.select(
col("Numéro de boucle").alias("loop_number"),
col("Libellé").alias("label"),
col("Total").cast(IntegerType()).alias("total"),
col("Date formatée").cast(DateType()).alias("date"),
col("Vacances").alias("holiday_name"),
)
.where(col("Probabilité de présence d'anomalies").isNull())
)
df_clean.show()
Et le résultat :
(venv) [thierry@travail:~/eleven/data] % python main.py +-----------+--------------------+-----+----------+--------------------+ |loop_number| label|total| date| holiday_name| +-----------+--------------------+-----+----------+--------------------+ | 0674|Pont Haudaudine v...| 657|2021-03-16| Hors Vacances| | 0674|Pont Haudaudine v...| 689|2021-03-18| Hors Vacances| | 0674|Pont Haudaudine v...| 589|2021-03-26| Hors Vacances| | 0674|Pont Haudaudine v...| 591|2021-04-15| Hors Vacances| | 0674|Pont Haudaudine v...| 481|2021-05-04|Vacances de print...| | 0674|Pont Haudaudine v...| 583|2021-05-10| Hors Vacances| +-----------+--------------------+-----+----------+--------------------+
Nous allons stocker le résultat au format parquet. Ce format offre l'avantage de :
Pour cela, il faut faire appel au DataFrameWriter. Le support du format parquet est natif.
df_clean.write.format("parquet").partitionBy("date").save("datalake/count-bike-nantes.parquet")
Ainsi, dans l'arboresence, nous avons nos données partitionnées par date.
Bravo, vous venez de créer votre premier pipeline Spark. Un nouveau monde s'ouvre à vous. À travers cet article, nous avons vu l'installation de Spark et PySpark. Avec la création du pipeline, nous avons lu la source de données, effectué quelques transformations, et enfin stocké la donnée à un endroit défini. Ce stockage permettra à d'autre corps de métier de la data de l'exploiter.
Code complet
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DateType
spark = SparkSession.builder.appName("Bike calculation").getOrCreate()
source_file = "source/244400404_comptages-velo-nantes-metropole.csv"
df = spark.read.format("csv").option("delimiter", ";").option("header", True).load(source_file)
df_clean = (
df.select(
col("Numéro de boucle").alias("loop_number"),
col("Libellé").alias("label"),
col("Total").cast(IntegerType()).alias("total"),
col("Date formatée").cast(DateType()).alias("date"),
col("Vacances").alias("holiday_name"),
)
.where(col("Probabilité de présence d'anomalies").isNull())
)
df_clean.write.format("parquet").partitionBy("date").save("datalake/count-bike-nantes.parquet")
Auteur(s)
Thierry T.
Super Data Boy
Vous souhaitez en savoir plus sur le sujet ?
Organisons un échange !
Notre équipe d'experts répond à toutes vos questions.
Nous contacterDécouvrez nos autres contenus dans le même thème
Automatiser la vérification du commit avec commitlint
Dans le domaine de la data, la qualité de la donnée est primordiale. Pour s'en assurer, plusieurs moyens existent, et nous allons nous attarder dans cet article sur l'un d'entre eux : tester unitairement avec Pytest.
Découvrez Crossplane, un outil open-source qui étend Kubernetes avec des concepts de platform engineering et permet de gérer le cycle de vie de vos ressources cloud dans une boucle de réconciliation.