Des millions d'enregistrements de données sont générés chaque jour dans les systèmes informatiques actuels. Il peut s'agir de transactions financières, d'une commande ou de données provenant du capteur de votre voiture. Pour traiter ces flux de données en temps réel et transférer de manière fiable les enregistrements d'événements entre les différents systèmes de l'entreprise, vous avez besoin de Apache Kafka.
Apache Kafka est une solution de flux de données open-source qui traite plus d'un million d'enregistrements par seconde. Outre ce débit élevé, Apache Kafka offre une grande évolutivité et disponibilité, une faible latence et un stockage permanent.
Des entreprises comme LinkedIn, Uber et Netflix s'appuient sur Apache Kafka pour le traitement en temps réel et la diffusion de données en continu. La façon la plus simple de démarrer avec Apache Kafka est de l'installer et de le faire fonctionner sur votre machine locale. Cela vous permet non seulement de voir le serveur Apache Kafka en action, mais aussi de produire et de consommer des messages.

Avec une expérience pratique du démarrage du serveur, de la création de sujets et de l'écriture de code Java à l'aide du client Kafka, vous serez prêt à utiliser Apache Kafka pour répondre à tous vos besoins en matière de pipeline de données.
Comment télécharger Apache Kafka sur votre machine locale ?
Vous pouvez télécharger la dernière version d'Apache Kafka à partir de l'adresse suivante lien officiel. Le contenu téléchargé sera compressé en .tgz
format. Une fois téléchargé, vous devrez l'extraire.
Si vous êtes sous Linux, ouvrez votre terminal. Ensuite, naviguez jusqu'à l'emplacement où vous avez téléchargé la version compressée d'Apache Kafka. Exécutez la commande suivante :
tar -xzvf kafka_2.13-3.5.0.tgz
Une fois la commande terminée, vous constaterez qu'un nouveau répertoire appelé kafka_2.13-3.5.0
. Naviguez à l'intérieur du dossier en utilisant :
cd kafka_2.13-3.5.0
Vous pouvez maintenant dresser la liste du contenu de ce répertoire à l'aide de la commande ls
commande.
Pour les utilisateurs de Windows, vous pouvez suivre les mêmes étapes. Si vous ne parvenez pas à trouver le tar
vous pouvez utiliser un outil tiers tel que WinZip pour ouvrir l'archive.
Comment démarrer Apache Kafka sur votre machine locale ?
Après avoir téléchargé et extrait Apache Kafka, il est temps de l'exécuter. Il n'y a pas d'installateur. Vous pouvez commencer à l'utiliser directement via votre ligne de commande ou votre fenêtre de terminal.
Avant de commencer à utiliser Apache Kafka, assurez-vous que Java 8+ est installé sur votre système. Apache Kafka nécessite une installation Java en cours d'exécution.
#1. Exécuter le serveur Apache Zookeeper
La première étape consiste à lancer Apache Zookeeper. Il est préchargé dans l'archive. Il s'agit d'un service chargé de maintenir les configurations et d'assurer la synchronisation avec d'autres services.
Une fois dans le répertoire où vous avez extrait le contenu de l'archive, exécutez la commande suivante :
Pour les utilisateurs de Linux :
bin/zookeeper-server-start.sh config/zookeeper.properties
Pour les utilisateurs de Windows :
bin/windows/zookeeper-server-start.bat config/zookeeper.properties

Les zookeeper.properties
fournit les configurations nécessaires à l'exécution du serveur Apache Zookeeper. Vous pouvez configurer des propriétés telles que le répertoire local où les données seront stockées et le port sur lequel le serveur fonctionnera.
#2. Démarrer le serveur Apache Kafka
Maintenant que le serveur Apache Zookeeper a été démarré, il est temps de démarrer le serveur Apache Kafka.
Ouvrez un nouveau terminal ou une fenêtre d'invite de commande et naviguez jusqu'au répertoire où se trouvent les fichiers extraits. Vous pouvez ensuite démarrer le serveur Apache Kafka à l'aide de la commande ci-dessous :
Pour les utilisateurs de Linux :
bin/kafka-server-start.sh config/server.properties
Pour les utilisateurs de Windows :
bin/windows/kafka-server-start.bat config/server.properties
Votre serveur Apache Kafka fonctionne. Si vous souhaitez modifier la configuration par défaut, vous pouvez le faire en modifiant le fichier server.properties
fichier. Les différentes valeurs sont présentes dans le fichier documentation officielle.
Comment utiliser Apache Kafka sur votre machine locale ?
Vous êtes maintenant prêt à utiliser Apache Kafka sur votre machine locale pour produire et consommer des messages. Puisque les serveurs Apache Zookeeper et Apache Kafka sont opérationnels, voyons comment créer votre premier sujet, produire votre premier message et le consommer.
Quelles sont les étapes pour créer un sujet dans Apache Kafka ?
Avant de créer votre premier sujet, il convient de comprendre ce qu'est un sujet. Dans Apache Kafka, un sujet est un magasin de données logique qui aide à la diffusion des données en continu. Il s'agit du canal par lequel les données sont transportées d'un composant à l'autre.
Un sujet supporte les multi-producteurs et les multi-consommateurs - plus d'un système peut écrire et lire à partir d'un sujet. Contrairement à d'autres systèmes de messagerie, tout message d'un thème peut être consommé plus d'une fois. En outre, vous pouvez également mentionner la période de conservation de vos messages.
Prenons l'exemple d'un système (producteur) qui produit des données pour les transactions bancaires. Un autre système (consommateur) consomme ces données et envoie une notification à l'utilisateur. Pour faciliter cette opération, un sujet est nécessaire.
Ouvrez une nouvelle fenêtre de terminal ou d'invite de commande et naviguez jusqu'au répertoire où vous avez extrait l'archive. La commande suivante créera un sujet appelé transactions
:
Pour les utilisateurs de Linux :
bin/kafka-topics.sh --create --topic transactions --bootstrap-server localhost:9092
Pour les utilisateurs de Windows :
bin/windows/kafka-topics.bat --create --topic transactions --bootstrap-server localhost:9092

Vous avez créé votre premier sujet et vous êtes prêt à commencer à produire et à consommer des messages.
Comment produire un message pour Apache Kafka ?
Votre sujet Apache Kafka étant prêt, vous pouvez maintenant produire votre premier message. Ouvrez un nouveau terminal ou une nouvelle fenêtre d'invite de commande, ou utilisez la même que celle que vous avez utilisée pour créer le sujet. Ensuite, assurez-vous d'être dans le bon répertoire où vous avez extrait le contenu de l'archive. Vous pouvez utiliser la ligne de commande pour produire votre message sur le sujet en utilisant la commande suivante :
Pour les utilisateurs de Linux :
bin/kafka-console-producer.sh --topic transactions --bootstrap-server localhost:9092
Pour les utilisateurs de Windows :
bin/windows/kafka-console-producer.bat --topic transactions --bootstrap-server localhost:9092
Une fois la commande exécutée, vous verrez que votre terminal ou fenêtre d'invite de commande attend une entrée. Écrivez votre premier message et appuyez sur Entrée.
> This is a transactional record for $100

Vous avez envoyé votre premier message à Apache Kafka sur votre machine locale. Par la suite, vous êtes maintenant prêt à consommer ce message.
Comment consommer un message provenant d'Apache Kafka ?
Si votre sujet a été créé et que vous avez envoyé un message à votre sujet Kafka, vous pouvez maintenant consommer ce message.
Apache Kafka vous permet d'attacher plusieurs consommateurs au même sujet. Chaque consommateur peut faire partie d'un groupe de consommateurs - un identifiant logique. Par exemple, si deux services doivent consommer les mêmes données, ils peuvent avoir des groupes de consommateurs différents.
Cependant, si vous avez deux instances du même service, vous voudrez éviter de consommer et de traiter deux fois le même message. Dans ce cas, les deux instances auront le même groupe de consommateurs.
Dans la fenêtre du terminal ou de l'invite de commande, assurez-vous que vous êtes dans le bon répertoire. Utilisez la commande suivante pour démarrer le consommateur :
Pour les utilisateurs de Linux :
bin/kafka-console-consumer.sh --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer
Pour les utilisateurs de Windows :
bin/windows/kafka-console-consumer.bat --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer

Vous verrez le message que vous avez produit précédemment apparaître sur votre terminal. Vous avez maintenant utilisé Apache Kafka pour consommer votre premier message.
Les kafka-console-consumer
prend en compte un grand nombre d'arguments. Voyons ce que chacun d'entre eux signifie :
- Les
--topic
mentionne le sujet à partir duquel vous allez consommer --from-beginning
indique au consommateur de la console de commencer à lire les messages dès le premier message présent- Votre serveur Apache Kafka est mentionné via le fichier
--bootstrap-server
option - En outre, vous pouvez mentionner le groupe de consommateurs en passant l'option
--group
paramètre - En l'absence d'un paramètre relatif au groupe de consommateurs, il est généré automatiquement.
Lorsque le consommateur de la console fonctionne, vous pouvez essayer de produire de nouveaux messages. Vous verrez qu'ils sont tous consommés et s'affichent dans votre terminal.
Maintenant que vous avez créé votre sujet et que vous avez réussi à produire et à consommer des messages, intégrons-le dans une application Java.
Comment créer un producteur et un consommateur Apache Kafka en utilisant Java
Avant de commencer, assurez-vous que Java 8+ est installé sur votre machine locale. Apache Kafka fournit sa propre bibliothèque client qui vous permet de vous connecter de manière transparente. Si vous utilisez Maven pour gérer vos dépendances, ajoutez la dépendance suivante à votre fichier pom.xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.0</version>
</dependency>
Vous pouvez également télécharger la bibliothèque à partir du site Dépôt Maven et l'ajouter à votre chemin de classe Java.
Une fois votre bibliothèque en place, ouvrez l'éditeur de code de votre choix. Voyons comment vous pouvez démarrer votre producteur et votre consommateur en utilisant Java.
Créer un producteur Java Apache Kafka
Avec la kafka-clients
en place, vous êtes maintenant prêt à créer votre producteur Kafka.
Créons une classe appelée SimpleProducer.java
. Il sera chargé de produire des messages on le sujet que vous avez créé précédemment. Dans cette classe, vous créerez une instance de org.apache.kafka.clients.producer.KafkaProducer
. Par la suite, vous utiliserez ce producteur pour envoyer vos messages.
Pour créer le producteur Kafka, vous avez besoin de l'hôte et du port de votre serveur Apache Kafka. Puisque vous l'exécutez sur votre machine locale, l'hôte sera localhost
. Si vous n'avez pas modifié les propriétés par défaut lors du démarrage du serveur, le port sera le suivant 9092
. Considérez le code suivant qui vous aidera à créer votre producteur :
package org.example.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class SimpleProducer {
private final KafkaProducer<String, String> producer;
public SimpleProducer(String host, String port) {
String server = host + ":" + port;
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
this.producer = new KafkaProducer<>(properties);
}
}
Vous remarquerez que trois propriétés sont définies. Passons rapidement en revue chacune d'entre elles :
- Le BOOTSTRAP_SERVERS_CONFIG vous permet de définir où le serveur Apache Kafka s'exécute
- Le KEY_SERIALIZER_CLASS_CONFIG indique au producteur le format à utiliser pour l'envoi des clés de message.
- Le format d'envoi du message réel est défini à l'aide de la propriété VALUE_SERIALIZER_CLASS_CONFIG.
Comme vous allez envoyer des messages textuels, les deux propriétés sont définies pour utiliser la fonction StringSerializer.class
.
Pour envoyer un message à votre sujet, vous devez utiliser la fonction producer.send()
qui prend en compte un ProducerRecord
. Le code suivant vous donne une méthode qui enverra un message au sujet et imprimera la réponse avec le décalage du message.
public void produce(String topic, String message) throws ExecutionException, InterruptedException {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
final Future<RecordMetadata> send = this.producer.send(record);
final RecordMetadata recordMetadata = send.get();
System.out.println(recordMetadata);
}
Une fois l'ensemble du code mis en place, vous pouvez maintenant envoyer des messages à votre sujet. Vous pouvez utiliser un main
pour tester cette méthode, comme le montre le code ci-dessous :
package org.example.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class SimpleProducer {
private final KafkaProducer<String, String> producer;
public SimpleProducer(String host, String port) {
String server = host + ":" + port;
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
this.producer = new KafkaProducer<>(properties);
}
public void produce(String topic, String message) throws ExecutionException, InterruptedException {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
final Future<RecordMetadata> send = this.producer.send(record);
final RecordMetadata recordMetadata = send.get();
System.out.println(recordMetadata);
}
public static void main(String[] args) throws Exception{
SimpleProducer producer = new SimpleProducer("localhost", "9092");
producer.produce("transactions", "This is a transactional record of $200");
}
}
Dans ce code, vous créez un SimpleProducer
qui se connecte à votre serveur Apache Kafka sur votre machine locale. Il utilise en interne l'option KafkaProducer
pour produire des messages texte sur votre sujet.

Créer un consommateur Java Apache Kafka
Il est temps de créer un consommateur Apache Kafka en utilisant le client Java. Créez une classe appelée SimpleConsumer.java
. Ensuite, vous créerez un constructeur pour cette classe, qui initialisera le fichier org.apache.kafka.clients.consumer.KafkaConsumer
. Pour créer le consommateur, vous avez besoin de l'hôte et du port où tourne le serveur Apache Kafka. En outre, vous avez besoin du groupe de consommateurs ainsi que du sujet à partir duquel vous souhaitez consommer. Utilisez l'extrait de code ci-dessous :
package org.example.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
public class SimpleConsumer {
private static final String OFFSET_RESET = "earliest";
private final KafkaConsumer<String, String> consumer;
private boolean keepConsuming = true;
public SimpleConsumer(String host, String port, String consumerGroupId, String topic) {
String server = host + ":" + port;
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<>(properties);
this.consumer.subscribe(List.of(topic));
}
}
Tout comme le producteur Kafka, le consommateur Kafka prend également en charge un objet Properties. Examinons les différentes propriétés définies :
- BOOTSTRAP_SERVERS_CONFIG indique au consommateur où tourne le serveur Apache Kafka.
- Le groupe de consommateurs est mentionné à l'aide du paramètre GROUP_ID_CONFIG
- Lorsque le consommateur commence à consommer, le paramètre AUTO_OFFSET_RESET_CONFIG vous permet d'indiquer jusqu'à quand vous souhaitez commencer à consommer les messages à partir de
- KEY_DESERIALIZER_CLASS_CONFIG indique au consommateur le type de la clé de message.
- VALUE_DESERIALIZER_CLASS_CONFIG indique le type de consommateur du message actuel.
Puisque, dans votre cas, vous consommerez des messages textuels, les propriétés du désérialiseur sont fixées à StringDeserializer.class
.
Vous allez maintenant consommer les messages de votre sujet. Pour simplifier les choses, une fois le message consommé, vous imprimerez le message sur la console. Voyons comment vous pouvez y parvenir à l'aide du code ci-dessous :
private boolean keepConsuming = true;
public void consume() {
while (keepConsuming) {
final ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(100L));
if (consumerRecords != null && !consumerRecords.isEmpty()) {
consumerRecords.iterator().forEachRemaining(consumerRecord -> {
System.out.println(consumerRecord.value());
});
}
}
}
Ce code continuera à interroger le sujet. Lorsque vous recevrez un enregistrement de consommateur, le message sera imprimé. Testez votre consommateur en action à l'aide de la méthode main. Vous démarrerez une application Java qui continuera à consommer le sujet et à imprimer les messages. Arrêtez l'application Java pour mettre fin au consommateur.
package org.example.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
public class SimpleConsumer {
private static final String OFFSET_RESET = "earliest";
private final KafkaConsumer<String, String> consumer;
private boolean keepConsuming = true;
public SimpleConsumer(String host, String port, String consumerGroupId, String topic) {
String server = host + ":" + port;
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<>(properties);
this.consumer.subscribe(List.of(topic));
}
public void consume() {
while (keepConsuming) {
final ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(100L));
if (consumerRecords != null && !consumerRecords.isEmpty()) {
consumerRecords.iterator().forEachRemaining(consumerRecord -> {
System.out.println(consumerRecord.value());
});
}
}
}
public static void main(String[] args) {
SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", "9092", "transactions-consumer", "transactions");
simpleConsumer.consume();
}
}
Lorsque vous exécuterez le code, vous constaterez qu'il consomme non seulement le message produit par votre producteur Java, mais également ceux que vous avez produits via le producteur Console. Cela s'explique par le fait que le producteur AUTO_OFFSET_RESET_CONFIG
a été fixée à earliest
.

Lorsque le SimpleConsumer fonctionne, vous pouvez utiliser le producteur de la console ou l'application Java SimpleProducer pour produire d'autres messages à destination du sujet. Vous verrez qu'ils sont consommés et imprimés sur la console.
Répondez à tous vos besoins en matière de pipeline de données avec Apache Kafka
Apache Kafka vous permet de gérer facilement tous vos besoins en matière de pipeline de données. Une fois Apache Kafka installé sur votre machine locale, vous pouvez explorer toutes les différentes fonctionnalités offertes par Kafka. De plus, le client Java officiel vous permet d'écrire, de vous connecter et de communiquer avec votre serveur Apache Kafka de manière efficace.
Apache Kafka est un système de flux de données polyvalent, évolutif et très performant qui peut véritablement changer la donne. Vous pouvez l'utiliser pour votre développement local ou même l'intégrer dans vos systèmes de production. Tout comme il est facile à mettre en place localement, la mise en place d'Apache Kafka pour des applications plus importantes n'est pas une tâche difficile.
Si vous recherchez des plates-formes de diffusion de données en continu, vous pouvez consulter les sites suivants les meilleures plateformes de données en continu pour l'analyse et le traitement en temps réel.