Инструкция по ручному развертыванию Kafka в кластерном режиме для Wisla
В данном руководстве описано как подготовить систему и развернуть Kafka в кластерном режиме для работы с Wisla в отказоустойчивом контуре.
Предварительные условия:
1.Архив OpenJDK 17 ()
2.Архив Kafka 2.13-4.1.1
3.Как минимум 3 сервера для развертывания
4.Пользователь wisla
Если пользователь wisla еще не существует в системе его необходимо создать используя следующие команды пошагово.
1.Команда создания пользователя:
sudo useradd -d /home/wisla -m -s /bin/bash wisla
2.Команда установки пароля:
sudo passwd wisla
3.Добавление пользователя в файлы sudoers:
echo "wisla ALL=(ALL:ALL) NOPASSWD:ALL" | sudo tee /etc/sudoers.d/wisla
4.Настройка системных лимитов для пользователя wisla:
# Создаем файл лимитов
cat > /etc/security/limits.d/wisla << 'EOF'
wisla soft nofile 32768
wisla hard nofile 32768
wisla soft nproc 32768
wisla hard nproc 32768
EOF
5. Добавляем в PAM (если нет):
echo "session required pam_limits.so" >> /etc/pam.d/common-session
Этап 1. Подготовка каталогов на всех узлах
1.Создание каталогов:
sudo mkdir -p /opt/kafka
sudo mkdir -p /var/lib/kafka
sudo mkdir -p /var/log/kafka
2.Выдача прав:
sudo chown -R wisla:wisla /opt/kafka
sudo chown -R wisla:wisla /var/lib/kafka
sudo chown -R wisla:wisla /var/log/kafka
3.Проверка прав:
ls -ld /opt/kafka /var/lib/kafka /var/log/kafka
Этап 2. Копирование и распаковка Java на всех узлах
1.Скопируем архив Java в домашнюю директорию wisla используя следующую команду:
cp OpenJDK17U-jdk_x64_linux_hotspot_17.0.17_10.tar.gz /home/wisla/
2.Смена пользователя у архива:
sudo chown wisla:wisla /home/wisla/OpenJDK17U-jdk_x64_linux_hotspot_17.0.17_10.tar.gz
3.Распаковка архива в /opt/kafka:
- Переходим в каталог kafka используя следующую команду:
cd /opt/kafka - Распаковка архива:
tar -xzf /home/wisla/OpenJDK17U-jdk_x64_linux_hotspot_17.0.17_10.tar.gz - Проверка распаковки:
ls -la /opt/kafka/
4.Настройка переменных окружения:
export JAVA_HOME="/opt/kafka/jdk-17.0.17+10"
export PATH="$JAVA_HOME/bin:$PATH"
export KAFKA_HOME="/opt/kafka"
5.Применение изменений:
source ~/.bashrc
6.Проверка:
java -version
Этап 3. Копирование и распаковка Kafka на всех узлах
1.Скопируем архив Kafka в домашнюю директорию wisla используя следующую команду:
cp kafka_2.13-4.1.1.tgz /opt/kafka/
2.Смена пользователя у архива:
sudo chown wisla:wisla /opt/kafka/kafka_2.13-4.1.1.tgz
3.Распаковка архива в /opt/kafka:
- Переходим в каталог kafka используя следующую команду:
cd /opt/kafka - Распаковка архива:
tar -xzf kafka_2.13-4.1.1.tgz --strip-components=1 - Проверка распаковки:
ls -la /opt/kafka/bin/
Этап 4. Генерация уникального ID кластера (только на 1 сервере)
1.Переходим в каталог kafka используя следующую команду:
cd /opt/kafka
2.Запускаем скрипт генерации ID:
./bin/kafka-storage.sh random-uuid
Записать полученный ID, например:
Этап 5. Конфигурация кластера на всех узлах
1.Настройка конфигурации на сервере 1:
cat > /opt/kafka/config/kraft/server.properties << 'EOF'
# Базовые настройки
process.roles=broker,controller
node.id=1
# Кворум контроллеров (3 узла для отказоустойчивости)
controller.quorum.voters=1@wisla01:9093,2@wisla02:9093,3@wisla03:9093
# Сетевые настройки wisla01
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://wisla01:9092
advertised.controller.listener.name=CONTROLLER
# Директории данных
log.dirs=/var/lib/kafka
# Настройки репликации для 3 узлов
num.partitions=3
default.replication.factor=3
min.insync.replicas=2
# Системные топики (репликация на все узлы)
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
# Настройки отказоустойчивости контроллеров
controller.listener.names=CONTROLLER
controller.quorum.election.timeout.ms=1000
controller.quorum.fetch.timeout.ms=2000
controller.quorum.request.timeout.ms=2000
# Настройки брокера
num.io.threads=8
num.network.threads=3
num.replica.fetchers=2
# Настройки логов
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.flush.interval.messages=10000
log.flush.interval.ms=1000
# Настройки безопасности (опционально)
# ssl.client.auth=required
# ssl.keystore.location=/path/to/keystore.jks
# ssl.keystore.password=password
# ssl.truststore.location=/path/to/truststore.jks
# ssl.truststore.password=password
EOF
2.Настройка конфигурации на сервере 2:
cat > /opt/kafka/config/kraft/server.properties << 'EOF'
# Базовые настройки
process.roles=broker,controller
node.id=2
# Кворум контроллеров
controller.quorum.voters=1@node1:9093,2@node2:9093,3@node3:9093
# Сетевые настройки node2
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://node2:9092
advertised.controller.listener.name=CONTROLLER
# Директории данных
log.dirs=/var/lib/kafka
# Настройки репликации
num.partitions=3
default.replication.factor=3
min.insync.replicas=2
# Системные топики
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
# Настройки контроллеров
controller.listener.names=CONTROLLER
controller.quorum.election.timeout.ms=1000
controller.quorum.fetch.timeout.ms=2000
controller.quorum.request.timeout.ms=2000
# Настройки брокера
num.io.threads=8
num.network.threads=3
num.replica.fetchers=2
# Настройки логов
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.flush.interval.messages=10000
log.flush.interval.ms=1000
EOF
3.Настройка конфигурации на сервере 3:
cat > /opt/kafka/config/kraft/server.properties << 'EOF'
# Базовые настройки
process.roles=broker,controller
node.id=3
# Кворум контроллеров
controller.quorum.voters=1@wisla01:9093,2@wisla02:9093,3@wisla03:9093
# Сетевые настройки wisla03
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://wisla03:9092
advertised.controller.listener.name=CONTROLLER
# Директории данных
log.dirs=/var/lib/kafka
# Настройки репликации
num.partitions=3
default.replication.factor=3
min.insync.replicas=2
# Системные топики
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
# Настройки контроллеров
controller.listener.names=CONTROLLER
controller.quorum.election.timeout.ms=1000
controller.quorum.fetch.timeout.ms=2000
controller.quorum.request.timeout.ms=2000
# Настройки брокера
num.io.threads=8
num.network.threads=3
num.replica.fetchers=2
# Настройки логов
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.flush.interval.messages=10000
log.flush.interval.ms=1000
EOF
4.Проверка конфигураций:
cat /opt/kafka/config/kraft/server.properties
Этап 6. Инициализация хранилищ на всех узлах
1.На каждом сервере по очереди
- Экспорт кластер ID:
export CLUSTER_ID="wisla-3node-cluster-12345" - Переходим в каталог kafka:
cd /opt/kafka - Добавляем кластер ID:
./bin/kafka-storage.sh format \ -t $CLUSTER_ID \ -c /opt/kafka/config/kraft/server.properties - Проверяем
cat /var/lib/kafka/meta.properties
Этап 7. Создаем сервис службы Kafka на всех узлах
1.Создаем Unit:
sudo cat > /etc/systemd/system/kafka.service << 'EOF'
[Unit]
Description=Apache Kafka Service (Kraft mode) - Wisla 3-Node Cluster
After=network.target
Wants=network.target
[Service]
Type=simple
User=wisla
Group=wisla
WorkingDirectory=/opt/kafka
Environment="JAVA_HOME=/opt/kafka/jdk-17.0.17+10"
Environment="PATH=/opt/kafka/jdk-17.0.17+10/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
Environment="CLUSTER_ID=wisla-3node-cluster-12345"
Environment="KAFKA_HEAP_OPTS=-Xmx4G -Xms4G"
Environment="KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
Environment="KAFKA_OPTS=-Dlog4j.configuration=file:/opt/kafka/config/log4j.properties"
# Инициализация хранилища при первом запуске
ExecStartPre=/bin/bash -c "if [ ! -f /var/lib/kafka/meta.properties ]; then echo 'Инициализация хранилища для кластера...' && /opt/kafka/bin/kafka-storage.sh format -t $CLUSTER_ID -c /opt/kafka/config/kraft/server.properties; fi"
# Запуск Kafka
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties
# Остановка
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
# Логи
StandardOutput=journal
StandardError=journal
SuccessExitStatus=0 143
# Перезапуск при сбоях
Restart=on-failure
RestartSec=10
# Ограничения
LimitNOFILE=65536
LimitNPROC=65536
[Install]
WantedBy=multi-user.target
EOF
2.Создание конфигурации log4j:
cat > /opt/kafka/config/log4j.properties << 'EOF'
log4j.rootLogger=INFO, stdout, kafkaAppender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.kafkaAppender.File=/var/log/kafka/kafka.log
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.logger.kafka=INFO
log4j.logger.kafka.network.RequestChannel$=WARN
log4j.logger.kafka.server.KafkaApis=WARN
log4j.logger.org.apache.zookeeper=WARN
log4j.logger.org.I0Itec.zkclient=WARN
EOF
Этап 8. Создание скриптов управления кластером
1. Создаем скрипт запуска кластера на первом сервере:
sudo cat > /usr/local/bin/kafka-cluster-start << 'EOF'
#!/bin/bash
echo "Запуск кластера Kafka на всех узлах..."
# Запуск на локальном узле
systemctl start kafka
echo "Node $(hostname): Kafka запущен"
# Если настроен SSH-доступ к другим узлам, можно добавить:
# ssh wisla@wisla02 "systemctl start kafka"
# ssh wisla@wisla03 "systemctl start kafka"
# echo "wisla02: Kafka запущен"
# echo "wisla03: Kafka запущен"
echo "Кластер Kafka (3 узла) запущен"
echo "Для проверки выполните: kafka-cluster-status"
EOF
2.Создаем скрипт остановки кластера на первом сервере:
sudo cat > /usr/local/bin/kafka-cluster-stop << 'EOF'
#!/bin/bash
echo "Остановка кластера Kafka..."
# Остановка в правильном порядке (сначала реплики, потом лидеры)
systemctl stop kafka
echo "Node $(hostname): Kafka остановлен"
echo "Кластер Kafka остановлен"
EOF
3.Создание скрипта проверки статуса на первом сервере:
sudo cat > /usr/local/bin/kafka-cluster-status << 'EOF'
#!/bin/bash
echo "=== Статус кластера Kafka (3 узла) ==="
echo ""
echo "Локальный узел ($(hostname)):"
systemctl status kafka --no-pager | grep -A 3 "Active:"
echo ""
echo "Для проверки репликации выполните на любом узле:"
echo " cd /opt/kafka && ./bin/kafka-topics.sh --describe --bootstrap-server wisla01:9092"
echo ""
echo "Для просмотра логов: sudo journalctl -u kafka -f"
EOF
4.Создание скрипта управления топиками:
sudo cat > /usr/local/bin/kafka-topic-manage << 'EOF'
#!/bin/bash
KAFKA_HOME="/opt/kafka"
BOOTSTRAP_SERVER="wisla01:9092,wisla02:9092,wisla03:9092"
case $1 in
create)
echo "Создание топика: $2"
$KAFKA_HOME/bin/kafka-topics.sh --create \
--bootstrap-server $BOOTSTRAP_SERVER \
--replication-factor 3 \
--partitions 3 \
--topic $2
;;
list)
echo "Список топиков:"
$KAFKA_HOME/bin/kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP_SERVER
;;
describe)
echo "Описание топика: $2"
$KAFKA_HOME/bin/kafka-topics.sh --describe \
--bootstrap-server $BOOTSTRAP_SERVER \
--topic $2
;;
delete)
echo "Удаление топика: $2"
$KAFKA_HOME/bin/kafka-topics.sh --delete \
--bootstrap-server $BOOTSTRAP_SERVER \
--topic $2
;;
*)
echo "Использование: $0 {create|list|describe|delete} [topic-name]"
echo "Пример:"
echo " $0 create wisla-topic"
echo " $0 list"
echo " $0 describe wisla-topic"
echo " $0 delete wisla-topic"
;;
esac
EOF
5.Предоставление прав на исполнение:
sudo chmod +x /usr/local/bin/kafka-*
6. Настройка /etc/hosts
sudo cat >> /etc/hosts << 'EOF'
# Kafka Cluster Nodes
192.168.1.101 Wisla01
192.168.1.102 Wisla02
192.168.1.103 Wisla03
EOF
7.Настройка брандмауэра (Опционально):
sudo ufw allow 9092/tcp # Client connections
sudo ufw allow 9093/tcp # Controller connections
sudo ufw allow 9094/tcp # Inter-broker connections (если настроены)
Этап 9. Запуск кластера
1.Перезагрузка systemd на всех узлах:
sudo systemctl daemon-reload
2.Добавление в автозапуск на всех узлах:
sudo systemctl enable kafka
3.Запуск кластера (ВАЖНО: запускать на всех узлах в течение 30 секунд):
# На wisla01:
sudo systemctl start kafka
# На wisla02 (в течение 30 секунд после wisla01):
sudo systemctl start kafka
# На wisla03 (в течение 30 секунд после wisla02):
sudo systemctl start kafka
4.Проверка статуса службы:
sudo systemctl status kafka
Дополнительная проверка:
Так же после установки и запуска службы Kafka можно создать скрипт проверки статусов работы кластера Kafka.
1.Создать файл скрипта командой:
sudo nano /usr/local/bin/kafka-cluster-monitor
2.Предоставить права на исполнение:
sudo chmod +x /usr/local/bin/kafka-cluster-monitor
3.Добавить скрипт в файл
#!/bin/bash
echo "========================================="
echo " Мониторинг кластера Kafka (3 узла) "
echo "========================================="
echo ""
echo "1. СТАТУС СЛУЖБ:"
for node in cluster01 cluster02 cluster03; do
if [ "$node" = "cluster01" ]; then
status=$(sudo systemctl is-active kafka 2>/dev/null || echo "unknown")
else
status=$(ssh wisla@$node "sudo systemctl is-active kafka 2>/dev/null" 2>/dev/null || echo "unknown")
fi
case $status in
active) color="\e[32m" ;;
inactive|failed) color="\e[31m" ;;
unknown) color="\e[33m" ;;
*) color="\e[33m" ;;
esac
echo -e " $node: ${color}${status}\e[0m"
done
echo -e "\n2. СПИСОК БРОКЕРОВ:"
cd /opt/kafka 2>/dev/null
if [ -f /opt/kafka/bin/kafka-broker-api-versions.sh ]; then
/opt/kafka/bin/kafka-broker-api-versions.sh \
--bootstrap-server cluster01:9092 2>/dev/null | grep -E "^cluster" | head -3
else
echo " Kafka не установлен локально"
fi
echo -e "\n3. СПИСОК ТОПИКОВ:"
cd /opt/kafka 2>/dev/null
if [ -f /opt/kafka/bin/kafka-topics.sh ]; then
/opt/kafka/bin/kafka-topics.sh \
--list --bootstrap-server cluster01:9092 2>/dev/null | head -10
else
echo " Kafka не установлен локально"
fi
echo -e "\n4. ПРОВЕРКА РЕПЛИКАЦИИ:"
cd /opt/kafka 2>/dev/null
if [ -f /opt/kafka/bin/kafka-topics.sh ]; then
result=$(/opt/kafka/bin/kafka-topics.sh \
--describe --bootstrap-server cluster01:9092 \
--under-replicated-partitions 2>/dev/null)
if echo "$result" | grep -q "Topic"; then
echo " Есть недореплицированные партиции"
else
echo " Все партиции реплицированы"
fi
else
echo " Kafka не установлен локально"
fi
echo -e "\n5. ПОСЛЕДНИЕ СОБЫТИЯ:"
for node in cluster01 cluster02 cluster03; do
if [ "$node" = "cluster01" ]; then
errors=$(sudo journalctl -u kafka --since '5 minutes ago' 2>/dev/null | grep -E 'ERROR|WARN' | tail -2)
else
errors=$(ssh wisla@$node "sudo journalctl -u kafka --since '5 minutes ago' 2>/dev/null | grep -E 'ERROR|WARN' | tail -2" 2>/dev/null)
fi
if [ -n "$errors" ]; then
echo " $node:"
echo "$errors" | sed 's/^/ /'
fi
done
echo -e "\n========================================="
echo " Кластер готов к работе!"
echo "========================================="
4.Запустить скрипт для проверки
./kafka-cluster-monitor
No comments to display
No comments to display