В предыдущей статье мы рассмотрели базовые понятия, поверхностно разобрали механизмы безопасности и узнали, в каком виде они есть в Kafka.
В этой статье мы рассмотрим настройки безопасности Kafka и опишем конфигурацию, с которой будем дальше экспериментировать.
Предыдущая статья.Следующая статья.
Полный список статей по теме тут.
Заходите в наш телеграмм канал — Enterprise Stack Helper! Делитесь опытом или задавайте вопросы, если что-то непонятно.
Репозитории с примерам из статей по способам аутентификаци и авторизации — https://github.com/BlockWit/kafka-security-examples.
Настройки безопасности Kafka
Прежде всего напомним понятия:
- аутентификация — проверка что клиент является тем за кого себя выдает
- авторизация — проверка прав клиента на доступ к ресурсу
В предыдущей статье мы разобрали три протокола аутентификации в Kafka.
- Без аутентификации
- SSL
- SASL
При этом канал может быть защищен SSL.
В конфигурации сервера Kafka, выбранный протокол задается с помощью опций, следующим образом:
- Без аутентификации — по умолчанию, можно ничего не указывать
- SSL — необходимо в конфиге сервера и клиента указать
1security.protocol=SSL - SASL — необходимо в конфиге сервера и клиента указать:
если без SSL:
1security.protocol=SASL_PLAINTEXT
если c SSL:
1security.protocol=SASL_SSL
А затем нам необходимо уточнить SASL механизм аутентификации (мы рассмотрим только два):
Для наглядности приведем диаграмму с настройками выбора
Помните, что security.protocol=SASL_SSL — это аутентификация протоколами SASL по каналу, защищенному SSL. А security.protocol=SSL — это аутентификация с помощью SSL по каналу защищенному SSL.
Важно: если указать на сервере security.protocol=SASL_SSL, но не указать sasl.enabled.mechanism, то мы получим ошибку при запуске сервера:
1 java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka configОднако, такая же ошибка может выскочить, если мы не указали serviceName в конфигурации JAAS клиента (с JAAS начнем знакомиться в следующей статье)
Затем на сервере указываем адрес и порт и протокол аутентификации:
listeners=[security_protocol]://[domain]:[port]
advertised.listeners=[security_protocol]://[domain]:[port]
Где [port], [domain] — порт и домен на котором Kafka будет работать с протоколом аутентификации [security_protocol]. При этом [security_protocol] — это то что Вы писали в свойстве security.protocol на сервере.
Например:
1 2 |
listeners=SASL_PLAINTEXT://localhost:9093 advertised.listeners=SASL_PLAINTEXT://localhost:9093 |
Kafka умеет работать с несколькими протоколами аутентификации одновременно, но на разных портах и IP, для этого просто необходимо через запятую в listeners, например:
12 listeners=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093,SSL://localhost:9094advertised.listeners=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093,SSL://localhost:9094Kafka также умеет работать с несколькими SASL механизмами, для этого их достаточно перечислить в sasl.enabled.mechanisms через запятую.
В Kafka можно выделить два типа соединений:
- Соединения с обычными клиентами
- Межброкерное взаимодейтсвие. Т.е. взаимодействие между узлами Kafka. Зачем это нужно? Kafka — это масштабируемая отказоустойчивая система. Вкратце. Для обеспечения масштабируемости и отказоустойчивости Kafkа запускают на нескольких серверах. Грубо говоря, чтобы дублировать сообщения. Если один сервер выйдет из строя, то его заменит другой. В этом случае, каждый сервер Kafka должен уметь общаться с остальными, чтобы получать копии сообщений и держать актуальное состояние! Это и есть межброкерное или inter-broker взаимодействие.
В listeners описываются параметры всех соединений, как для обычных соединений так и для соединений межброкерного взаимодействия!
Вдаваться в подробности зачем нужен advertised.listeners мы пока не будем. Просто примем за правило что advertised.listeners дублирует listeners.
В наших статьях мы занимаемся настройкой прежде всего защиты соединения между внешними клиентами и Kafka! А межброкерное взаимодействие мы пока оставляем как есть, т.е. PLAINTEXT! Поэтому у нас всегда будет описано одно соединение с PLAINTEXT и одно дополнительное для наших клиентов, если оно отличается от PLAINTEXT.
1 2 |
listeners=PLAINTEXT://localhost:9092,еще одно соединение если отличается от PLAINTEXT advertised.listeners=PLAINTEXT://localhost:9092,еще одно соединение если отличается от PLAINTEXT |
Если Kafka не найдет описание соединения для межброкерного взаимодействия в listeners, то может быть такая ошибка:
1 java.lang.IllegalArgumentException: requirement failed: inter.broker.listener.name must be a listener name defined in advertised.listeners. The valid options based on currently configured listeners are SASL_PLAINTEXTТут упоминается SASL_PLAINTEXT. Дело в том что протокол межброкерного взаимодействия указывается отдельно. А если не указан, то ставится по умолчанию PLAINTEXT. Поэтому в listeners и должно быть описано одно соединение с PLAINTEXT. В данном случае пользователь про это забыл и указал только соединение SASL_PLAINTEXT.
Пока что, в наших статьях, мы будем настраивать защиту именно для внешних клиентов!
Помимо настроек выбора протокола аутентификации и механизма, нам еще потребуется указать сами настройки для каждого конкретного механизма. Их мы будем разбирать на конкретных примерах в следующих статьях.
Конфигурация для работы
Работать мы будем с Kafka 2.6. Установка и настройка выходит за рамки данной статьи.
Приведем конфигурацию без аутентификации на которой будем экспериментировать.
Дальше в статьях в качестве сервера будет фигурировать localhost, соответственно Вы должны заменять его на свой, , если нужно.
Свойства для сервера:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
broker.id=0 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 delete.topic.enable=true advertised.host.name=localhost listeners=PLAINTEXT://localhost:9092 advertised.listeners=PLAINTEXT://localhost:9092 |
PLAINTEXT работает у Kafka по умолчанию, поэтому нет необходимости указывать какой протокол выбран. По этой причине мы не отметили на изображении выше опцию и не указали ее в настройках сервера или клиента:
1 security.protocol=PLAINTEXT
Мы не стали добавлять новое соединение в listeners на порту 9093, поскольку оно будет отличаться только портом. Т.е. банально бы дублировало первое соединений по протоколу и IP или имени. Но если все же мы бы добавили соединение то получили бы от Kakfa такую ошибку:
1 |
java.lang.IllegalArgumentException: requirement failed: Each listener must have a different name, listeners: PLAINTEXT://localhost:9092,PLAINTEXT://localhost:9093 |
Тестировать соединение мы будем с помощью клиента написанного на Java. Клиент запускает отправку сообщений в топик stats.store.
Для клиента создайте maven проект на Java, пропишите туда зависимость
1 2 3 4 5 |
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.6.0</version> </dependency> |
И создайте класс SimpleProducer следующего содержания:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Map; import java.util.Properties; public class SimpleProducer { private static Producer<Long, String> createProducer(Map<String, String> inProps, String server) { final Properties props = new Properties(); props.putAll(inProps); props.put("enable.auto.commit", "false"); props.put("session.timeout.ms", "30000"); props.put("max.poll.records", "100"); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); final Producer<Long, String> producer = new KafkaProducer<>(props); return producer; } static void runProducer(Map<String, String> inProps, String server, String topicName) throws InterruptedException { final Producer<Long, String> producer = createProducer(inProps, server); for (int i = 0; i < 10000000; i++) { try { RecordMetadata meat = (RecordMetadata) producer.send(new ProducerRecord(topicName, "msg" + i)).get(); if (!meat.hasOffset()) { throw new Exception("Record has not posted to topic"); } else { System.out.println("Message sends " + i); } } catch (Exception e) { e.printStackTrace(); } } producer.close(); System.out.println("DONE"); } } |
Затем создайте класс Helper (работаем мы с JDK 1.8, а там нет некоторых удобных фич, типа Map.of как в JDK 1.9, поэтому мы их добавляем самостоятельно):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
package com.blockwit.kafka.security.examples; import java.util.HashMap; import java.util.Map; public class Helper { // instead of JDK 1.9 Map.of public static Map<String, String> of(String... args) { Map<String, String> map = new HashMap<>(); for (int i = 0; i < args.length / 2; i++) map.put(args[i * 2], args[i * 2 + 1]); return map; } } |
Запускать клиент мы будем так (не забывайте заменять localhost на свой, если нужно):
1 2 3 4 5 6 7 8 9 10 11 12 |
package com.blockwit.kafka.security.examples; import java.util.Collections; public class SimpleProducer_PLAINTEXT { public static void main(String[] args) throws InterruptedException { SimpleProducer.runProducer(Collections.emptyMap(), "localhost:9092", "stats.store"); } } |
Запустите Kafka, а затем клиент! Если все успешно, то клиент ознаменует успешный процесс отправки сообщений в топик следующим текстом:
1 2 3 4 5 6 7 |
Message sends 1 Message sends 2 Message sends 3 Message sends 4 Message sends 5 Message sends 6 ... |
Резюме
В этой статье мы узнали как выбирать протокол и механизм аутентификации в конфигурациях клиента и сервера Kafka. Настроили Kafka на работу без аутентификации и запустили клиент, который отправляет сообщения в топик без аутентификации.
В следующей статье мы узнаем как настраивать SASL_PLAINTEXT и со встроенным механизмом PLAIN. Проще говоря настроим в Kafka самую простую встроенную аутентификацию!
Предыдущая статья.Следующая статья.
Полный список статей по теме тут.
Заходите в наш телеграмм канал — Enterprise Stack Helper! Делитесь опытом или задавайте вопросы, если что-то непонятно.
Репозитории с примерам из статей по способам аутентификаци и авторизации — https://github.com/BlockWit/kafka-security-examples.