kafka-4-二节点集群安装kafka

    科技2022-07-11  89

    1 版本控制

    kafka_2.12-2.2.0.tgz zookeeper-3.4.8.tar.gz jdk1.8.0_181 centos 7

    2 免密登录

    (1)查看系统中是否安装SSH dpkg -L ssh |grep ssh (2)安装SSH yum install -y openssh-server (3)测试SSH命令是否可用 ssh localhost (4)设置SSH无密码登录 cd ~/.ssh ssh-keygen -t rsa有提示,全部回车 cat id_rsa.pub >> ./authorized_keys加入授权 (5)测试SSH无密码登录 ssh localhost

    3 java的多版本切换

    alternatives --install /usr/bin/java java /usr/bin/java 3 alternatives --install /usr/bin/java java /usr/local/jdk1.8.0_181/bin/java 4 alternatives --config java

    选择对应版本的序号。 /root/.bashrc环境变量

    export JAVA_HOME=/usr/local/jdk1.8.0_181 export PATH=$PATH:$JAVA_HOME/bin export KAFKA_HOME=/usr/local/kafka export PATH=$PATH:$KAFKA_HOME/bin export ZOOKEEPER_HOME=/usr/local/zookeeper export PATH=$PATH:$ZOOKEEPER_HOME/bin

    4 关闭防火墙

    CentOS 7.0默认使用的是firewall作为防火墙 (1)查看防火墙状态 firewall-cmd --state (2)停止firewall systemctl stop firewalld.service (3)禁止firewall开机启动 systemctl disable firewalld.service

    5 zookeeper集群模式

    5.1 zookeeper集群安装

    在实际情况下,我们的zookeeper都会以集群的方式进行安装。ZooKeeper的集群模式下,多个Zookeeper服务器在工作前会选举出一个Leader,在接下来的工作中这个被选举出来的Leader死了,而剩下的Zookeeper服务器会知道这个Leader死掉了,在活着的Zookeeper集群中会继续选出一个Leader,选举出Leader的目的是为了可以在分布式的环境中保证数据的一致性。

    确认集群服务器的数量:由于ZooKeeper集群中,会有一个Leader负责管理和协调其他集群服务器,因此服务器的数量通常都是单数,例如3,5,7…等,这样2n+1的数量的服务器就可以允许最多n台服务器的失效。

    5.1.1 解压安装

    (1)二个主机中均进行如下安装 tar -xzvf zookeeper-3.4.13.tar.gz -C /usr/local cd /usr/local mv zookeeper-3.4.13/ zookeeper gedit /root/.bashrc

    export ZOOKEEPER_HOME=/usr/local/zookeeper export PATH=$PATH:$ZOOKEEPER_HOME/bin

    5.1.2 创建myid文件

    集群模式下还要配置一个文件myid,这个文件在dataDir目录下,这个文件里面就只有一个数据,Zookeeper启动时会读取这个文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。

    注意一定不要忘记在dataDir目录下创建myid文件,创建myid文件。注意:myid文件要自己创建,在dataDir目录下。 node1机器的内容为:1 node2机器的内容为:2。

    root@node1:# cd /extendspace/zookeeper/ root@node1:# mkdir data root@node1:# touch data/myid root@node1:# echo 1 >> data/myid

    root@node2:# cd /extendspace/zookeeper/ root@node2:# mkdir data root@node2:# touch data/myid root@node2:# echo 2 >> data/myid

    5.1.3 修改配置文件

    cd /usr/local/zookeeper/conf cp zoo_sample.cfg zoo.cfg 主要更改zookeeper的数据存放地址和日志地址 gedit zoo.cfg

    # The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial synchronization phase can take initLimit=10 # The number of ticks that can pass between sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/extendspace/zookeeper/data dataLogDir=/extendspace/zookeeper/datarizhi # the port at which the clients will connect clientPort=2181 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # Be sure to read the maintenance section of the administrator guide before turning on autopurge. # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1 server.1=node1:2888:3888 server.2=node2:2888:3888

    配置完成以后把配置文件分发到其他的主机上; 参数解释: (1)tickTime 发送心跳的间隔时间,单位:毫秒 (2)dataDir zookeeper保存数据的目录。 (3)clientPort 客户端连接Zookeeper服务器的端口,Zookeeper会监听这个端口,接受客户端的访问请求。 (4)initLimit 这个配置项是用来配置Zookeeper接受客户端(这里所说的客户端不是用户连接Zookeeper服务器的客户端,而是Zookeeper服务器集群中连接到Leader的Follower服务器)初始化连接时最长能忍受多少个心跳时间间隔数。 当已经超过5个心跳的时间(也就是tickTime)长度后Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是52000=10秒 (5)syncLimit 这个配置项标识Leader与Follower之间发送消息,请求和应答时间长度,最长不能超过多少个tickTime的时间长度,总的时间长度就是22000=4秒 (6)server.A=B:C:D A是一个数字,表示这个是第几号服务器; B是这个服务器的ip地址; C表示这个服务器与集群中的Leader服务器交换信息的端口; D表示的是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式,由于B都是一样,所以不同的Zookeeper实例通信端口号不能一样,所以要给它们分配不同的端口号。

    5.1.4 启动zookeeper

    启动所有节点 root@node1:# zkServer.sh start root@node2:# zkServer.sh start #zkServer.sh status查看服务状态 #zkServer.sh stop停止集群,每个主机均停止

    6 Kafka

    6.1 kafka原理

    Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景。

    通常来讲,消息模型可以分为两种:队列和发布-订阅式。队列的处理方式是一组消费者从服务器读取消息,一条消息只有其中的一个消费者来处理。在发布-订阅模型中,消息被广播给所有的消费者,接收到消息的消费者都可以处理此消息。Kafka为这两种模型提供了单一的消费者抽象模型:消费者组(consumer group)。消费者用一个消费者组名标记自己。 一个发布在Topic上消息被分发给此消费者组中的一个消费者。假如所有的消费者都在一个组中,那么这就变成了queue模型。假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型。更通用的,我们可以创建一些消费者组作为逻辑上的订阅者。每个组包含数目不等的消费者,一个组内多个消费者可以用来扩展性能和容错。

    并且,kafka能够保证生产者发送到一个特定的Topic的分区上,消息将会按照它们发送的顺序依次加入,也就是说,如果一个消息M1和M2使用相同的producer发送,M1先发送,那么M1将比M2的offset低,并且优先的出现在日志中。消费者收到的消息也是此顺序。如果一个Topic配置了复制因子(replication facto)为N,那么可以允许N-1服务器宕机而不丢失任何已经提交(committed)的消息。此特性说明kafka有比传统的消息系统更强的顺序保证。但是,相同的消费者组中不能有比分区更多的消费者,否则多出的消费者一直处于空等待,不会收到消息。 #tar -xzvf kafka_2.11-0.10.1.0.tgz -C /usr/local #cd /usr/local #mv kafka_2.11-0.10.1.0/ kafka #gedit /root/.bashrc

    export KAFKA_HOME=/usr/local/kafka export PATH=$PATH:$KAFKA_HOME/bin

    6.2 二节点配置

    为集群上的每一台Kafka服务器单独配置配置文件,比如我们需要设置有二节点的Kafka集群,那么节点1、2的最基本的配置如下: (1)节点1

    broker.id=1 listeners=PLAINTEXT://192.168.0.209:9092 log.dir=/extendspace/kafka/logdata num.partitions=5 log.retention.hours=24 zookeeper.connect=192.168.0.209:2181 zookeeper.connection.timeout.ms=6000 offsets.topic.replication.factor=2

    (2)节点2

    broker.id=2 listeners=PLAINTEXT://192.168.0.210:9092 log.dir=/extendspace/kafka/logdata num.partitions=5 log.retention.hours=24 zookeeper.connect=192.168.0.210:2181 zookeeper.connection.timeout.ms=6000 offsets.topic.replication.factor=2

    6.3 启动kafka集群

    (1)启动zookeeper集群

    root@node1:~# zkServer.sh start root@node2:~# zkServer.sh start

    (2)启动kafka集群

    root@node1:# cd /usr/local/kafka/ root@node1:# ./bin/kafka-server-start.sh ./config/server1.properties root@node2:# cd /usr/local/kafka/ root@node2:# ./bin/kafka-server-start.sh ./config/server2.properties

    用下面的命令启动

    nohup ./bin/kafka-server-start.sh ./config/server2.properties >> /extendspace/kafkaoutput.log 2>&1 &

    (3)创建kafka的topic root@node1:~# cd /usr/local/kafka/ root@node1:/usr/local/kafka# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic kafka-test (4)查看创建的topic root@node1:~# kafka-topics.sh --list --zookeeper localhost:2181 或者 root@node2:~# kafka-topics.sh --list --zookeeper localhost:2181 5.4 测试应用 (1)发送消息 Kafka带有一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到Kafka集群。默认情况下,每行将作为单独的消息发送。 启动生产者 bin/kafka-console-producer.sh --broker-list node1:9092 --topic test

    输入几条消息发送到服务器 This is a message This is another message (2)启动消费者 消费者可以将消息转储到标准输出 打开第四个个终端 bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test --from-beginning 或者 bin/kafka-console-consumer.sh --bootstrap-server node2:9092 --topic test --from-beginning

    然后就可以在命令行看到生产者发送的消息了 This is a message This is another message

    Processed: 0.037, SQL: 8