Récupérer les messages d'Azure Event Hubs Capture avec Spark

Capture Azure EventHub

Azure Event Hubs Capture est une fonctionnalité d'EventHub qui permet de transmettre automatiquement les données en continu d'un EventHub à un compte Azure Blob Storage ou Azure Data Lake, avec la flexibilité de spécifier un intervalle de temps ou de taille. La configuration de Capture est rapide et automatisée. De plus, il n'y a pas de frais administratifs associés à son exécution. C'est le moyen le plus simple de charger des données en streaming dans un Azure Storage. Ceci permet de se concentrer sur le traitement des données plutôt que sur leur capture.

Lecture usuelle d'une capture avec Python

En se basant sur la documentation de Microsoft relative à la lecture de données provenant d'une capture EventHub (lien), le code est le suivant :

import os
import string
import json
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
from azure.storage.blob import BlockBlobService

def processBlob(filename):
   reader = DataFileReader(open(filename, 'rb'), DatumReader())
   dict = {}
   for reading in reader:
       parsed_json = json.loads(reading["Body"])
       if not 'id' in parsed_json:
           return
       if not dict.has_key(parsed_json['id']):
           list = []
           dict[parsed_json['id']] = list
       else:
           list = dict[parsed_json['id']]
           list.append(parsed_json)
   reader.close()
   for device in dict.keys():
       deviceFile = open(device + '.csv', "a")
       for r in dict[device]:
           deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n')

def startProcessing(accountName, key, container):
   print 'Processor started using path: ' + os.getcwd()
   block_blob_service = BlockBlobService(account_name=accountName, account_key=key)
   generator = block_blob_service.list_blobs(container)
   for blob in generator:
       if blob.properties.content_length != 0:
           print('Downloaded a non empty blob: ' + blob.name)
           cleanName = string.replace(blob.name, '/', '_')
           block_blob_service.get_blob_to_path(container, blob.name, cleanName)
           processBlob(cleanName)
           os.remove(cleanName)
       block_blob_service.delete_blob(container, blob.name)
startProcessing('YOUR STORAGE ACCOUNT NAME', 'YOUR KEY', 'capture')

En étudiant le comportement de 2 fichiers Avro correspondants à des intervalles de temps différents, on remarque la différence de taille suivante:

comparaison

En utilisant la fonction fournie par Microsoft, processBlob, on observe :

  • Sur le premier fichier : secondfile

    Qu'on parvient à recevoir la donnée attendue.

  • Sur le deuxième fichier : firstfile

    Qu'on obtient l'erreur caractéristique :
    erreur

Sur deux fichiers de capture, la lecture avec la fonction DataFileReader produit des résultats différents. Il n'est donc pas possible d'automatiser un processus de lecture à la volée dans ces conditions. Pour corriger ce problème, on utilise les datasets (dataframes de Spark)

Spark-avro

Spark-avro est représenté sous la forme d'un jar et vu comme le seul véritable moyen par lequel lire un fichier Avro, donc une Capture Event Hub. Poussée par Databricks, son implémentation peut être faite en Java, Python, et Scala. Puisqu'il s'agit d'un jar, on va utiliser dans cet exemple le langage Java.

Code

Pour utiliser Spark-avro, on va, dans un premier temps, insérer sa dépendance dans le fichier pom.xml

<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>3.2.0</version>
</dependency>

Une fois installé, le code suivant permet de lire la capture et de décoder le message (Body) puis, d'enregistrer le résultat sous la forme de document json.

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;


public class AvroReader extends InitSpark{

    protected AvroReader(int cpu) {
        super(cpu, AvroReader.class.getName());
        // TODO Auto-generated constructor stub
    }

    public static void main(String[] args) {
        if (args.length  == 3){
            AvroReader main = new AvroReader(Integer.parseInt(args[0]));
            main.run(args[1], args[2]);
        }
        else{
            System.out.println("You should add the [cpu] [avrofile] [jsonfile]");
        }
    }

    private void run(String...args) {
        // TODO Auto-generated method stub
        String avro = args[0];
        String json = args[1];

        Dataset<Row> avrods = spark.read()
                .format("com.databricks.spark.avro")
                .load(avro);    

        Dataset<Row> ds = avrods
                .withColumn("Body", decode(col("Body"), "UTF-8"));

        Dataset<Row> ds_body = ds.filter(col("Body").isNotNull())
                .select(col("Body"));

        ds_body.write()
            .mode(SaveMode.Overwrite)
            .json(json);

    }

    }

Dans ce sens, le fichier faisant apparaître l'erreur est lu et on obtient le résultat suivant :

obtenue java

On constate que le fichier erreur ne contient aucun élément sur la donnée, mais le schéma est bien présent. Ceci peut expliquer la différence de taille précédemment évoquée.

Le fichier permettant d'obtenir un résultat probant est également lu et on obtient la donnée recherchée :

recherchée java

On peut dès lors penser à automatiser un processus de transformation de captures d'Event Hub, c'est-à-dire la lecture à partir un job Spark, et l'enregistrement sous la forme de document json. À cet effet, on peut "packager" l'application sous la forme d'un jar qui fera les opérations sus-indiquées. Ainsi, en considérant ges.spark comme étant le package et ges-java l'application snapshot, un job sera lancé de la manière suivante :

java -cp ges-java.jar ges.spark.AvroReader