Перейти к содержимому

Подключение к кластеру

Подключиться к кластеру можно двумя способами:

  • По внутренней сети. Используйте этот способ, если подключаетесь к кластеру с ВМ в облаке MWS.
  • По внешней сети. Используйте этот способ, если подключаетесь к кластеру из-за пределов облака MWS.
  • Веб-консоль
  • MWS CLI
  • API
  1. В веб-консоли выберите нужный проект.

  2. В списке сервисов выберите Managed Kafka.

  3. Нажмите на имя нужного кластера.

  4. Перейдите на вкладку Сети.

  5. Сохраните адрес любого брокера из блока Внутреннее подключение. Адреса находятся в столбце Bootstrap сервер.

  6. Перейдите на вкладку Топики.

  7. Нажмите на имя нужного топика.

  8. Найдите идентификатор топика вида .../<имя топика>. Сохраните имя топика.

  9. Перейдите на вкладку Пользователи.

  10. В списке пользователей найдите и сохраните имя нужного пользователя.

  11. Убедитесь, что у вас есть ВМ в сервисе Compute, подключенная к одной сети с кластером. Если нет, создайте ее.

  12. Скачайте по прямой ссылке сертификат шифрования. Он сохранится в файле с именем mws-root-ca.crt.

  13. Установите на виртуальную машину среду выполнения Java 11 и выше, совместимую с Apache Kafka.

  14. Установите на виртуальную машину Apache Kafka версии не ниже, чем в кластере.

  15. Задайте переменные окружения:

    bash
    export BOOTSTRAP="<адрес брокера>"
    export TOPIC="<имя топика>"
    export USERNAME="<имя пользователя>"
    export PASSWORD="<пароль>"
    export CONF="$HOME/<файл конфигурации Kafka>.properties"

    Для подключения к кластеру достаточно указать по одному брокеру и топику. Брокер возвращает полную информацию о кластере Kafka.

  16. Создайте файл конфигурации Kafka:

    bash
    cat > "$CONF" <<EOF
    security.protocol=SASL_SSL
    sasl.mechanism=SCRAM-SHA-256
    ssl.truststore.type=PEM ssl.truststore.location=<путь до сертификата>/mws-root-ca.crt
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${USERNAME}" password="${PASSWORD}";
    ssl.endpoint.identification.algorithm=
    EOF

    Здесь:

    • security.protocol=SASL_SSL — SASL-аутентификация по защищенному SSL-каналу;
    • sasl.mechanism=SCRAM-SHA-256 — алгоритм аутентификации;
    • ssl.truststore.type=PEM — тип хранилища доверенных сертификатов (PEM-формат);
    • sasl.jaas.config — учетные данные.
  • Веб-консоль
  • MWS CLI
  • API
  1. Для сети, к которой подключен кластер, добавьте правило файрвола для входящего трафика:

    • источник трафика — IP-адрес клиента;
    • назначение трафика — внешний IP-адрес ресурса;
    • протокол и порт — TCP:9101.
  2. В веб-консоли выберите нужный проект.

  3. В списке сервисов выберите Managed Kafka.

  4. Нажмите на имя нужного кластера.

  5. Перейдите на вкладку Сети.

  6. Сохраните адрес любого брокера из блока Внешнее подключение. Адреса находятся в столбце Bootstrap сервер.

  7. Перейдите на вкладку Топики.

  8. Нажмите на имя нужного топика.

  9. Найдите идентификатор топика вида .../<имя топика>. Сохраните имя топика.

  10. Перейдите на вкладку Пользователи.

  11. В списке пользователей найдите и сохраните имя нужного пользователя.

  12. Скачайте по прямой ссылке сертификат шифрования. Он сохранится в файле с именем mws-root-ca.crt.

  13. Установите среду выполнения Java 11 и выше, совместимую с Apache Kafka.

  14. Установите Apache Kafka версии не ниже, чем в кластере.

  15. Задайте переменные окружения:

    bash
    export BOOTSTRAP="<адрес брокера>"
    export TOPIC="<имя топика>"
    export USERNAME="<имя пользователя>"
    export PASSWORD="<пароль>"
    export CONF="$HOME/<файл конфигурации Kafka>.properties"

    Для подключения к кластеру достаточно указать по одному брокеру и топику. Брокер возвращает полную информацию о кластере Kafka.

  16. Создайте файл конфигурации Kafka:

    bash
    cat > "$CONF" <<EOF
    security.protocol=SASL_SSL
    sasl.mechanism=SCRAM-SHA-256
    ssl.truststore.type=PEM ssl.truststore.location=<путь до сертификата>/mws-root-ca.crt
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${USERNAME}" password="${PASSWORD}";
    ssl.endpoint.identification.algorithm=
    EOF

    Здесь:

    • security.protocol=SASL_SSL — SASL-аутентификация по защищенному SSL-каналу;
    • sasl.mechanism=SCRAM-SHA-256 — алгоритм аутентификации;
    • ssl.truststore.type=PEM — тип хранилища доверенных сертификатов (PEM-формат);
    • sasl.jaas.config — учетные данные.
  1. Получите список доступных топиков кластера:

    bash
    "$KAFKA_HOME/bin/kafka-topics.sh" \ # Запуск утилиты для работы с топиками Kafka
    --bootstrap-server "$BOOTSTRAP" \ # Адрес брокера
    --command-config "$CONF" \ # Файл конфигурации Kafka
    --list # Флаг для вывода списка всех доступных топиков

    Если подключение к кластеру успешно, в ответе будет выведен список доступных топиков кластера. Если вы еще не создали ни одного топика, будет выведен только системный топик Kafka __consumer__offsets.

  2. Получите адреса всех брокеров в кластере:

    bash
    "$KAFKA_HOME/bin/kafka-broker-api-versions.sh" \ # Запуск утилиты для получения информации о брокерах и поддерживаемых версиях API
    --bootstrap-server "$BOOTSTRAP" \ # Адрес брокера
    --command-config "$CONF" # Файл конфигурации Kafka

    В ответе будет получен список брокеров с идентификатором и адресом для каждого брокера.

  3. Получите информацию о брокерах-лидерах в разделах топика:

    bash
    "$KAFKA_HOME/bin/kafka-topics.sh" \ # Запуск утилиты для работы с топиками Kafka
    --bootstrap-server "$BOOTSTRAP" \ # Адрес брокера
    --describe \ # Флаг для получения описания топика
    --topic "$TOPIC" \ # Имя топика
    --command-config "$CONF" # Файл конфигурации Kafka

    В ответе будет получено описание топика. Для каждого раздела топика будет указан идентификатор брокера-лидера. Сопоставьте идентификаторы брокеров с их адресами, полученными в предыдущем шаге.

  4. Запустите консольного производителя для записи и отправки сообщений:

    bash
    "$KAFKA_HOME/bin/kafka-console-producer.sh" \ # Запуск консольного производителя для отправки сообщений в топик
    --bootstrap-server "$BOOTSTRAP" \ # Адрес брокера-лидера раздела
    --producer.config "$CONF" \ # Файл конфигурации Kafka
    --topic "$TOPIC" # Целевой топик

    После запуска введите сообщение для записи и отправки потребителю.

  5. Запустите консольного потребителя для чтения сообщений:

    bash
    "$KAFKA_HOME/bin/kafka-console-consumer.sh" \ # Запуск консольного потребителя для чтения сообщений из топика
    --bootstrap-server "$BOOTSTRAP" \ # Адрес брокера-лидера раздела
    --consumer.config "$CONF" \ # Файл конфигурации Kafka
    --topic "$TOPIC" \ # Целевой топик
    --group <группа потребителей> \ # Идентификатор группы потребителей
    --from-beginning # Чтение всех сообщений с начала топика

    В ответе будут выведены сообщения, отправленные в топик ранее.