В этой статье я опишу, что такое межброкерное взаимодействие, почему для него всегда нужно описывать соединение и как я докатился до такой жизни, что мне пришлось об этом узнать)
Эта статья важно к прочтению, если Вы настраиваете аутентификацию в Kafak, и обязательно, если Вы настраиваете авторизацию Kafka и в логах вы увидели что-то из этого списка:
- CLUSTER_AUTHORIZATION_FAILED
- ERROR [KafkaApi-0] Error when handling request: clientId=0, correlationId=0, api=UPDATE_METADATA,…
- java.lang.IllegalArgumentException: requirement failed: inter.broker.listener.name must be a listener name defined in advertised.listeners. The valid options based on currently configured listeners are ….
- java.lang.IllegalArgumentException: requirement failed: sasl.mechanism.inter.broker.protocol must be included in sasl.enabled.mechanisms when SASL is used for inter-broker communication
Межброкерное взаимодействие у Kafka — это общение внутри инфраструктуры Kafka кластера.
А именно:
- Общение между ZooKeeper и Kafka
- Общение между серверами Kafka в составе кластера, т.е. между брокерами
Полный список статей по теме тут.
Заходите в наш телеграмм канал — Enterprise Stack Helper! Делитесь опытом или задавайте вопросы, если что-то непонятно.
Репозитории с примерам из статей по способам аутентификаци и авторизации — https://github.com/BlockWit/kafka-security-examples.
А есть ли межброкерное взаимодействие с одним брокером?
В этой части описано, как я пришел к выводу о том, что взаимодействия с одним брокером есть. Последовательность моих действий. Если Вам это не интересно, то можете сразу перейти к следующей главе, где описывается межброкерное взаимодействие в схеме с одним Kafka брокером.
Давайте пока будем называть межброкерным взаимодействием только то общение, которое проходит через соединение для межброкерного взаимодействия.
Очевидно, что если у нас один брокер, то и никакого общения между другими брокера не будет.
А что же с общением между ZooKeeper и Kafka? Судя по конфигурации сервера, кажется, что также нет общения через соединение для межброкерного взаимодействия. Ведь в конфигурации сервера описывается для ZooKeeper есть свое соединение, которое задается опцией zookeeper.connect.
Т.е. выглядеть это должно так:
Согласно такой схеме, для одного Kafka сервера нет необходимости в описании соединения для межброкерного взаимодействия! Ведь соединений с другими брокерами у нас нет, потому что брокеров нет. А соединение с ZooKeeper описывается zookeeper.connect. Т.е, выходит, что никакого межброкерного взаимодействия.
Если мы берем конфигурацию Kafka по умолчанию, то выглядит она так (если удалить комментарии):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
broker.id=0 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 |
В ней не описаны соединения для Kafka клиентов и для межброкерного взаимодействия. В таком случае Kafka применяет настройки по умолчанию:
1 2 3 4 |
advertised.host.name=localhost listeners=PLAINTEXT://localhost:9093 advertised.listeners=PLAINTEXT://localhost:9093 |
Пока все соответствует нашей диаграмме.
А теперь изменим описание соединения! Добавим к нему протокол аутентификации. И опишем настройки протокола, чтобы избежать сообщений об ошибках. Пусть протокол будет SASL_PLAINTEXT + PLAIN (в этой статье подробное описание). Т.е. теперь конфиг будет выглядеть так:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
broker.id=0 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 listeners=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093 advertised.listeners=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093 security.protocol=SASL_PLAINTEXT sasl.enabled.mechanisms=PLAIN listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="admin" \ password="admin-secret" \ user_admin="admin-secret" \ user_robin="robin-secret" \ user_alice="alice-secret"; authorizer.class.name=kafka.security.authorizer.AclAuthorizer |
Если наше предположение о том, что нет межброкреного взаимодействия, то Kafka с такой конфигурацией успешно запустится.
Но, не тут то было, получаем ошибку:
1 |
java.lang.IllegalArgumentException: requirement failed: sasl.mechanism.inter.broker.protocol must be included in sasl.enabled.mechanisms when SASL is used for inter-broker communication |
Kafka нам дает понять, что если среди указанных соединений нет PLAINTEXT то для межброкреного взаимодействия нужно явно указать протокол и его настройки из описаных соединений.
Более того, если Вы добавите PLAINTEXT к описанию соединений, то все заработает. Итак, мы выяснили, что Kafka тем или иным способом требует описание соединения для межброкреного взаимодейтсвия.
Предположим что, что описание соединения требуется всегда, но по факту не используется. Тогда опишем аутентификацию SASL_PLAINTEXT PLAIN с авторизацией ACL (подробный пример с SASL_SSL PLAIN описан тут). И запретим доступ к ресурсам, если ресурса нет в списке ACL.
Т.е. не будем указывать опцию:
1 |
allow.everyone.if.no.acl.found=true |
Тогда конфиг выглядеть будет так:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
broker.id=0 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 listeners=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093 advertised.listeners=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093 security.protocol=SASL_PLAINTEXT sasl.enabled.mechanisms=PLAIN listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="admin" \ password="admin-secret" \ user_admin="admin-secret" \ user_robin="robin-secret" \ user_alice="alice-secret"; authorizer.class.name=kafka.security.authorizer.AclAuthorizer |
Запустим Kafku. Kafka заработает, во всяком случае команда
1 |
sudo systemctl status kafka |
Будет выдавать состояние — Active: active (running)
Но, если мы попробуем отправить что-нибудь в топик этим клиентом:
1 2 3 4 5 6 7 8 9 10 11 12 |
package com.blockwit.kafka.security.examples; public class SimpleProducerTest_SASL_PLAINTEXT_PLAIN { public static void main(String[] args) throws InterruptedException { SimpleProducer.runProducer(Helper.of("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required serviceName=\"Kafka\" username=\"alice\" password=\"alice-secret\";", "sasl.mechanism", "PLAIN", "security.protocol", "SASL_PLAINTEXT"), "localhost:9093", "stats.store"); } } |
Полностью код клиента можно увидеть в репозитории — https://github.com/BlockWit/kafka-security-examples/blob/master/src/main/java/com/blockwit/kafka/security/examples/SimpleProducerTest_SASL_PLAINTEXT_PLAIN.java — или в этой статье. Не забывайте менять имя хоста.
В результате мы получим:
1 |
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 17 : {stats.store=LEADER_NOT_AVAILABLE} |
Но, попытавшись проверить лидера командой:
1 |
sudo /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic stats.store --describe |
Мы ничего не получим — клиент kafka-topics.sh просто зависнет.
Давайте тогда посмотрим логи Kafka. Там мы можем обнаружить такую ошибку:
ERROR [KafkaApi-0] Error when handling request: clientId=0, correlationId=0, api=UPDATE_METADATA ….
Такую:
ERROR [KafkaApi-0] Error when handling request: clientId=0, correlationId=1, api=LEADER_AND_ISR,
По сообщение видно, что этот запрос связан как раз с выбором LEADER топика. И наконец мы можем обнаружить такую ошибку:
org.apache.kafka.common.errors.ClusterAuthorizationException: Request Request(processor=0, connectionId=127.0.0.1:9092-127.0.0.1:56996-0, session=Session(User:ANONYMOUS,/127.0.0.1), listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, buffer=null) is not authorized.
Обратите внимание, что кто-то пытается подключится к PLAINTEXT://localhost:9092. А это у нас по умолчанию соединение для межброкерного взаимодействия!
Чтобы убедится что это не клиент коннектится (а может у нас в нем ошибка где-то), мы выключим клиент и перезапустим Kafka. И если посмотреть в логи, то ошибки будут те же.
Что мы имеет в итоге:
- У нас только один брокер
- Клиентов нет
- Кто-то пытается соединиться с PLAINTEXT://localhost:9092
Выходит все же межброкерное взаимодействие у Kafka с одним брокером есть! Тот кто пытается соединиться с PLAINTEXT://localhost:9092 — это есть KafkaController. Более того помимо KafkaController есть и другие клиенты брокера.
В данном примере ошибка возникла из-за того, что у нас стоит авторизация по ACL. А на соединение PLAINTEXT://localhost:9092 — нет протокола аутентификации. А если нельзя установить личность, то и узнать какие права у этой личность мы не можем. Поэтому на запрос от KafkaController наш Kafka сервер отвечает is not authorized.
Давайте теперь выясним что же это за KafkaController , зачем он нужен и как на самом деле работает в Kafka межброкерное взаимодействие с одним брокером.
Межброкерное взаимодействие Kafka
Давайте посмотрим на диаграмму:
Стрелки разного цвета изображают разные типы соединений:
- Черный — внутреннее общение
- Синий — zookeeper.connect — соединение с ZooKeeper
- Оранжевый — соединений для межброкерной коммуникации
На изображении нет только клиентского соединения.
А теперь давайте разберем что у нас на диаграмме. Дело в том, что когда Kafka брокер запускается он поднимает сам сервер Kafka и KafkaController. А KafkaController уже соединяется с ZooKeeper и с KafkaServer.
Первое что приходит в голову — зачем такие сложности? Почему KafkaServer сразу не может соединяться с ZooKeeper, а не через KafkaController? Для ответа на этот вопрос нужно узнать, что вообще делает контроллер. Но более понятно это станет только на примере нескольких брокеров.
Приведем диаграмму кластера из трех брокеров:
Контроллер занимается тем, что получает события от ZooKeeper и на основе этих событий обновляет информацию во всех брокерах. Т.е контроллер соединяется со всеми брокерами в кластере. Но зачем для каждого брокера свой контроллер? На самом деле, активный контроллер только один. На диаграмме активный контроллер зеленый. Остальные на подхвате. Если активный контроллер по каким-от причинам сломается, то ZooKeeper выберет контроллер другого брокера. И новый контроллер будет рассылать информацию. Сделано это для обеспечения отказоустойчивости системы. На диаграмме видно, что активный контроллер — это контроллер брокера 1.
Зачем нужен BrokerToControllerChannelManager? Задача этого компонента найти активный контроллер и отсылать к нему запросы от текущего брокера.
Но что же это за информация , которую рассылает контроллер и которая требуется всем брокерам? Это так называемые метаданные — данные которые хранятся в ZooKeeper и необходимы для обслуживания кластера. Какие брокеры активны, какие брокеры какие топики обслуживают (т.е. лидеры), сколько топиков и на каких брокерах их данные хранятся и так далее. Т.е. это не сами топики, а информация о них, которая необходима для работы брокеров.
Резюме
В этой статье мы узнали, откуда и зачем у Kafka даже при одном брокере есть межброкерное взаимодействие. Важно понимать, что KafkaController не единственный компонент, участвующий в коммуникации между брокерами.
Полный список статей по теме тут.
Заходите в наш телеграмм канал — Enterprise Stack Helper! Делитесь опытом или задавайте вопросы, если что-то непонятно.
Репозитории с примерам из статей по способам аутентификаци и авторизации — https://github.com/BlockWit/kafka-security-examples.