В предыдущей статье мы узнали, как написать свою собственную логику аутентификации для Kafka PLAIN. Это был класс CustomAuthenticationCallbackHandler, который запускался Kafka. Вся логика заключалась в том, что мы сверяли предоставленный нам пароль и логин с тем, который записан в нашем классе.
Но, как Вы понимаете, жестко задавать пароли в классе небезопасно и такой подход лишен гибкости. Проще говоря, так в промышленном коде делать нельзя!
Репозиторий примера к текущей статье — https://github.com/BlockWit/esh-kse-13 .
Предыдущая статья. Следующая статья.
Полный список статей по теме тут.
Заходите в наш телеграмм канал — Enterprise Stack Helper! Делитесь опытом или задавайте вопросы, если что-то непонятно.
Репозитории с примерам из статей по способам аутентификаци и авторизации — https://github.com/BlockWit/kafka-security-examples.
Задача
Давайте представим реальную задачу, в которой нам потребуется кастомный аутентификатор. Например, когда у нас пользователи должны аутентифицироваться через Active Directory сервер. Т.е. с помощью LDAP. В этом случае логика работы такая:
- Клиент пытается присоединиться к Kafka и отдает ей логин и пароль
- Kafka отправляет логин и пароль кастомному аутентификатору
- Кастомный аутентификатор читает из настроек адрес сервера LDAP
- Кастомный аутентификатор соединяется с Active Directory сервером и отправляет запрос на проверку логина и пароля
- Active Directory сервер отправляет ответ о результате проверки
- Кастомный аутентификатор отдает результат Kafka
Достать и сконфигурировать Active Directory сервер не каждому под силу. Да и слишком сложно для нашего примера. Поэтому мы упростим задачу. Вместо Active Directory сервера мы будем использовать простой самописный сервер, в котором есть учетные данные для admin и для alice. Не бойтесь, писать Вам его не придется. Он уже есть в репозитории проекта.
В итоге, что нам нужно сделать:
- Добавить опцию в JAAS конфигурацию, в которой будет указан адрес сервера с учетками
- Научить кастомный аутентификатор читать опцию из конфигурации JASS
- Научить кастомный аутентификатор соединяться с сервером учетных данных и на основе ответа от сервера принимать решение
Чтение из конфига
Пусть опция, которая отвечает за адрес сервера аутентификации будет называться auth_server. Сервер будем запускать на локальной машине на порту 8001. Добавим опцию к конфигурации JAAS конфига Kafka из предыдущей статьи. Тогда конфиг Kafka будет выглядеть так:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
broker.id=0 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 delete.topic.enable=true advertised.host.name=localhost listeners=SASL_PLAINTEXT://localhost:9093 advertised.listeners=SASL_PLAINTEXT://localhost:9093 security.protocol=SASL_PLAINTEXT sasl.enabled.mechanisms=PLAIN security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ auth_server="http://localhost:8001" \ username="admin" \ password="admin-secret"; listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=com.blockwit.kafka.security.examples.CustomAuthenticationCallbackHandler |
Не забывайте менять localhost на свой, если это необходимо
Давайте теперь посмотрим, где и как читать JAAS конфигурацию в нашем аутентификаторе. Посмотрите на код аутентификатора из предыдущей статьи. Там есть пустой метод configure, в который и передается список JAAS конфигураций jaasConfigEntries:
1 2 3 4 5 6 |
... @Override public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) { } .... |
В jaasConfigEntries могут храниться несколько конфигураций для разных механизмов и протоколов. Как узнать нужную конфигурацию? Очень просто. У AppConfigurationEntry есть метод getLoginModuleName. Он и возвращает название класса конфигурации. В нашем случае указан класс:
1 |
listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule |
Поэтому сравнивать мы будем так:
1 |
entry.getLoginModuleName().equals(PlainLoginModule.class.getName() |
И, если найдем удовлетворяющую этому условию entry, то дальше прочесть нужную нам опцию можно так:
1 |
entry.getOptions().get(OPTION_NAME); |
Где OPTION_NAME — название опции.
Давайте в наш класс добавим поле
1 |
private String authServer; |
Оно будет хранить прочитанную опцию. Теперь мы знаем, как написать код, который читает нашу опцию в методе configure:
1 2 3 4 5 |
jaasConfigEntries.stream() .filter(t -> t.getLoginModuleName().equals(PlainLoginModule.class.getName())) .findAny() .map(t -> t.getOptions()) .ifPresent(t -> authServer = t.get("auth_server").toString()); |
Отлично! Осталось только написать код, который соединяется с сервером по адресу authServer и отправляет запрос на проверку пользователя. Для этого мы используем Apache HTTP Client. Мы будем отправлять такой GET запрос на сервер с учетками:
http://localhost:8001/?username=ИМЯ_ПОЛЬЗОВАТЛЯ&password=ПАРОЛЬ
А сервер будет отвечать 200 Ok — если у него такая учетка есть. Для того чтобы написать логику клиента нам потребуется зависимость Apache HttpClient. Добавьте ее в в dependencies в pom.xml:
1 2 3 4 5 |
<dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.13</version> </dependency> |
Теперь давайте напишем логику метода authenticate:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
protected boolean authenticate(String username, String password) { if (username == null || password == null) return false; log.info("Try custom auth for user " + username); CloseableHttpClient httpClient = HttpClients.createDefault(); String url = authServer + "/?username=" + username + "&password=" + password; HttpGet request = new HttpGet(url); try { CloseableHttpResponse response = httpClient.execute(request); int status = response.getStatusLine().getStatusCode(); response.close(); return status == 200; } catch (IOException e) { e.printStackTrace(); return false; } finally { try { httpClient.close(); } catch (IOException ioException) { ioException.printStackTrace(); } } } |
Полный код будет теперь выглядеть так:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
package com.blockwit.kafka.security.examples; import lombok.extern.slf4j.Slf4j; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; import org.apache.kafka.common.security.plain.PlainAuthenticateCallback; import org.apache.kafka.common.security.plain.PlainLoginModule; import javax.security.auth.callback.Callback; import javax.security.auth.callback.NameCallback; import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.auth.login.AppConfigurationEntry; import java.io.IOException; import java.util.List; import java.util.Map; @Slf4j public class CustomAuthenticationCallbackHandler implements AuthenticateCallbackHandler { String authServer; @Override public void handle(Callback[] callbacks) throws UnsupportedCallbackException { String username = null; for (Callback callback : callbacks) { if (callback instanceof NameCallback) { username = ((NameCallback) callback).getDefaultName(); } else if (callback instanceof PlainAuthenticateCallback) { PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback; plainCallback.authenticated(authenticate(username, String.valueOf(plainCallback.password()))); } else throw new UnsupportedCallbackException(callback); } } protected boolean authenticate(String username, String password) { if (username == null || password == null) return false; log.info("Try custom auth for user " + username); CloseableHttpClient httpClient = HttpClients.createDefault(); String url = authServer + "/?username=" + username + "&password=" + password; HttpGet request = new HttpGet(url); try { CloseableHttpResponse response = httpClient.execute(request); int status = response.getStatusLine().getStatusCode(); response.close(); return status == 200; } catch (IOException e) { e.printStackTrace(); return false; } finally { try { httpClient.close(); } catch (IOException ioException) { ioException.printStackTrace(); } } } @Override public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) { jaasConfigEntries.stream() .filter(t -> t.getLoginModuleName().equals(PlainLoginModule.class.getName())) .findAny() .map(t -> t.getOptions()) .ifPresent(t -> authServer = t.get("auth_server").toString()); } @Override public void close() { } } |
Теперь можете перезапустить Kafka и убедиться, что в логах Kafka нет ошибок (нет строк со словом ERROR).
Проверка работы аутентификатора
Для проверки работы примера можете воспользоваться репозиторием https://github.com/BlockWit/esh-kse-13
Для того, чтобы наш пример заработал, необходимо (команды справедливы, если ставили Kafka по этой статье):
- Выполнить mvn install. Проект соберется и появится файл target/esh-kse-13-jar-with-dependencies.jar
- Скопировать наш файл в папку с плагинами Kafka
1sudo cp target/esh-kse-13-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/kafka/libs/esh-kse-example.jar - Назначить Kafka владельцем нашего плагина, чтобы Kafka имела к нему доступ
1sudo chown kafka:kafka /opt/kafka/libs/esh-kse-example.jar
В составе репозитория есть скрипт deploy.sh, который выполняет компиляцию, сборку, копирование конфига из примера и перезапуск Kafka. Но делает он это при условии, что Вы запускаете скрипт из той же директории, где он лежит и, что настройка Kafka выполнена по этой статье. Обратите внимание, что скрипт меняет конфиг Kafka! Скрипт можно выполнять только после запуска сервера учеток!
И если Вы уже обновили конфигурацию, как указано в предыдущей главе, то не спешите перезапускать Kafka. Сначала нам нужно запустить наш сервер учеток. В репозитории он находится тут https://github.com/BlockWit/esh-kse-13/blob/master/src/test/java/com/blockwit/kafka/security/examples/credserver/CredentialsServer.java.
Это обычный запускаемый Java класс. Поэтому Вы его без труда запустите в своей среде разработки. Запуск сервера ознаменуется строкой:
1 |
[main] INFO com.blockwit.kafka.security.examples.credserver.CredentialsServer - Server started on port 8001 |
Проверьте, что учетки работают:
1 |
curl "http://localhost:8001/?username=alice&password=alice-secret" -v |
В ответе должен содержаться код 200.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
* Trying 127.0.0.1:8001... * TCP_NODELAY set * Connected to localhost (127.0.0.1) port 8001 (#0) > GET /?username=alice&password=alice-secret HTTP/1.1 > Host: localhost:8001 > User-Agent: curl/7.68.0 > Accept: */* > * Mark bundle as not supporting multiuse < HTTP/1.1 200 OK < Date: Fri, 20 Nov 2020 18:31:35 GMT < Transfer-encoding: chunked < * Connection #0 to host localhost left intact |
Такую же процедуру выполните для пользователя admin с паролем admin-secret.
Теперь очистите консоль сервера и перезапустите Kafka. Проверьте, что в логах Kafka нет ERROR. Через какое-то время в консоли сервера появится сообщение:
1 |
[pool-1-thread-1] INFO com.blockwit.kafka.security.examples.credserver.CredentialsServer - User admin successfully bind |
Это значит что контроллер Kafka успешно аутентифицировался через наш кастомный аутентификатор.
Теперь можете попробовать запустить клиент с учеткой Алисы. В репозитории он тут — https://github.com/BlockWit/esh-kse-13/blob/master/src/test/java/com/blockwit/kafka/security/examples/SimpleProducerTest_SASL_SSL_PLAIN.java
Код:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
package com.blockwit.kafka.security.examples; public class SimpleProducerTest_SASL_SSL_PLAIN { public static void main(String[] args) throws InterruptedException { SimpleProducer.runProducer(Helper.of( "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required serviceName=\"Kafka\" username=\"alice\" password=\"alice-secret\";", "security.protocol", "SASL_PLAINTEXT", "sasl.mechanism", "PLAIN"), "localhost:9093", "test.topic"); } } |
После запуска клиента должна начаться отправка сообщений:
1 2 3 4 5 6 7 8 9 |
.... Message sends 1 Message sends 2 Message sends 3 Message sends 4 Message sends 5 Message sends 6 Message sends 7 ..... |
А в консоли сервера должно появиться сообщение об успешной аутентификации Алисы:
1 |
[pool-1-thread-1] INFO com.blockwit.kafka.security.examples.credserver.CredentialsServer - User alice successfully bind |
Резюме
Поздравляю! Мы научились не только писать свою атутентиифкацию, но и читать JAAS конфиг и проверять учетки на стороннем сервере!
Самое время заняться собственной авторизацией, но это уже в следующей статье.
Репозиторий примера к текущей статье — https://github.com/BlockWit/esh-kse-13 .
Предыдущая статья. Следующая статья.
Полный список статей по теме тут.
Заходите в наш телеграмм канал — Enterprise Stack Helper! Делитесь опытом или задавайте вопросы, если что-то непонятно.
Репозитории с примерам из статей по способам аутентификаци и авторизации — https://github.com/BlockWit/kafka-security-examples.