Инструкция по ручному развертыванию Kafka в кластерном режиме для Wisla

В данном руководстве описано как подготовить систему и развернуть Kafka в кластерном режиме для работы с Wisla в отказоустойчивом контуре. 

 

 Предварительные условия: 

 1.Архив OpenJDK 17 () 2.Архив Kafka 2.13-4.1.1 

 3.Как минимум 3 сервера для развертывания 

 4.Пользователь wisla 

 

 Если пользователь wisla еще не существует в системе его необходимо создать используя следующие команды пошагово. 

 1.Команда создания пользователя: 

 sudo useradd -d /home/wisla -m -s /bin/bash wisla 

 2.Команда установки пароля: 

 sudo passwd wisla 

 3.Добавление пользователя в файлы sudoers: 

 echo "wisla ALL=(ALL:ALL) NOPASSWD:ALL" | sudo tee /etc/sudoers.d/wisla 

 4.Настройка системных лимитов для пользователя wisla: 

 # Создаем файл лимитов

cat > /etc/security/limits.d/wisla << 'EOF'

wisla soft nofile 32768

wisla hard nofile 32768

wisla soft nproc 32768

wisla hard nproc 32768

EOF 

 5. Добавляем в PAM (если нет): 

 echo "session required pam_limits.so" >> /etc/pam.d/common-session 

   

 

 

 Этап 1. Подготовка  каталогов на всех узлах 

 1.Создание каталогов: 

 sudo mkdir -p /opt/kafka

sudo mkdir -p /var/lib/kafka

sudo mkdir -p /var/log/kafka 

 2.Выдача прав: 

 sudo chown -R wisla:wisla /opt/kafka

sudo chown -R wisla:wisla /var/lib/kafka

sudo chown -R wisla:wisla /var/log/kafka 

 3.Проверка прав: 

 ls -ld /opt/kafka /var/lib/kafka /var/log/kafka 

 Этап 2. Копирование и распаковка Java на всех узлах 

 1.Скопируем архив Java в домашнюю директорию wisla используя следующую команду: 

 cp OpenJDK17U-jdk_x64_linux_hotspot_17.0.17_10.tar.gz /home/wisla/ 

 2.Смена пользователя у архива: 

 sudo chown wisla:wisla /home/wisla/OpenJDK17U-jdk_x64_linux_hotspot_17.0.17_10.tar.gz 

 3.Распаковка архива в /opt/kafka: 

 

 Переходим в каталог kafka используя следующую команду: 

 cd /opt/kafka 

 

 Распаковка архива: 

 tar -xzf /home/wisla/OpenJDK17U-jdk_x64_linux_hotspot_17.0.17_10.tar.gz 

 

 Проверка распаковки: 

 ls -la /opt/kafka/ 

 

 

 4.Настройка переменных окружения: 

 export JAVA_HOME="/opt/kafka/jdk-17.0.17+10"

export PATH="$JAVA_HOME/bin:$PATH"

export KAFKA_HOME="/opt/kafka" 

 5.Применение изменений: 

 source ~/.bashrc 

 6.Проверка: 

 java -version 

 

 Этап 3. Копирование и распаковка Kafka на всех узлах 

 1.Скопируем архив Kafka в домашнюю директорию wisla используя следующую команду: 

 cp kafka_2.13-4.1.1.tgz /opt/kafka/ 

 2.Смена пользователя у архива: 

 sudo chown wisla:wisla /opt/kafka/kafka_2.13-4.1.1.tgz 

 3.Распаковка архива в /opt/kafka: 

 

 Переходим в каталог kafka используя следующую команду: 

 cd /opt/kafka 

 

 Распаковка архива: 

 tar -xzf kafka_2.13-4.1.1.tgz --strip-components=1 

 

 Проверка распаковки: 

 ls -la /opt/kafka/bin/ 

 

 

 Этап 4. Генерация уникального ID кластера (только на 1 сервере) 

 1.Переходим в каталог kafka используя следующую команду: 

 cd /opt/kafka 

 2.Запускаем скрипт генерации ID: 

 ./bin/kafka-storage.sh random-uuid 

 Записать полученный ID, например:  

 

 Этап 5. Конфигурация кластера на всех узлах 

 1.Настройка конфигурации на сервере 1: 

 cat > /opt/kafka/config/kraft/server.properties << 'EOF'

# Базовые настройки

process.roles=broker,controller

node.id=1

# Кворум контроллеров (3 узла для отказоустойчивости)

controller.quorum.voters=1@wisla01:9093,2@wisla02:9093,3@wisla03:9093

# Сетевые настройки wisla01

listeners=PLAINTEXT://:9092,CONTROLLER://:9093

advertised.listeners=PLAINTEXT://wisla01:9092

advertised.controller.listener.name=CONTROLLER

# Директории данных

log.dirs=/var/lib/kafka

# Настройки репликации для 3 узлов

num.partitions=3

default.replication.factor=3

min.insync.replicas=2

# Системные топики (репликация на все узлы)

offsets.topic.replication.factor=3

transaction.state.log.replication.factor=3

transaction.state.log.min.isr=2

# Настройки отказоустойчивости контроллеров

controller.listener.names=CONTROLLER

controller.quorum.election.timeout.ms=1000

controller.quorum.fetch.timeout.ms=2000

controller.quorum.request.timeout.ms=2000

# Настройки брокера

num.io.threads=8

num.network.threads=3

num.replica.fetchers=2

# Настройки логов

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

log.flush.interval.messages=10000

log.flush.interval.ms=1000

# Настройки безопасности (опционально)

# ssl.client.auth=required

# ssl.keystore.location=/path/to/keystore.jks

# ssl.keystore.password=password

# ssl.truststore.location=/path/to/truststore.jks

# ssl.truststore.password=password

EOF 

 2.Настройка конфигурации на сервере 2: 

 cat > /opt/kafka/config/kraft/server.properties << 'EOF'

# Базовые настройки

process.roles=broker,controller

node.id=2

# Кворум контроллеров

controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093

# Сетевые настройки node2

listeners=PLAINTEXT://:9092,CONTROLLER://:9093

advertised.listeners=PLAINTEXT://node2:9092

advertised.controller.listener.name=CONTROLLER

# Директории данных

log.dirs=/var/lib/kafka

# Настройки репликации

num.partitions=3

default.replication.factor=3

min.insync.replicas=2

# Системные топики

offsets.topic.replication.factor=3

transaction.state.log.replication.factor=3

transaction.state.log.min.isr=2

# Настройки контроллеров

controller.listener.names=CONTROLLER

controller.quorum.election.timeout.ms=1000

controller.quorum.fetch.timeout.ms=2000

controller.quorum.request.timeout.ms=2000

# Настройки брокера

num.io.threads=8

num.network.threads=3

num.replica.fetchers=2

# Настройки логов

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

log.flush.interval.messages=10000

log.flush.interval.ms=1000

EOF 

 3.Настройка конфигурации на сервере 3: 

 cat > /opt/kafka/config/kraft/server.properties << 'EOF'

# Базовые настройки

process.roles=broker,controller

node.id=3

# Кворум контроллеров

controller.quorum.voters=1@wisla01:9093,2@wisla02:9093,3@wisla03:9093

# Сетевые настройки wisla03

listeners=PLAINTEXT://:9092,CONTROLLER://:9093

advertised.listeners=PLAINTEXT://wisla03:9092

advertised.controller.listener.name=CONTROLLER

# Директории данных

log.dirs=/var/lib/kafka

# Настройки репликации

num.partitions=3

default.replication.factor=3

min.insync.replicas=2

# Системные топики

offsets.topic.replication.factor=3

transaction.state.log.replication.factor=3

transaction.state.log.min.isr=2

# Настройки контроллеров

controller.listener.names=CONTROLLER

controller.quorum.election.timeout.ms=1000

controller.quorum.fetch.timeout.ms=2000

controller.quorum.request.timeout.ms=2000

# Настройки брокера

num.io.threads=8

num.network.threads=3

num.replica.fetchers=2

# Настройки логов

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

log.flush.interval.messages=10000

log.flush.interval.ms=1000

EOF 

 4.Проверка конфигураций: 

 cat /opt/kafka/config/kraft/server.properties 

 Этап 6. Инициализация хранилищ на всех узлах 

 1.На каждом сервере по очереди 

 

 Экспорт кластер ID: 

 export CLUSTER_ID="wisla-3node-cluster-12345" 

 

 Переходим в каталог kafka: 

 cd /opt/kafka 

 

 Добавляем кластер ID: 

 ./bin/kafka-storage.sh format \

 -t $CLUSTER_ID \

 -c /opt/kafka/config/kraft/server.properties 

 

 Проверяем 

 cat /var/lib/kafka/meta.properties 

 

 

 Этап 7. Создаем сервис службы Kafka на всех узлах 

 1.Создаем Unit: 

 sudo cat > /etc/systemd/system/kafka.service << 'EOF'

[Unit]

Description=Apache Kafka Service (Kraft mode) - Wisla 3-Node Cluster

After=network.target

Wants=network.target

[Service]

Type=simple

User=wisla

Group=wisla

WorkingDirectory=/opt/kafka

Environment="JAVA_HOME=/opt/kafka/jdk-17.0.17+10"

Environment="PATH=/opt/kafka/jdk-17.0.17+10/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"

Environment="CLUSTER_ID=wisla-3node-cluster-12345"

Environment="KAFKA_HEAP_OPTS=-Xmx4G -Xms4G"

Environment="KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"

Environment="KAFKA_OPTS=-Dlog4j.configuration=file:/opt/kafka/config/log4j.properties"

# Инициализация хранилища при первом запуске

ExecStartPre=/bin/bash -c "if [ ! -f /var/lib/kafka/meta.properties ]; then echo 'Инициализация хранилища для кластера...' && /opt/kafka/bin/kafka-storage.sh format -t $CLUSTER_ID -c /opt/kafka/config/kraft/server.properties; fi"

# Запуск Kafka

ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties

# Остановка

ExecStop=/opt/kafka/bin/kafka-server-stop.sh

# Логи

StandardOutput=journal

StandardError=journal

SuccessExitStatus=0 143

# Перезапуск при сбоях

Restart=on-failure

RestartSec=10

# Ограничения

LimitNOFILE=65536

LimitNPROC=65536

[Install]

WantedBy=multi-user.target

EOF 

 2.Создание конфигурации log4j: 

 cat > /opt/kafka/config/log4j.properties << 'EOF'

log4j.rootLogger=INFO, stdout, kafkaAppender

log4j.appender.stdout=org.apache.log4j.ConsoleAppender

log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender

log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH

log4j.appender.kafkaAppender.File=/var/log/kafka/kafka.log

log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout

log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.logger.kafka=INFO

log4j.logger.kafka.network.RequestChannel$=WARN

log4j.logger.kafka.server.KafkaApis=WARN

log4j.logger.org.apache.zookeeper=WARN

log4j.logger.org.I0Itec.zkclient=WARN

EOF 

 Этап 8. Создание скриптов управления кластером 

 1. Создаем скрипт запуска кластера на первом сервере: 

 sudo cat > /usr/local/bin/kafka-cluster-start << 'EOF'

#!/bin/bash

echo "Запуск кластера Kafka на всех узлах..."

# Запуск на локальном узле

systemctl start kafka

echo "Node $(hostname): Kafka запущен"

# Если настроен SSH-доступ к другим узлам, можно добавить:

# ssh wisla@wisla02 "systemctl start kafka"

# ssh wisla@wisla03 "systemctl start kafka"

# echo "wisla02: Kafka запущен"

# echo "wisla03: Kafka запущен"

echo "Кластер Kafka (3 узла) запущен"

echo "Для проверки выполните: kafka-cluster-status"

EOF 

 2.Создаем скрипт остановки кластера на первом сервере: 

 sudo cat > /usr/local/bin/kafka-cluster-stop << 'EOF'

#!/bin/bash

echo "Остановка кластера Kafka..."

# Остановка в правильном порядке (сначала реплики, потом лидеры)

systemctl stop kafka

echo "Node $(hostname): Kafka остановлен"

echo "Кластер Kafka остановлен"

EOF 

 3.Создание скрипта проверки статуса на первом сервере: 

 sudo cat > /usr/local/bin/kafka-cluster-status << 'EOF'

#!/bin/bash

echo "=== Статус кластера Kafka (3 узла) ==="

echo ""

echo "Локальный узел ($(hostname)):"

systemctl status kafka --no-pager | grep -A 3 "Active:"

echo ""

echo "Для проверки репликации выполните на любом узле:"

echo " cd /opt/kafka && ./bin/kafka-topics.sh --describe --bootstrap-server wisla01:9092"

echo ""

echo "Для просмотра логов: sudo journalctl -u kafka -f"

EOF 

 4.Создание скрипта управления топиками: 

 sudo cat > /usr/local/bin/kafka-topic-manage << 'EOF'

#!/bin/bash

KAFKA_HOME="/opt/kafka"

BOOTSTRAP_SERVER="wisla01:9092,wisla02:9092,wisla03:9092"

case $1 in

 create)

 echo "Создание топика: $2"

 $KAFKA_HOME/bin/kafka-topics.sh --create \

 --bootstrap-server $BOOTSTRAP_SERVER \

 --replication-factor 3 \

 --partitions 3 \

 --topic $2

 ;;

 list)

 echo "Список топиков:"

 $KAFKA_HOME/bin/kafka-topics.sh --list \

 --bootstrap-server $BOOTSTRAP_SERVER

 ;;

 describe)

 echo "Описание топика: $2"

 $KAFKA_HOME/bin/kafka-topics.sh --describe \

 --bootstrap-server $BOOTSTRAP_SERVER \

 --topic $2

 ;;

 delete)

 echo "Удаление топика: $2"

 $KAFKA_HOME/bin/kafka-topics.sh --delete \

 --bootstrap-server $BOOTSTRAP_SERVER \

 --topic $2

 ;;

 *)

 echo "Использование: $0 {create|list|describe|delete} [topic-name]"

 echo "Пример:"

 echo " $0 create wisla-topic"

 echo " $0 list"

 echo " $0 describe wisla-topic"

 echo " $0 delete wisla-topic"

 ;;

esac

EOF 

 5.Предоставление прав на исполнение: 

 sudo chmod +x /usr/local/bin/kafka-* 

 6. Настройка /etc/hosts 

 sudo cat >> /etc/hosts << 'EOF'

# Kafka Cluster Nodes

192.168.1.101 Wisla01

192.168.1.102 Wisla02

192.168.1.103 Wisla03

EOF 

 7.Настройка брандмауэра (Опционально): 

 sudo ufw allow 9092/tcp # Client connections

sudo ufw allow 9093/tcp # Controller connections

sudo ufw allow 9094/tcp # Inter-broker connections (если настроены) 

 Этап 9. Запуск кластера 

 1.Перезагрузка systemd на всех узлах: 

 sudo systemctl daemon-reload 

 2.Добавление в автозапуск на всех узлах: 

 sudo systemctl enable kafka 

 3.Запуск кластера (ВАЖНО: запускать на всех узлах в течение 30 секунд): 

 # На wisla01:

sudo systemctl start kafka

# На wisla02 (в течение 30 секунд после wisla01):

sudo systemctl start kafka

# На wisla03 (в течение 30 секунд после wisla02):

sudo systemctl start kafka 

 4.Проверка статуса службы: 

 sudo systemctl status kafka 

   

 Дополнительная проверка: Так же после установки и запуска службы Kafka можно создать скрипт проверки статусов работы кластера Kafka. 1.Создать файл скрипта командой: 

 sudo nano /usr/local/bin/kafka-cluster-monitor 

 2.Предоставить права на исполнение: 

 sudo chmod +x /usr/local/bin/kafka-cluster-monitor 

 3.Добавить скрипт в файл 

 #!/bin/bash

echo "========================================="

echo " Мониторинг кластера Kafka (3 узла) "

echo "========================================="

echo ""

echo "1. СТАТУС СЛУЖБ:"

for node in cluster01 cluster02 cluster03; do

 if [ "$node" = "cluster01" ]; then

 status=$(sudo systemctl is-active kafka 2>/dev/null || echo "unknown")

 else

 status=$(ssh wisla@$node "sudo systemctl is-active kafka 2>/dev/null" 2>/dev/null || echo "unknown")

 fi

 

 case $status in

 active) color="\e[32m" ;; 

 inactive|failed) color="\e[31m" ;; 

 unknown) color="\e[33m" ;; 

 *) color="\e[33m" ;; 

 esac

 echo -e " $node: ${color}${status}\e[0m"

done

echo -e "\n2. СПИСОК БРОКЕРОВ:"

cd /opt/kafka 2>/dev/null 

if [ -f /opt/kafka/bin/kafka-broker-api-versions.sh ]; then

 /opt/kafka/bin/kafka-broker-api-versions.sh \

 --bootstrap-server cluster01:9092 2>/dev/null | grep -E "^cluster" | head -3

else

 echo " Kafka не установлен локально"

fi

echo -e "\n3. СПИСОК ТОПИКОВ:"

cd /opt/kafka 2>/dev/null 

if [ -f /opt/kafka/bin/kafka-topics.sh ]; then

 /opt/kafka/bin/kafka-topics.sh \

 --list --bootstrap-server cluster01:9092 2>/dev/null | head -10

else

 echo " Kafka не установлен локально"

fi

echo -e "\n4. ПРОВЕРКА РЕПЛИКАЦИИ:"

cd /opt/kafka 2>/dev/null 

if [ -f /opt/kafka/bin/kafka-topics.sh ]; then

 result=$(/opt/kafka/bin/kafka-topics.sh \

 --describe --bootstrap-server cluster01:9092 \

 --under-replicated-partitions 2>/dev/null)

 if echo "$result" | grep -q "Topic"; then

 echo " Есть недореплицированные партиции"

 else

 echo " Все партиции реплицированы"

 fi

else

 echo " Kafka не установлен локально"

fi

echo -e "\n5. ПОСЛЕДНИЕ СОБЫТИЯ:"

for node in cluster01 cluster02 cluster03; do

 if [ "$node" = "cluster01" ]; then

 errors=$(sudo journalctl -u kafka --since '5 minutes ago' 2>/dev/null | grep -E 'ERROR|WARN' | tail -2)

 else

 errors=$(ssh wisla@$node "sudo journalctl -u kafka --since '5 minutes ago' 2>/dev/null | grep -E 'ERROR|WARN' | tail -2" 2>/dev/null)

 fi

 

 if [ -n "$errors" ]; then

 echo " $node:"

 echo "$errors" | sed 's/^/ /'

 fi

done

echo -e "\n========================================="

echo " Кластер готов к работе!"

echo "========================================="

 

 4.Запустить скрипт для проверки 

 ./kafka-cluster-monitor