В предыдущей статье мы научились читать настройки JAAS в Kafka PLAIN самописном аутентификаторе и даже проверяли учетные данные на внешнем сервере.
Пришло время узнать, как писать и свой авторизатор.
Репозиторий примера к текущей статье — https://github.com/BlockWit/esh-kse-14 .
Предыдущая статья. Следующая статья.
Полный список статей по теме тут.
Заходите в наш телеграмм канал — Enterprise Stack Helper! Делитесь опытом или задавайте вопросы, если что-то непонятно.
Репозитории с примерам из статей по способам аутентификаци и авторизации — https://github.com/BlockWit/kafka-security-examples.
Задача
В предыдущей статье мы уже написали кастомный аутентификатор, который соединяется с сервером с учетными данными и отправляет ему запрос на проверку аккаунта клиента.
В этой статье мы добавим в нашу схему авторизатор, который будет подключаться к тому же серверу и запрашивать группы пользователей, в которых состоит пользователь. А доступ к топикам будем давать группам пользователей.
Например, если мы дадим группе ALICE_TOPIC_WRITE_GROUP доступ на запись в топик topic.alice, то все пользователи, которые состоят в группе ALICE_TOPIC_WRITE_GROUP смогут писать в топик topic.alice. Т.е. задача нашего авторизатора получить список групп по пользователю у сервера с учетными данными и проверить, имеет ли право пользователь выполнять действия на основе списка ACL.
В итоге, что нам нужно сделать:
- Написать кастомный авторизатор
- Подсунуть его Kafka
- Указать в конфиге Kafka, что будем использовать свой авторизатор
Реализация
Сервер с учетными данными в репозитории к статье https://github.com/BlockWit/esh-kse-14/blob/master/src/test/java/com/blockwit/kafka/security/examples/credserver/CredentialsServer.java уже умеет отдавать группы по имени пользователя и пароля. Делает он это по URL:
/groups?username=username&password=password
И возвращает группы, разделенные запятой, для соответствующего пользователя.
Поскольку мы будем пользоваться не чисто своей реализацией авторизатора, а реализацией AclAuthorizer, то нам достаточно заменить имя пользователя, которое приходит в авторизатор, на список групп. Мы ведь будем устанавливать ACL списки по именам групп.
Есть только небольшая загвоздка. В авторизатор приходит имя пользователя, который пытается получить права. А вот пароль не приходит! А нам нужно соединяться с сервером учетных данных по логину и паролю, чтобы получить группы.
Есть одно решение. Аутентификатор, который мы писали в предыдущей статье, создается каждый раз, когда авторизуется новый пользователь. Давайте сделаем статический Map, в который будем сохранять имя пользователя и пароль всякий раз, когда пользователь успешно прошел аутентификацию.
1 |
public static Map<String, String> authenticatedUsers = new ConcurrentHashMap<>(); |
А если пользователей много? Бесконтрольное увеличение нашего Map’а крайне нежелательно. Чтобы удалять пользователей по мере необходимости из Map, мы еще добавим поле username. Это поле будет хранить имя пользователя для текущего экземпляра аутентификатора. А если Вы внимательно посмотрите на класс аутентификатора, то увидите там метод close. В нем мы и будем удалять нашего пользователя из Map.
1 2 3 4 |
@Override public void close() { authenticatedUsers.remove(username); } |
Отлично. Теперь, чтобы получить пароль по имени пользователя, авторизатор будет смотреть в публичный статичный Map нашего аутентификатора примерно следующим образом:
1 |
String password = CustomAuthenticationCallbackHandler.authenticatedUsers.get(username); |
Давайте приведем код измененного аутентификатора:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
package com.blockwit.kafka.security.examples; import lombok.extern.slf4j.Slf4j; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; import org.apache.kafka.common.security.plain.PlainAuthenticateCallback; import org.apache.kafka.common.security.plain.PlainLoginModule; import javax.security.auth.callback.Callback; import javax.security.auth.callback.NameCallback; import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.auth.login.AppConfigurationEntry; import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @Slf4j public class CustomAuthenticationCallbackHandler implements AuthenticateCallbackHandler { public static Map<String, String> authenticatedUsers = new ConcurrentHashMap<>(); String authServer; String username; @Override public void handle(Callback[] callbacks) throws UnsupportedCallbackException { String username = null; for (Callback callback : callbacks) { if (callback instanceof NameCallback) { username = ((NameCallback) callback).getDefaultName(); } else if (callback instanceof PlainAuthenticateCallback) { PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback; plainCallback.authenticated(authenticate(username, String.valueOf(plainCallback.password()))); } else throw new UnsupportedCallbackException(callback); } } protected boolean authenticate(String username, String password) { if (username == null || password == null) return false; log.info("Try custom auth for user " + username); CloseableHttpClient httpClient = HttpClients.createDefault(); String url = authServer + "/?username=" + username + "&password=" + password; HttpGet request = new HttpGet(url); try { CloseableHttpResponse response = httpClient.execute(request); int status = response.getStatusLine().getStatusCode(); response.close(); if (status == 200) { this.username = username; authenticatedUsers.put(username, password); return true; } return false; } catch (IOException e) { e.printStackTrace(); return false; } finally { try { httpClient.close(); } catch (IOException ioException) { ioException.printStackTrace(); } } } @Override public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) { jaasConfigEntries.stream() .filter(t -> t.getLoginModuleName().equals(PlainLoginModule.class.getName())) .findAny() .map(t -> t.getOptions()) .ifPresent(t -> authServer = t.get("auth_server").toString()); } @Override public void close() { authenticatedUsers.remove(username); } } |
Отлично. Одну проблему решили. Теперь у нас есть все для того, чтобы написать авторизатор.
Авторизатор у нас наследуется от kafka.security.authorizer.AclAuthorizer. Даваейте назовем его CustomAclAuthorizer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
package com.blockwit.kafka.security.examples; import kafka.security.authorizer.AclAuthorizer; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.server.authorizer.Action; import org.apache.kafka.server.authorizer.AuthorizableRequestContext; import org.apache.kafka.server.authorizer.AuthorizationResult; import java.util.List; import java.util.Map; @Slf4j public class CustomAclAuthorizer extends AclAuthorizer { @Override public List<AuthorizationResult> authorize(AuthorizableRequestContext authRequestContext, List<Action> actions) { super.authorize(authRequestContext, actions); } @Override public void configure(Map<String, ?> javaConfigs) { super.configure(javaConfigs); } } |
Как и у аутентификатора, у авторизатора есть возможность читать конфиг. Но тут читается полный конфиг Kafka. Давайте условимся, что в общем случае адрес сервера для предоставления групп может отличаться от адреса сервера для проверки учетных данных. Введем в конфиг сервера Kafka опцию с адресом сервера для получения групп. Пусть она называется так:
custom.acl.authorizer.auth.server
Давайте добавим поле authServer в наш авторизатор и в методе чтения настроек прочтем адрес сервера групп:
1 2 3 4 5 |
@Override public void configure(Map<String, ?> javaConfigs) { super.configure(javaConfigs); this.authServer = javaConfigs.get("custom.acl.authorizer.auth.server").toString(); } |
Обратите внимание, что мы сначала вызываем у AclAuthorizer чтение настроек, поскольку мы наследуемся от AclAuthorizer и нам нужен его функционал.
Осталось написать метод замены имен пользователя на группы. Посмотрите метод authorize. Он получает на вход список actions — список операций, которые запрашивает пользователь. И возвращает список AuthorizationResult. AuthorizationResult может принимать два значения: либо AuthorizationResult.ALLOW, либо AuthorizationResult.DENIED. Возвращаемый список полностью соответствует списку actions. Т.е. каждому action’у соответствует свой AuthorizationResult.
Начнем с алгоритма:
- Получаем имя пользователя
- Если он суперпользователь, то возвращаем список AuthorizationResult со всеми разрешениями.
- Если это не суперпользователь, то получаем пароль
- По паролю и логину получаем у сервера учетных данных список групп, в которых состоит пользователь
- Для каждой группы вызываем метод authorize у AclAuthorizer
- Если хотя бы для одной из групп, в которой состоит пользователь, действие разрешено, то возвращаем ALLOW.
- Возвращаем получившийся список разрешений
Приступим к реализации! Проверка, не является ли наш пользовтаель суперпользователем, уже написана в AclAuthhorizer. Нам нужно только ее правильно вызвать:
1 2 |
KafkaPrincipal sessionPrincipal = authRequestContext.principal(); isSuperUser(sessionPrincipal); |
Давайте создадим список разрешений, соответствующих списку actions, и, если пользователь окажется суперпользователем, то заполним весь список ALLOW и вернем его.
1 2 3 4 5 6 7 8 |
KafkaPrincipal sessionPrincipal = authRequestContext.principal(); List<AuthorizationResult> authResults = new ArrayList<>(); if (isSuperUser(sessionPrincipal)) { actions.forEach(a -> authResults.add(AuthorizationResult.ALLOWED)); return authResults; } |
Теперь давайте получим по имени пользователя пароль:
1 2 |
String username = sessionPrincipal.getName(); String password = CustomAuthenticationCallbackHandler.authenticatedUsers.get(username); |
Подключимся к серверу и вернем список групп, но перед этим заполним наш список разрешений DENIED. В случае ошибки обращения к серверу, мы вернем этот список сразу. Итак, заполнение списка и получение групп пользователя с сервера:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
actions.forEach(a -> authResults.add(AuthorizationResult.DENIED)); String username = sessionPrincipal.getName(); String password = CustomAuthenticationCallbackHandler.authenticatedUsers.get(username); CloseableHttpClient httpClient = HttpClients.createDefault(); String url = authServer + "/groups?username=" + username + "&password=" + password; HttpGet request = new HttpGet(url); String[] groups = null; try { CloseableHttpResponse response = httpClient.execute(request); int status = response.getStatusLine().getStatusCode(); response.close(); if (status == 200) { String content = EntityUtils.toString(response.getEntity()); groups = content.split(","); } else return authResults; } catch (IOException e) { e.printStackTrace(); return authResults; } finally { try { httpClient.close(); } catch (IOException ioException) { ioException.printStackTrace(); } } |
Отлично. Группы есть. Теперь нам нужно на каждую группу вызвать метод authorize так, будто мы его вызывали для пользователя. К сожалению, метод authorize на вход не принимает имя пользователя или, в нашем случае, группы. А принимает он AuthorizableRequestContext. Тогда давайте будем создавать AuthorizableRequestContext каждый раз при вызове authorize. При этом у нашего AuthorizableRequestContext мы заменим только одно поле — имя пользователя на имя группы:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
RequestContext requestContext = (RequestContext) authRequestContext; for (int i = 0; i < groups.length; i++) { KafkaPrincipal groupKafkaPrincipal = new KafkaPrincipal(username, groups[i]); RequestContext groupRequestContext = new RequestContext( requestContext.header, requestContext.connectionId, requestContext.clientAddress, groupKafkaPrincipal, requestContext.listenerName, requestContext.securityProtocol, requestContext.clientInformation); List<AuthorizationResult> groupAuthResult = super.authorize(groupRequestContext, actions); for (int j = 0; i < groupAuthResult.size(); j++) { if (authResults.get(j) == AuthorizationResult.DENIED && groupAuthResult.get(j) == AuthorizationResult.ALLOWED) authResults.set(j, AuthorizationResult.ALLOWED); } } |
Остается только вернуть список authResults. Давайте посмотрим полный код нашего авторизатора:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
package com.blockwit.kafka.security.examples; import kafka.security.authorizer.AclAuthorizer; import lombok.extern.slf4j.Slf4j; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.server.authorizer.Action; import org.apache.kafka.server.authorizer.AuthorizableRequestContext; import org.apache.kafka.server.authorizer.AuthorizationResult; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @Slf4j public class CustomAclAuthorizer extends AclAuthorizer { String authServer; @Override public List<AuthorizationResult> authorize(AuthorizableRequestContext authRequestContext, List<Action> actions) { KafkaPrincipal sessionPrincipal = authRequestContext.principal(); List<AuthorizationResult> authResults = new ArrayList<>(); if (isSuperUser(sessionPrincipal)) { actions.forEach(a -> authResults.add(AuthorizationResult.ALLOWED)); return authResults; } actions.forEach(a -> authResults.add(AuthorizationResult.DENIED)); String username = sessionPrincipal.getName(); String password = CustomAuthenticationCallbackHandler.authenticatedUsers.get(username); CloseableHttpClient httpClient = HttpClients.createDefault(); String url = authServer + "/groups?username=" + username + "&password=" + password; HttpGet request = new HttpGet(url); String[] groups = null; try { CloseableHttpResponse response = httpClient.execute(request); int status = response.getStatusLine().getStatusCode(); response.close(); if (status == 200) { String content = EntityUtils.toString(response.getEntity()); groups = content.split(","); } else return authResults; } catch (IOException e) { e.printStackTrace(); return authResults; } finally { try { httpClient.close(); } catch (IOException ioException) { ioException.printStackTrace(); } } RequestContext requestContext = (RequestContext) authRequestContext; for (int i = 0; i < groups.length; i++) { KafkaPrincipal groupKafkaPrincipal = new KafkaPrincipal(username, groups[i]); RequestContext groupRequestContext = new RequestContext( requestContext.header, requestContext.connectionId, requestContext.clientAddress, groupKafkaPrincipal, requestContext.listenerName, requestContext.securityProtocol, requestContext.clientInformation); List<AuthorizationResult> groupAuthResult = super.authorize(groupRequestContext, actions); for (int j = 0; i < groupAuthResult.size(); j++) { if (authResults.get(j) == AuthorizationResult.DENIED && groupAuthResult.get(j) == AuthorizationResult.ALLOWED) authResults.set(j, AuthorizationResult.ALLOWED); } } return authResults; } @Override public void configure(Map<String, ?> javaConfigs) { super.configure(javaConfigs); this.authServer = javaConfigs.get("custom.acl.authorizer.auth.server").toString(); } } |
Напоминаю, что весь код доступен в репозитории для этой статьи: https://github.com/BlockWit/esh-kse-14.
Подготовка к тестированию
Для подготовки к тестированию нам сначала нужно поправить конфигурацию сервера Kafka, а именно:
- Добавить к конфигурации прошлой статьи конфигурацию ACL и указать наш авторизатор
1authorizer.class.name=com.blockwit.kafka.security.examples.CustomAclAuthorizer - Добавить суперпользователя ACL
1super.users=User:admin - Добавить настройку нашего авторизатора, в которой указан сервер учетных данных
1custom.acl.authorizer.auth.server=http://localhost:8001
Теперь конфиг будет выглядеть так:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
broker.id=0 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 delete.topic.enable=true advertised.host.name=localhost listeners=SASL_PLAINTEXT://localhost:9093 advertised.listeners=SASL_PLAINTEXT://localhost:9093 security.protocol=SASL_PLAINTEXT sasl.enabled.mechanisms=PLAIN security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ auth_server="http://localhost:8001" \ username="admin" \ password="admin-secret"; listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=com.blockwit.kafka.security.examples.CustomAuthenticationCallbackHandler authorizer.class.name=com.blockwit.kafka.security.examples.CustomAclAuthorizer super.users=User:admin custom.acl.authorizer.auth.server=http://localhost:8001 |
Не забывайте менять localhost на Ваш при необходимости. Также конфигурация сервера Kafka есть в репозитории к статье — https://github.com/BlockWit/esh-kse-14/blob/master/src/test/resources/server.properties
Теперь для тестирования нам нужно раздать доступы группам. Но прежде всего я хочу рассказать про настройки сервера учетных данных. Там сейчас стоит такая конфигурация:
- Пользователь alice, пароль alice-secret, группы:
- STATS_VIEWERS
- STATS_WRITERS
- Пользователь robin , пароль robin-secret, группы:
- STATS_VIEWERS
Для экспериментов возьмем топик test.topic. Пусть все, кто в группе STATS_WRITERS, могут писать в топик, а все, кто в группе STATS_VIEWERS, могут читать из топика. Для чтения будем использовать группу test.group. Тогда, чтобы сконфигурировать списки ACL, нужно выполнить команды, описанные ниже. Но перед их выполнением убедитесь, что у Вас пустые списки ACL. Это делается командой:
1 |
sudo /opt/kafka/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --list |
Итак, назначаем права:
- Даем доступ на чтение из топика test.topic группе STATS_VIEWERS
1sudo /opt/kafka/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:STATS_VIEWERS --operation READ --topic test.topic - Даем доступ на чтение из группы топика test.group группе пользователей STATS_VIEWERS
1sudo /opt/kafka/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:STATS_VIEWERS --operation READ --group test.group - Даем доступ на запись в топик test.topic и на создание топика группе STATS_WRITERS
1sudo /opt/kafka/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:STATS_WRITERS --operation WRITE --operation CREATE --topic test.topic
Теперь выведем все списки ACL командой
1 |
sudo /opt/kafka/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --list |
Должно получиться:
1 2 3 4 5 6 7 |
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test.topic, patternType=LITERAL)`: (principal=User:STATS_VIEWERS, host=*, operation=READ, permissionType=ALLOW) (principal=User:STATS_WRITERS, host=*, operation=CREATE, permissionType=ALLOW) (principal=User:STATS_WRITERS, host=*, operation=WRITE, permissionType=ALLOW) Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=test.group, patternType=LITERAL)`: (principal=User:STATS_VIEWERS, host=*, operation=READ, permissionType=ALLOW) |
Теперь нам нужно запустить сервер учетных данных. Вы можете сделать это из IDE. Сервер учетных данных — это обычный исполняемый java класс. Склонируйте репозиторий к статье и запустите класс src/test/com/blockwit/kafka/security/examples/credserver/CredentialsServer.java. Репозиторий к статье лежит тут — https://github.com/BlockWit/esh-kse-14.
После запуска сервер выдаст:
1 |
[main] INFO com.blockwit.kafka.security.examples.credserver.CredentialsServer - Server started on port 8001 |
Теперь нужно подсунуть Kafka наш авторизатор. Для этого необходимо (команды справедливы, если ставили Kafka по этой статье):
- Выполнить mvn install — тогда проект соберется и появится файл target/esh-kse-14-jar-with-dependencies.jar
- Скопировать наш файл в папку с плагинами Kafka
1sudo cp target/esh-kse-14-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/kafka/libs/esh-kse-example.jar
- Назначить Kafka владельцем нашего плагина. Чтобы Kafka имела доступ
1sudo chown kafka:kafka /opt/kafka/libs/esh-kse-example.jar
В составе репозитория есть скрипт deploy.sh, который выполняет компиляцию, сборку, копирование конфига из примера и перезапуск Kafka. Но делает он это, при условии, что Вы запускаете скрипт из той директории, где он лежит и, что настройка Kafka выполнена по этой статье. Обратите внимание, что скрипт меняет конфиг Kafka! Скрипт можно выполнять только после запуска сервера учеток!
Теперь перезапустите Kafka. Если все отлично, то в консоли сервера учеток Вы увидите
1 |
[pool-1-thread-1] INFO com.blockwit.kafka.security.examples.credserver.CredentialsServer - User admin successfully bind |
Теперь приступим к тестированию наших настроек ACL.
Тестирование
Запустите клиент отправки сообщения из репозитория — src/test/java/com/blockwit/kafka/security/examples/SimpleProducerTest_SASL_PLAINTEXT_PLAIN_ACL.java. В результате Вы должны получить:
1 2 3 4 5 6 7 8 9 |
.... Message sends 1 Message sends 2 Message sends 3 Message sends 4 Message sends 5 Message sends 6 Message sends 7 ..... |
А теперь запустите клиент приема сообщений src/test/java/com/blockwit/kafka/security/examples/SimpleProducerTest_SASL_PLAINTEXT_PLAIN_ACL.java. Результат работы — одно прочитанное сообщение:
1 2 3 |
... Consumer Record:.. ... |
После выполнения всех операций в консоли сервера учетных данных Вы увидите сообщения об успешной отправке списков групп для alice и robin:
1 2 |
[pool-1-thread-1] INFO com.blockwit.kafka.security.examples.credserver.CredentialsServer - User alice successfully take groups STATS_VIEWERS,STATS_WRITERS [pool-1-thread-1] INFO com.blockwit.kafka.security.examples.credserver.CredentialsServer - User robin successfully take groups STATS_VIEWERS |
Это значит, что наш кастомный авторизатор работает!
Резюме
Итак, мы умеем писать и свою реализацию аутентификатора, и свою реализацию авторизатора. Теперь для нас не составит никакой сложности реализовать любую схему безопасности в Kafka.
Репозиторий примера к текущей статье — https://github.com/BlockWit/esh-kse-14.
Предыдущая статья. Следующая статья.
Полный список статей по теме тут.
Заходите в наш телеграмм канал — Enterprise Stack Helper! Делитесь опытом или задавайте вопросы, если что-то непонятно.
Репозитории с примерам из статей по способам аутентификаци и авторизации — https://github.com/BlockWit/kafka-security-examples.