kafka-3: Install Kafka as a single-node cluster and run it as a system service

    Tech  2022-07-10

    1 Software versions

    kafka_2.12-2.2.0.tgz
    zookeeper-3.4.8.tar.gz
    jdk1.8.0_181
    CentOS 7

    2 Switching between multiple Java versions

    #alternatives --install /usr/bin/java java /usr/bin/java 3
    #alternatives --install /usr/bin/java java /usr/local/jdk1.8.0_181/bin/java 4
    #alternatives --config java

    Select the number of the version you want. Then set the environment variables in /root/.bashrc:

    export JAVA_HOME=/usr/local/jdk1.8.0_181
    export PATH=$PATH:$JAVA_HOME/bin
    export KAFKA_HOME=/usr/local/kafka
    export PATH=$PATH:$KAFKA_HOME/bin
    export ZOOKEEPER_HOME=/usr/local/zookeeper-3.4.8
    export PATH=$PATH:$ZOOKEEPER_HOME/bin

    3 Disable the firewall

    CentOS 7 uses firewalld as its default firewall.
    # Check the firewall state
    firewall-cmd --state
    # Stop firewalld
    systemctl stop firewalld.service
    # Keep firewalld from starting on boot
    systemctl disable firewalld.service
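    If you would rather keep firewalld running, an alternative (a sketch, assuming the default ports used in this guide: 2181 for ZooKeeper and 9092 for Kafka) is to open only those ports:

```shell
# Open only the ZooKeeper (2181) and Kafka (9092) ports instead of disabling firewalld
firewall-cmd --permanent --add-port=2181/tcp
firewall-cmd --permanent --add-port=9092/tcp
# Apply the permanent rules to the running firewall
firewall-cmd --reload
```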

    4 ZooKeeper in single-node cluster mode

    (1) Configuration: the main changes are ZooKeeper's data directory and log directory. install_zookeeper.sh:

    #! /bin/bash
    ## author: zb
    ## date: 2020.10.24
    ## Values that must be defined in advance
    myidvalue="1"
    hostname=`cat /etc/hostname`
    echo 'Installing zookeeper'
    echo '(1) Unpack'
    tar -xzvf zookeeper-3.4.8.tar.gz -C /usr/local
    echo '(2) Set the ZOOKEEPER_HOME environment variable'
    ZOOKEEPER_HOME='/usr/local/zookeeper-3.4.8'
    echo "export ZOOKEEPER_HOME=${ZOOKEEPER_HOME}" >> /root/.bashrc
    echo 'export PATH=$PATH:$ZOOKEEPER_HOME/bin' >> /root/.bashrc
    source /root/.bashrc
    echo '(3) Create the data and log directories'
    mkdir -p ${ZOOKEEPER_HOME}/data
    mkdir -p ${ZOOKEEPER_HOME}/datalog
    echo '(4) Create the myid file'
    cd ${ZOOKEEPER_HOME}/data
    touch myid
    echo ${myidvalue} >> myid
    echo '(5) Write the zoo.cfg configuration file'
    cd ${ZOOKEEPER_HOME}/conf
    echo "tickTime=2000" >> zoo.cfg
    echo "initLimit=10" >> zoo.cfg
    echo "syncLimit=5" >> zoo.cfg
    echo "dataDir=${ZOOKEEPER_HOME}/data" >> zoo.cfg
    echo "dataLogDir=${ZOOKEEPER_HOME}/datalog" >> zoo.cfg
    echo "clientPort=2181" >> zoo.cfg
    echo "server.1=${hostname}:2888:3888" >> zoo.cfg

    The resulting zoo.cfg should contain:

    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/usr/local/zookeeper-3.4.8/data
    dataLogDir=/usr/local/zookeeper-3.4.8/datalog
    clientPort=2181
    server.1=pda1:2888:3888

    The generated myid file should contain:

    1

    (2) Start ZooKeeper:
    zkServer.sh start
    zkServer.sh status   # check the service state
    Although the node is configured in cluster mode, the status shows "standalone" because there is only one node.

    5 Kafka in single-node cluster mode

    (1) Single-node cluster configuration. Each Kafka server in the cluster gets its own configuration file; the most basic settings are written by install_kafka.sh:

    #! /bin/bash
    ## author: zb
    ## date: 2020.10.24
    ## Values that must be defined in advance
    brokeridvalue="1"
    hostname=`cat /etc/hostname`
    echo 'Installing kafka'
    echo '(1) Unpack'
    tar -xzvf kafka_2.12-2.2.0.tgz -C /usr/local
    cd /usr/local
    mv kafka_2.12-2.2.0 kafka
    echo '(2) Set the KAFKA_HOME environment variable'
    KAFKA_HOME='/usr/local/kafka'
    echo "export KAFKA_HOME=${KAFKA_HOME}" >> /root/.bashrc
    echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> /root/.bashrc
    source /root/.bashrc
    echo '(3) Create the log directory'
    mkdir -p ${KAFKA_HOME}/datalog
    echo '(4) Write the server0.properties configuration file'
    cd ${KAFKA_HOME}/config
    echo "broker.id=${brokeridvalue}" >> server0.properties
    echo "listeners=PLAINTEXT://${hostname}:9092" >> server0.properties
    echo "log.dir=${KAFKA_HOME}/datalog" >> server0.properties
    echo "num.partitions=5" >> server0.properties
    echo "log.retention.hours=24" >> server0.properties
    echo "zookeeper.connect=${hostname}:2181" >> server0.properties
    echo "zookeeper.connection.timeout.ms=60000" >> server0.properties
    echo "offsets.topic.replication.factor=1" >> server0.properties

    The resulting server0.properties should contain:

    broker.id=1
    listeners=PLAINTEXT://pda1:9092
    log.dir=/usr/local/kafka/datalog
    num.partitions=5
    log.retention.hours=24
    zookeeper.connect=pda1:2181
    zookeeper.connection.timeout.ms=60000
    offsets.topic.replication.factor=1

    (2) Start Kafka. Start ZooKeeper first with zkServer.sh start, then start Kafka:

    cd /usr/local/kafka/
    nohup ./bin/kafka-server-start.sh ./config/server0.properties >> /tmp/kafkaoutput.log 2>&1 &
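    Once the broker has been launched in the background, you can check that it actually came up (a sketch; the log path matches the nohup redirection above):

```shell
# List running JVM processes; a healthy broker shows up as "Kafka"
jps
# Watch the last lines of the startup log written by the nohup command above
tail -n 20 /tmp/kafkaoutput.log
```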

    [Note: to connect with a Kafka GUI tool from Windows, map the hostname to its IP address by editing the hosts file in C:\Windows\System32\drivers\etc]

    192.168.43.48 pda1

    6 Writing data to Kafka

    (1) Create a Kafka topic

    cd /usr/local/kafka/
    ./bin/kafka-topics.sh --create --zookeeper pda1:2181 --replication-factor 1 --partitions 1 --topic test

    (2) List the created topics

    kafka-topics.sh --list --zookeeper pda1:2181
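    Besides listing topic names, the same tool can describe a topic's partition and replica layout (a sketch against the single-node setup above):

```shell
# Show partition count, replication factor, leader, and ISR for the topic
kafka-topics.sh --describe --zookeeper pda1:2181 --topic test
```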

    (3) Test the setup
    (3-1) Sending messages. Kafka ships with a command-line client that takes input from a file or from standard input and sends it to the Kafka cluster as messages. By default, each line is sent as a separate message.
    (3-2) Start the producer:

    kafka-console-producer.sh --broker-list pda1:9092 --topic test
    # Type a few messages to send to the server
    >This is a message
    >This is another message

    (3-3) Start the consumer. The consumer dumps messages to standard output. Open another terminal:

    kafka-console-consumer.sh --bootstrap-server pda1:9092 --topic test --from-beginning
    # The messages sent by the producer now appear on the command line
    This is a message
    This is another message

    (4) Writing messages to Kafka from a program

    # coding: utf-8
    import time
    from kafka import KafkaProducer

    # Create a KafkaProducer instance for delivering messages to Kafka
    producer = KafkaProducer(bootstrap_servers=['192.168.43.48:9092'])
    i = 0
    while True:
        print(i)
        data_str = "消息" + str(i)
        producer.send("test", data_str.encode("utf-8"))
        i = i + 1
        time.sleep(0.1)
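    A matching consumer sketch can read those messages back from the topic. This assumes the same kafka-python package and broker address as the producer above; the decode_value and consume names are illustrative helpers, not part of the original script:

```python
# coding: utf-8

def decode_value(raw_bytes):
    # Kafka delivers message values as raw bytes; decode them back to text
    return raw_bytes.decode("utf-8")

def consume(topic="test", servers="192.168.43.48:9092"):
    # Assumes the kafka-python package: pip install kafka-python
    from kafka import KafkaConsumer
    # Connect to the broker from this guide and read the topic from the beginning
    consumer = KafkaConsumer(topic,
                             bootstrap_servers=servers,
                             auto_offset_reset="earliest")
    for msg in consumer:
        print(decode_value(msg.value))

if __name__ == "__main__":
    consume()
```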

    7 Running Kafka as a system service

    Create zookeeper.service and kafka.service under /etc/systemd/system. ZooKeeper must be started before Kafka, otherwise Kafka will not start.
    #cd /etc/systemd/system/
    (1) zookeeper.service

    [Unit]
    Description=Zookeeper
    After=network.target remote-fs.target nss-lookup.target

    [Service]
    Type=forking
    Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/jdk1.8.0_181/bin"
    ExecStart=/usr/local/zookeeper-3.4.8/bin/zkServer.sh start /usr/local/zookeeper-3.4.8/conf/zoo.cfg
    ExecReload=/bin/kill -s HUP $MAINPID
    ExecStop=/usr/local/zookeeper-3.4.8/bin/zkServer.sh stop

    [Install]
    WantedBy=multi-user.target

    #systemctl daemon-reload
    #systemctl enable zookeeper.service
    #systemctl start zookeeper
    #systemctl status zookeeper

    (2)kafka.service

    [Unit]
    Description=kafka
    After=network.target remote-fs.target nss-lookup.target zookeeper.service

    [Service]
    Type=forking
    Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/jdk1.8.0_181/bin"
    ExecStart=/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server0.properties
    ExecReload=/bin/kill -s HUP $MAINPID
    ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
    #PrivateTmp=true

    [Install]
    WantedBy=multi-user.target

    #systemctl daemon-reload
    #systemctl enable kafka.service
    #systemctl start kafka
    #systemctl status kafka
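    If either unit fails to start, systemd's journal usually shows the reason (a sketch; unit names as defined above):

```shell
# Show the last 50 log lines for the kafka unit
journalctl -u kafka.service -n 50 --no-pager
# Confirm kafka is ordered after zookeeper
systemctl list-dependencies kafka.service
```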

    (3) Notes

    Notes on the [Service] section:
    Type          service start type; forking means the process runs in the background
    Environment   required environment; "/usr/local/jdk1.8.0_181/bin" is your own JDK install path
    ExecStart     the start command; "/usr/local/kafka/" is your own Kafka install path, and "-daemon" starts the broker in the background
    ExecReload    the reload command; "/bin/kill -s HUP $MAINPID" is the generic reload command
    ExecStop      the stop command
    PrivateTmp    whether the service gets its own private /tmp; if enabled, the process will not show up in jps

    8 Start on boot

    /etc/rc.local

    #!/bin/sh -e
    #
    # rc.local
    #
    # This script is executed at the end of each multiuser runlevel.
    # Make sure that the script will "exit 0" on success or any other
    # value on error.
    #
    # In order to enable or disable this script just change the execution
    # bits.
    #
    # By default this script does nothing.
    /usr/local/zookeeper-3.4.8/bin/zkServer.sh start
    nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server0.properties >> /tmp/kafkaoutput.log 2>&1 &
    exit 0