Подключение к кластеру
Подключиться к кластеру можно двумя способами:
- По внутренней сети. Используйте этот способ, если подключаетесь к кластеру с ВМ в облаке MWS.
- По внешней сети. Используйте этот способ, если подключаетесь к кластеру из-за пределов облака MWS.
Подключение по внутренней сети
Заголовок раздела «Подключение по внутренней сети»- Веб-консоль
- MWS CLI
- API
В веб-консоли выберите нужный проект.
В списке сервисов выберите Managed Kafka.
Нажмите на имя нужного кластера.
Перейдите на вкладку Сети.
Сохраните адрес любого брокера из блока Внутреннее подключение. Адреса находятся в столбце Bootstrap сервер.
Перейдите на вкладку Топики.
Нажмите на имя нужного топика.
Найдите идентификатор топика вида
.../<имя топика>. Сохраните имя топика.Перейдите на вкладку Пользователи.
В списке пользователей найдите и сохраните имя нужного пользователя.
Скачайте по прямой ссылке сертификат шифрования. Он сохранится в файле с именем
mws-root-ca.crt.Установите на виртуальную машину среду выполнения Java 11 и выше, совместимую с Apache Kafka.
Установите на виртуальную машину Apache Kafka версии не ниже, чем в кластере.
Задайте переменные окружения:
bash export BOOTSTRAP="<адрес брокера>"export TOPIC="<имя топика>"export USERNAME="<имя пользователя>"export PASSWORD="<пароль>"export CONF="$HOME/<файл конфигурации Kafka>.properties"Для подключения к кластеру достаточно указать по одному брокеру и топику. Брокер возвращает полную информацию о кластере Kafka.
Создайте файл конфигурации Kafka:
bash cat > "$CONF" <<EOFsecurity.protocol=SASL_SSLsasl.mechanism=SCRAM-SHA-256ssl.truststore.type=PEM ssl.truststore.location=<путь до сертификата>/mws-root-ca.crtsasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${USERNAME}" password="${PASSWORD}";ssl.endpoint.identification.algorithm=EOFЗдесь:
security.protocol=SASL_SSL— SASL-аутентификация по защищенному SSL-каналу;sasl.mechanism=SCRAM-SHA-256— алгоритм аутентификации;ssl.truststore.type=PEM— тип хранилища доверенных сертификатов (PEM-формат);sasl.jaas.config— учетные данные.
Подключение по внешней сети
Заголовок раздела «Подключение по внешней сети»- Веб-консоль
- MWS CLI
- API
Для сети, к которой подключен кластер, добавьте правило файрвола для входящего трафика:
- источник трафика — IP-адрес клиента;
- назначение трафика — внешний IP-адрес ресурса;
- протокол и порт —
TCP:9101.
В веб-консоли выберите нужный проект.
В списке сервисов выберите Managed Kafka.
Нажмите на имя нужного кластера.
Перейдите на вкладку Сети.
Сохраните адрес любого брокера из блока Внешнее подключение. Адреса находятся в столбце Bootstrap сервер.
Перейдите на вкладку Топики.
Нажмите на имя нужного топика.
Найдите идентификатор топика вида
.../<имя топика>. Сохраните имя топика.Перейдите на вкладку Пользователи.
В списке пользователей найдите и сохраните имя нужного пользователя.
Скачайте по прямой ссылке сертификат шифрования. Он сохранится в файле с именем
mws-root-ca.crt.Установите среду выполнения Java 11 и выше, совместимую с Apache Kafka.
Установите Apache Kafka версии не ниже, чем в кластере.
Задайте переменные окружения:
bash export BOOTSTRAP="<адрес брокера>"export TOPIC="<имя топика>"export USERNAME="<имя пользователя>"export PASSWORD="<пароль>"export CONF="$HOME/<файл конфигурации Kafka>.properties"Для подключения к кластеру достаточно указать по одному брокеру и топику. Брокер возвращает полную информацию о кластере Kafka.
Создайте файл конфигурации Kafka:
bash cat > "$CONF" <<EOFsecurity.protocol=SASL_SSLsasl.mechanism=SCRAM-SHA-256ssl.truststore.type=PEM ssl.truststore.location=<путь до сертификата>/mws-root-ca.crtsasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${USERNAME}" password="${PASSWORD}";ssl.endpoint.identification.algorithm=EOFЗдесь:
security.protocol=SASL_SSL— SASL-аутентификация по защищенному SSL-каналу;sasl.mechanism=SCRAM-SHA-256— алгоритм аутентификации;ssl.truststore.type=PEM— тип хранилища доверенных сертификатов (PEM-формат);sasl.jaas.config— учетные данные.
Проверка подключения
Заголовок раздела «Проверка подключения»Получите список доступных топиков кластера:
bash "$KAFKA_HOME/bin/kafka-topics.sh" \ # Запуск утилиты для работы с топиками Kafka--bootstrap-server "$BOOTSTRAP" \ # Адрес брокера--command-config "$CONF" \ # Файл конфигурации Kafka--list # Флаг для вывода списка всех доступных топиковЕсли подключение к кластеру успешно, в ответе будет выведен список доступных топиков кластера. Если вы еще не создали ни одного топика, будет выведен только системный топик Kafka
__consumer__offsets.Получите адреса всех брокеров в кластере:
bash "$KAFKA_HOME/bin/kafka-broker-api-versions.sh" \ # Запуск утилиты для получения информации о брокерах и поддерживаемых версиях API--bootstrap-server "$BOOTSTRAP" \ # Адрес брокера--command-config "$CONF" # Файл конфигурации KafkaВ ответе будет получен список брокеров с идентификатором и адресом для каждого брокера.
Получите информацию о брокерах-лидерах в разделах топика:
bash "$KAFKA_HOME/bin/kafka-topics.sh" \ # Запуск утилиты для работы с топиками Kafka--bootstrap-server "$BOOTSTRAP" \ # Адрес брокера--describe \ # Флаг для получения описания топика--topic "$TOPIC" \ # Имя топика--command-config "$CONF" # Файл конфигурации KafkaВ ответе будет получено описание топика. Для каждого раздела топика будет указан идентификатор брокера-лидера. Сопоставьте идентификаторы брокеров с их адресами, полученными в предыдущем шаге.
Запустите консольного производителя для записи и отправки сообщений:
bash "$KAFKA_HOME/bin/kafka-console-producer.sh" \ # Запуск консольного производителя для отправки сообщений в топик--bootstrap-server "$BOOTSTRAP" \ # Адрес брокера-лидера раздела--producer.config "$CONF" \ # Файл конфигурации Kafka--topic "$TOPIC" # Целевой топикПосле запуска введите сообщение для записи и отправки потребителю.
Запустите консольного потребителя для чтения сообщений:
bash "$KAFKA_HOME/bin/kafka-console-consumer.sh" \ # Запуск консольного потребителя для чтения сообщений из топика--bootstrap-server "$BOOTSTRAP" \ # Адрес брокера-лидера раздела--consumer.config "$CONF" \ # Файл конфигурации Kafka--topic "$TOPIC" \ # Целевой топик--group <группа потребителей> \ # Идентификатор группы потребителей--from-beginning # Чтение всех сообщений с начала топикаВ ответе будут выведены сообщения, отправленные в топик ранее.