В предыдущей статье мы узнали, как добавлять конфигурацию авторизации и аутентификации к консольному клиенту. Мы еще не перепробовали все механизмы аутентификации Kafka, однако, имеем представление о возможностях. Но, что если нам не хватает тех механизмов, которые предоставляет Kafka? Не проблема! Kafka предоставляет возможность делать свои механизмы аутентификации и авторизации.
В этой статье мы создадим свой простой механизм аутентификации и попробуем его в деле!
Репозиторий к этой статье — https://github.com/BlockWit/esh-kse-12.
Предыдущая статья. Следующая статья.
Полный список статей по теме тут.
Заходите в наш телеграмм канал — Enterprise Stack Helper! Делитесь опытом или задавайте вопросы, если что-то непонятно.
Репозитории с примерам из статей по способам аутентификаци и авторизации — https://github.com/BlockWit/kafka-security-examples.
Подготовка к реализации кастомной аутентификации
Прежде чем писать кастомную аутентификацию, давайте подготовим конфигурацию сервера на которой будем тестировать.
Возьмем простую конфигурацию сервера из 3-ей статьи.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
broker.id=0 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 delete.topic.enable=true advertised.host.name=localhost listeners=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093 advertised.listeners=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093 security.protocol=SASL_PLAINTEXT sasl.enabled.mechanisms=PLAIN listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="admin" \ password="admin-secret" \ user_admin="admin-secret" \ user_robin="robin-secret" \ user_alice="alice-secret"; |
Эта конфигурация содержит:
- Описание соединение с протоколом SASL без SSL с механизмом аутентификации PLAIN и Jaas конфигурацию для PLAIN.
- Соединение для межброкерного взаимодействия без аутентификации, т.е. PLAINTEXT
Давайте внесем кое-какие изменения:
- Уберем всех пользователей из JAAS конфигурации, кроме пользователя для межброкерного взаимодействия.
- Настроим соединение для межброкерного взаимодействия по протоколу SASL_PLAINTEXT.
Зачем мы это сделали?
Такая конфигурация позволит нам тестировать нашу кастомную аутентификацию без запуска клиента. Как мы уже знаем, в JAAS конфигурации есть пользователь для межброкерной коммуникации. Задается он с помощью username и password. Мы также знаем, что когда мы запускаем сервер Kafka, то помимо самого сервера, запускается контроллер — KafkaController (подробнее об этом тут). Этот контроллер будет пытаться соединиться с Kafka сервером с помощью указанного в JAAS пользователя username. И, если в логах Kafka не будет ошибок, то будем нашу кастомную атуентификацию считать работающей.
Если Вы устанавливали и настраивали Kafka по этой статье, то логи Kafka лежат тут — /tmp/kafka-logs/kafka.log . Иначе, чтобы узнать, где Kafka складывает логи, посмотрите описание сервиса /etc/systemd/syste/kafka.service.
Получим следующий конфиг:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
broker.id=0 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 delete.topic.enable=true advertised.host.name=localhost listeners=SASL_PLAINTEXT://localhost:9093 advertised.listeners=SASL_PLAINTEXT://localhost:9093 security.protocol=SASL_PLAINTEXT sasl.enabled.mechanisms=PLAIN security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="admin" \ password="admin-secret"; |
Не забывайте менять localhost на Ваш хост, если это необходимо!
Отлично. Можете перезапустить Kafka и убедиться, что в логах Kafka нет ошибок (нет строк со словом ERROR).
Конфигурация у нас готова, теперь преступим к написанию нашей кастомной аутентификации.
Кастомная аутентификая
Наш аутентификатор будет выполнять простую задачу — он будет проверять, что имя клиента совпадает с «admin» а пароль с «admin-secret». Этого вполне достаточно для примера.
Чтобы сделать кастомную аутентификацию нужно:
- Создать класс, реализующий интерфейс AuthenticateCallbackHandler
- Подсунуть этот класс Kafka, чтобы Kafka знала где его искать
- Указать в конфигурации Kafka использовать наш класс
- Перезапустить Kafka
Начнем с создания класса.
Для работы создайте Java Maven проект и добавьте туда зависимости:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.30</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.16</version> <scope>provided</scope> </dependency> </dependencies> |
Тут у нас три зависимости:
- Kafka библиотека
- Slf4J — для удобного логирования
- Lombok — чтобы не писать лишнего кода (например, чтобы не создавать логер ручками мы просто прописываем аннотацию над классом)
Также в pom.xml нужно добавить следующий код:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
<build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <executions> <execution> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> </plugin> </plugins> </build> |
Этот код добавляет к конечному Jar все зависимые библиотеки. Дело в том, что наш jar будет подсовываться Kafka в том виде, в котором он есть. Поэтому Kafka не будет скачивать зависимости.
Теперь создадим наш класс аутентификатора:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
package com.blockwit.kafka.security.examples; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; import org.apache.kafka.common.security.plain.PlainAuthenticateCallback; import javax.security.auth.callback.Callback; import javax.security.auth.callback.NameCallback; import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.auth.login.AppConfigurationEntry; import java.util.List; import java.util.Map; @Slf4j public class CustomAuthenticationCallbackHandler implements AuthenticateCallbackHandler { @Override public void handle(Callback[] callbacks) throws UnsupportedCallbackException { String username = null; for (Callback callback : callbacks) { if (callback instanceof NameCallback) { username = ((NameCallback) callback).getDefaultName(); } else if (callback instanceof PlainAuthenticateCallback) { PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback; plainCallback.authenticated(authenticate(username, String.valueOf(plainCallback.password()))); } else throw new UnsupportedCallbackException(callback); } } protected boolean authenticate(String username, String password) { if (username == null || password == null) return false; log.info("Try custom auth for user: " + username + " -> " + password); return username.equals("admin") && password.equals("admin-secret"); } @Override public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) { } @Override public void close() { } } |
Это шаблонный код. Вся логика сосредоточена в методе authenticate. Ничего сложного там нет. Как мы и хотели, мы сравниваем username с «admin» а password с «admin-secret» и возвращаем результат. Вы можете логику authenticate заменить на ту, которая Вам необходима. Например, Вы можете соединяться с LDAP сервером и проверять на нем, что предоставленные username и password валидны, и затем возвращать результат.
Отлично, с классом мы закончили. Теперь соберите проект. Для этого в корне проекта выполните
1 |
mvn install |
После того как maven соберет проект, он напишет где лежит готовый jar. Для нашего проекта это:
target/esh-kse-12-1.0-SNAPSHOT-jar-with-dependencies.jar
Теперь нам нужно сделать так, чтобы Kafka узнала о нашем jar. Для этого есть два пути:
- Указать в Classpath
- Скопировать в директорию плагинов Kafka. Если Вы устанавливали Kafka по этой статьей, то директория плагинов у нас тут /opt/kafka/libs/
Мы пойдем по второму пути и скопируем в директорию плагинов (директорию замените на свою)
1 |
sudo cp target/esh-kse-12-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/kafka/libs/esh-kse-example.jar |
И сделайте Kafka владельцем нашего jar, чтобы Kafka имела к нему доступ:
1 |
sudo chown kafka:kafka /opt/kafka/libs/esh-kse-example.jar |
Осталось указать в конфиге Kafka, что мы хотим использовать наш класс кастомной аутентификации:
1 |
listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=com.blockwit.kafka.security.examples.CustomAuthenticationCallbackHandler |
Обратите внимание, что название опции содержит имя протокола — sasl_plaintext и механизма plain: listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class. В случае протокола SASL_SSL опция будет выглядеть по-другому: listener.name.sasl_ssl.plain.sasl.server.callback.handler.class
Теперь наш конфиг будет выглядеть так:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
broker.id=0 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 delete.topic.enable=true advertised.host.name=localhost listeners=SASL_PLAINTEXT://localhost:9093 advertised.listeners=SASL_PLAINTEXT://localhost:9093 security.protocol=SASL_PLAINTEXT sasl.enabled.mechanisms=PLAIN security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="admin" \ password="admin-secret"; listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=com.blockwit.kafka.security.examples.CustomAuthenticationCallbackHandler |
Запускаем Kafka и ищем в логах ERROR. Если их там нет, то наш кастомный механизм аутентификации прижился!
В составе репозитория есть скрипт deploy.sh, который выполняет компиляцию, сборку, копирование конфига из примера и перезапуск Kafka. Но делает он это при условии, что Вы запускаете скрипт из той директории, где он лежит, и что настройка Kafka выполнена по этой статье. Обратите внимание, что скрипт меняет конфиг Kafka!
А теперь проверим и клиент! Для этого достаточно взять клиент из этой статьи и заменить имя пользователя и пароль на «admin» и «admin-secret» соответственно:
1 2 3 4 5 6 7 8 9 10 11 12 |
package com.blockwit.kafka.security.examples; public class SimpleProducerTest_SASL_PLAINTEXT_PLAIN { public static void main(String[] args) throws InterruptedException { SimpleProducer.runProducer(Helper.of("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required serviceName=\"Kafka\" username=\"admin\" password=\"admin-secret\";", "sasl.mechanism", "PLAIN", "security.protocol", "SASL_PLAINTEXT"), "localhost:9093", "stats.store"); } } |
Запустим и увидим:
1 2 3 4 5 6 7 8 9 |
.... Message sends 1 Message sends 2 Message sends 3 Message sends 4 Message sends 5 Message sends 6 Message sends 7 ..... |
Наш клиент успешно аутентифицировался и отправляет сообщения!
Резюме
В этой статье мы узнали, как написать свою собственную аутентификацию. В следующей статье мы узнаем, как задавать настройки нашему аутентификатору.
Репозиторий к этой статье — https://github.com/BlockWit/esh-kse-12.
Полный список статей по теме тут.
Заходите в наш телеграмм канал — Enterprise Stack Helper! Делитесь опытом или задавайте вопросы, если что-то непонятно.
Репозитории с примерам из статей по способам аутентификаци и авторизации — https://github.com/BlockWit/kafka-security-examples.