Эта статья представляет собой описание самого простого клиента получения сообщений с сервера Kafka с самой простой конфигурацией.
Статья, нацелена на проверку работы Java Kafka Consumer Example. Минимум объяснений, архитектуры и настроек необходимых для работы из коробки. Хотите более подробную информацию по Kafka — Вам сюда.
Для проверки работы клиента, Вам потребуется еще создать и producer. Как это сделать описано тут.
Пример проверен при следующих условиях:
- JDK не менее чем 1.8
- Kafka 2.6.0
- Дефолтные настройки Kafka и ZooKeeper
Полный список статей по теме тут.
Заходите в наш телеграмм канал — Enterprise Stack Helper! Делитесь опытом или задавайте вопросы, если что-то непонятно.
Репозитории с примерам из статей по способам аутентификаци и авторизации — https://github.com/BlockWit/kafka-security-examples.
Пример java Kafka consumer
Для запуска необходим установленный на локальной машине Kafka 2.6.0 с дефолтными настройками. Пример проверялся на данных настройках. Как правило, если Вы конфигурацию не меняли, то такие настройки уже есть и Вам не нужно их перезаписывать.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
broker.id=0 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 |
Maven проект на Java
Сам проект есть на github — https://github.com/BlockWit/kafka-security-examples. Код самого примера тут https://github.com/BlockWit/kafka-security-examples/blob/master/src/main/java/com/blockwit/kafka/security/examples/cookbook/ConsumerExample.java. Код независим от остальных примеров.
Как создать проект самостоятельно:
Создаем проект Maven
Добавляем в Maven зависимости:
1 2 3 4 5 6 7 8 9 10 11 12 |
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.30</version> </dependency> </dependencies> |
Добавляем в проект код клиента:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
package com.blockwit.kafka.security.examples.cookbook; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Arrays; import java.util.List; import java.util.Properties; public class JavaKafkaConsumerExample { public static void main(String[] args) { String server = "localhost:9092"; String topicName = "test.topic"; String groupName = "test.group"; final Properties props = new Properties(); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupName); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); final Consumer<Long, String> consumer = new KafkaConsumer<>(props); TopicPartition tp = new TopicPartition(topicName, 0); List<TopicPartition> tps = Arrays.asList(tp); consumer.assign(tps); consumer.seekToBeginning(tps); ConsumerRecords<Long, String> consumerRecords = consumer.poll(30000); if (!consumerRecords.isEmpty()) { System.out.println("SUCCESS"); System.out.println(consumerRecords.iterator().next().value()); } consumer.close(); } } |
Порядок запуска:
- Запускаем сначала Kafka если она не запущена,
- Запускаем producer из этой статьи, если у Вас в топике test.topic нет сообщений
- Запускаем наш JavaKafkaConsumerExample
В итоге должны получить что-то вроде этого
1 2 3 4 5 |
SUCCESS example message [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-test.group-1, groupId=test.group] Discovered group coordinator icedev:9092 (id: 2147483647 rack: null) Process finished with exit code 0 |
Строчка, означающая успешное выполнение:
1 |
SUCCESS |
Также consumer вывел нам сообщение которое он прочитал — «example message».
Что-то пошло нет так? Пишите в этот телеграмм канал — вместе разберемся.
Параметры Java Kafka consumer example
Основных параметра в Java Kafka Consumer три:
- String server = «localhost:9092»; — адрес сервера Kafka. Предполагается что настройки Kafka из коробки Вы не меняли, поэтому сервер на локальной машине.
- String topicName = «test.topic»; — название топика куда писал producer и откуда читает наш conusmer
- String groupName = «test.group»; — название группы из которой читает наш consumer
Резюме
Это краткая статья, нацелена на проверку работы Java Kafka Consumer Example. Т.е. на проверку возможности чтения сообщений из Kafka. Минимум объяснений, архитектуры и настроек необходимых для работы из коробки. Хотите более подробную информацию по Kafka — Вам сюда.
Полный список статей по теме тут.
Заходите в наш телеграмм канал — Enterprise Stack Helper! Делитесь опытом или задавайте вопросы, если что-то непонятно.
Репозитории с примерам из статей по способам аутентификаци и авторизации — https://github.com/BlockWit/kafka-security-examples.