K8s construction Kafka:2.13-2.6.0 And Zookeeper:3.6.2 Clustering

itread01 2020-11-06 20:10:13
k8s construction kafka 2.13-2.6.0 2.6.0


## build Kafka:2.13-2.6.0 and Zookeeper:3.6.2 Cluster ### One 、 Service version information : * **Kafka**:v2.13-2.6.0 * **Zookeeper**:v3.6.2 * **Kubernetes**:v1.18.4 ### Two 、 Make Zookeeper The image Zookeeper It uses docker hub The official image provided in , Use the following command to download directly : ```bash docker pull zookeeper:3.6.2 ``` As the start script used in the official image is not suitable for internal use in our company , So for one of them docker-entrypoint.sh The command code and Dockerfile Some changes have been made . #### 1. modify docker-entrypoint.sh Instruction code Revised docker-entrypoint.sh The instruction code is as follows ( The contents of the original instruction code can be referred to :https://github.com/31z4/zookeeper-docker/tree/2373492c6f8e74d3c1167726b19babe8ac7055dd/3.6.2): ```bash #!/bin/bash set -e HOST=$(hostname -s) DOMAIN=$(hostname -d) CLIENT_PORT=2181 SERVER_PORT=2888 ELECTION_PORT=3888 function createConfig(){ if [[ ! -f "$ZOO_CONF_DIR/${HOST}/zoo.cfg" ]]; then # Create a directory based on the variables passed in mkdir -p $ZOO_CONF_DIR/${HOST} mkdir -p $ZOO_DATA_DIR/${HOST} mkdir -p $ZOO_DATA_LOG_DIR/${HOST} # towards zoo.cfg Write some necessary configuration items in the , These variables are in Dockerfile Well defined in , If you need to modify it, you can do it in yaml The file defines env CONFIG="$ZOO_CONF_DIR/${HOST}/zoo.cfg" { echo "dataDir=$ZOO_DATA_DIR/${HOST}" echo "dataLogDir=$ZOO_DATA_LOG_DIR/${HOST}" echo "tickTime=$ZOO_TICK_TIME" echo "initLimit=$ZOO_INIT_LIMIT" echo "syncLimit=$ZOO_SYNC_LIMIT" echo "autopurge.snapRetainCount=$ZOO_AUTOPURGE_SNAPRETAINCOUNT" echo "autopurge.purgeInterval=$ZOO_AUTOPURGE_PURGEINTERVAL" echo "maxClientCnxns=$ZOO_MAX_CLIENT_CNXNS" echo "standaloneEnabled=$ZOO_STANDALONE_ENABLED" echo "admin.enableServer=$ZOO_ADMINSERVER_ENABLED" } >> ${CONFIG} if [[ -n $ZOO_4LW_COMMANDS_WHITELIST ]]; then echo "4lw.commands.whitelist=$ZOO_4LW_COMMANDS_WHITELIST" >> ${CONFIG} fi # If you need to add other configuration items , Can be in yaml Archival env Set in configuration ZOO_CFG_EXTRA Variables , Write all the additional configuration items in this variable # It should be noted that , Add additional configuration items ,value Be sure to use zookeeper A recognizable name , Because there is no format conversion below for cfg_extra_entry in $ZOO_CFG_EXTRA; do echo "$cfg_extra_entry" >> ${CONFIG} done fi } # Because of sts In order to “ Service name - No ” To be named after Pod, The following is used to get the numeric number in the host name and the name of the service function getHostNum(){ if [[ $HOST =~ (.*)-([0-9]+)$ ]]; then NAME=${BASH_REMATCH[1]} ORD=${BASH_REMATCH[2]} else echo "Fialed to parse name and ordinal of Pod" exit 1 fi } # establish Zookeeper A mass of myid, This ensures that the generated myid It's unique and incremental function createID(){ ID_FILE="$ZOO_DATA_DIR/${HOST}/myid" MY_ID=$((ORD+1)) echo $MY_ID > $ID_FILE } # Write the information of each node to the configuration file , So that clustering works . It should be noted that , Be sure to pass... Into the container SERVERS Variables , And the value of this variable should be consistent with the number of copies # So when you want to expand the capacity of nodes in the future , Just change the number of copies and SERVERS The value of the variable is function addServer(){ for (( i=1; i<=$SERVERS; i++ )) do s="server.$i=$NAME-$((i-1)).$DOMAIN:$SERVER_PORT:$ELECTION_PORT;$CLIENT_PORT" [[ $(grep "$s" $ZOO_CONF_DIR/${HOST}/zoo.cfg) ]] || echo $s >> $ZOO_CONF_DIR/${HOST}/zoo.cfg done } # Authorize working and data catalogs , Allow to use --user zookeeper Start function userPerm(){ if [[ "$1" = 'zkServer.sh' && "$(id -u)" = '0' ]]; then chown -R zookeeper "$ZOO_DATA_DIR" "$ZOO_DATA_LOG_DIR" "$ZOO_LOG_DIR" "$ZOO_CONF_DIR" exec gosu zookeeper "$0" " [email protected]" fi } # Start Zookeeper, Due to changing the path of the configuration file , So be sure to use --config Options # The default configuration file directory is ZOO_CONF_DIR=/conf, Already in Dockerfile It's defined in , So if you don't change the default path , You can --config Get rid of function startZK(){ /apache-zookeeper-3.6.2-bin/bin/zkServer.sh --config "$ZOO_CONF_DIR/$(hostname -s)" start-foreground } createConfig getHostNum createID addServer userPerm startZK ``` #### 2. modify Dockerfile I'm here for Dockerfile The changes are very small , Just put the original ENTRYPOINT Comment out the configuration item ,CMD Configuration item changed to docker-entrypoint.sh Start : ```dockerfile FROM openjdk:11-jre-slim ENV ZOO_CONF_DIR=/conf \ ZOO_DATA_DIR=/data \ ZOO_DATA_LOG_DIR=/datalog \ ZOO_LOG_DIR=/logs \ ZOO_TICK_TIME=2000 \ ZOO_INIT_LIMIT=5 \ ZOO_SYNC_LIMIT=2 \ ZOO_AUTOPURGE_PURGEINTERVAL=0 \ ZOO_AUTOPURGE_SNAPRETAINCOUNT=3 \ ZOO_MAX_CLIENT_CNXNS=60 \ ZOO_STANDALONE_ENABLED=true \ ZOO_ADMINSERVER_ENABLED=true # Add a user with an explicit UID/GID and create necessary directories RUN set -eux; \ groupadd -r zookeeper --gid=1000; \ useradd -r -g zookeeper --uid=1000 zookeeper; \ mkdir -p "$ZOO_DATA_LOG_DIR" "$ZOO_DATA_DIR" "$ZOO_CONF_DIR" "$ZOO_LOG_DIR"; \ chown zookeeper:zookeeper "$ZOO_DATA_LOG_DIR" "$ZOO_DATA_DIR" "$ZOO_CONF_DIR" "$ZOO_LOG_DIR" # Install required packges RUN set -eux; \ apt-get update; \ DEBIAN_FRONTEND=noninteractive \ apt-get install -y --no-install-recommends \ ca-certificates \ dirmngr \ gosu \ gnupg \ netcat \ wget; \ rm -rf /var/lib/apt/lists/*; \ # Verify that gosu binary works gosu nobody true ARG GPG_KEY=BBE7232D7991050B54C8EA0ADC08637CA615D22C ARG SHORT_DISTRO_NAME=zookeeper-3.6.2 ARG DISTRO_NAME=apache-zookeeper-3.6.2-bin # Download Apache Zookeeper, verify its PGP signature, untar and clean up RUN set -eux; \ ddist() { \ local f="$1"; shift; \ local distFile="$1"; shift; \ local success=; \ local distUrl=; \ for distUrl in \ 'https://www.apache.org/dyn/closer.cgi?action=download&filename=' \ https://www-us.apache.org/dist/ \ https://www.apache.org/dist/ \ https://archive.apache.org/dist/ \ ; do \ if wget -q -O "$f" "$distUrl$distFile" && [ -s "$f" ]; then \ success=1; \ break; \ fi; \ done; \ [ -n "$success" ]; \ }; \ ddist "$DISTRO_NAME.tar.gz" "zookeeper/$SHORT_DISTRO_NAME/$DISTRO_NAME.tar.gz"; \ ddist "$DISTRO_NAME.tar.gz.asc" "zookeeper/$SHORT_DISTRO_NAME/$DISTRO_NAME.tar.gz.asc"; \ export GNUPGHOME="$(mktemp -d)"; \ gpg --keyserver ha.pool.sks-keyservers.net --recv-key "$GPG_KEY" || \ gpg --keyserver pgp.mit.edu --recv-keys "$GPG_KEY" || \ gpg --keyserver keyserver.pgp.com --recv-keys "$GPG_KEY"; \ gpg --batch --verify "$DISTRO_NAME.tar.gz.asc" "$DISTRO_NAME.tar.gz"; \ tar -zxf "$DISTRO_NAME.tar.gz"; \ mv "$DISTRO_NAME/conf/"* "$ZOO_CONF_DIR"; \ rm -rf "$GNUPGHOME" "$DISTRO_NAME.tar.gz" "$DISTRO_NAME.tar.gz.asc"; \ chown -R zookeeper:zookeeper "/$DISTRO_NAME" WORKDIR $DISTRO_NAME VOLUME ["$ZOO_DATA_DIR", "$ZOO_DATA_LOG_DIR", "$ZOO_LOG_DIR"] EXPOSE 2181 2888 3888 8080 ENV PATH=$PATH:/$DISTRO_NAME/bin \ ZOOCFGDIR=$ZOO_CONF_DIR COPY docker-entrypoint.sh / # Will ENTRYPOINT Content notes # ENTRYPOINT ["/docker-entrypoint.sh"] # Put the original CMD Note , And add the following configuration # CMD ["zkServer.sh", "start-foreground"] CMD ["/docker-entrypoint.sh"] ``` #### 3. Package image and upload private server stay Dockerfile Under the root directory of , Use the following command to package the image , And modify tag ```bash docker build --tag 10.16.12.204/ops/zookeeper:custom-v3.6.2 -f Dockerfile . ``` Upload to image warehouse : ```bash docker push 10.16.12.204/ops/zookeeper:custom-v3.6.2 ``` ### 3、 ... and 、 Make Kafka The image Make Kafka The image is based on docker hub in wurstmeister The image made , The original image file can be downloaded using the following command : ```bash docker pull wurstmeister/kafka:2.13-2.6.0 ``` This image uses start-kafka.sh Script to initialize Kafka Configure and start , But some of them don't fit in K8S Requirements for deployment in , So modify the instruction code . #### 1. modify start-kafka.sh Instruction code The original start-kafka.sh The contents of the instruction code can be found in https://github.com/wurstmeister/kafka-docker In view of . The revised contents are as follows : ```bash #!/bin/bash -e # Allow specific kafka versions to perform any unique bootstrap operations OVERRIDE_FILE="/opt/overrides/${KAFKA_VERSION}.sh" if [[ -x "$OVERRIDE_FILE" ]]; then echo "Executing override file $OVERRIDE_FILE" eval "$OVERRIDE_FILE" fi # Store original IFS config, so we can restore it at various stages ORIG_IFS=$IFS # Set zookeeper Connection address , If the variable is not specified, an error will be reported if [[ -z "$KAFKA_ZOOKEEPER_CONNECT" ]]; then echo "ERROR: missing mandatory config: KAFKA_ZOOKEEPER_CONNECT" exit 1 fi # Set kafka The port of , If no port is specified, the default port is used if [[ -z "$KAFKA_PORT" ]]; then export KAFKA_PORT=9092 fi # kafka Automatic creation after startup topic, If there is no designation KAFKA_CREATE_TOPICS It will not be automatically created topic create-topics.sh & unset KAFKA_CREATE_TOPICS # If there is no direct designation KAFKA_BROKER_ID, Then pass BROKER_ID_COMMAND Variable contains the command to automatically generate broker id, This ensures that broker id It's unique and incremental if [[ -z "$KAFKA_BROKER_ID" ]]; then if [[ -n "$BROKER_ID_COMMAND" ]]; then KAFKA_BROKER_ID=$(eval "$BROKER_ID_COMMAND") export KAFKA_BROKER_ID else export KAFKA_BROKER_ID=-1 fi fi # If there is no designation kafka log Catalog , Use the default address , The default directory name will have the current host name if [[ -z "$KAFKA_LOG_DIRS" ]]; then export KAFKA_LOG_DIRS="/kafka/kafka-logs-$HOSTNAME" fi # If you specify KAFKA_HEAP_OPTS To configure , Write it to kafka-server-start.sh In the script if [[ -n "$KAFKA_HEAP_OPTS" ]]; then sed -r -i 's/(export KAFKA_HEAP_OPTS)="(.*)"/\1="'"$KAFKA_HEAP_OPTS"'"/g' "$KAFKA_HOME/bin/kafka-server-start.sh" unset KAFKA_HEAP_OPTS fi # The function here is if you want the container to use the host name as the host name according to the result of executing the specified command after startup , Then assign this command to HOSTNAME_COMMAND # Then use eval To execute the command in the variable to get the result , And then assign it to HOSTNAME_VALUE Variables if [[ -n "$HOSTNAME_COMMAND" ]]; then HOSTNAME_VALUE=$(eval "$HOSTNAME_COMMAND") # Replace any occurences of _{HOSTNAME_COMMAND} with the value IFS=$'\n' for VAR in $(env); do if [[ $VAR =~ ^KAFKA_ && "$VAR" =~ "_{HOSTNAME_COMMAND}" ]]; then eval "export ${VAR//_\{HOSTNAME_COMMAND\}/$HOSTNAME_VALUE}" fi done IFS=$ORIG_IFS fi # The function here is if you want the container to use the port number according to the returned result of executing the specified command after startup , Then assign this command to PORT_COMMAND # Then use eval To execute the command in the variable to get the result , And then assign it to PORT_VALUE Variables if [[ -n "$PORT_COMMAND" ]]; then PORT_VALUE=$(eval "$PORT_COMMAND") # Replace any occurences of _{PORT_COMMAND} with the value IFS=$'\n' for VAR in $(env); do if [[ $VAR =~ ^KAFKA_ && "$VAR" =~ "_{PORT_COMMAND}" ]]; then eval "export ${VAR//_\{PORT_COMMAND\}/$PORT_VALUE}" fi done IFS=$ORIG_IFS fi if [[ -n "$RACK_COMMAND" && -z "$KAFKA_BROKER_RACK" ]]; then KAFKA_BROKER_RACK=$(eval "$RACK_COMMAND") export KAFKA_BROKER_RACK fi # Here's to check if it's set KAFKA_LISTENERS Variables , The value is usually set to PLAINTEXT://:9092 if [[ -z "$KAFKA_ADVERTISED_HOST_NAME$KAFKA_LISTENERS" ]]; then if [[ -n "$KAFKA_ADVERTISED_LISTENERS" ]]; then echo "ERROR: Missing environment variable KAFKA_LISTENERS. Must be specified when using KAFKA_ADVERTISED_LISTENERS" exit 1 elif [[ -z "$HOSTNAME_VALUE" ]]; then echo "ERROR: No listener or advertised hostname configuration provided in environment." echo " Please define KAFKA_LISTENERS / (deprecated) KAFKA_ADVERTISED_HOST_NAME" exit 1 fi # Maintain existing behaviour # If HOSTNAME_COMMAND is provided, set that to the advertised.host.name value if listeners are not defined. export KAFKA_ADVERTISED_HOST_NAME="$HOSTNAME_VALUE" fi #Issue newline to config file in case there is not one already echo "" >> "$KAFKA_HOME/config/server.properties" ( function updateConfig() { key=$1 value=$2 file=$3 # Omit $value here, in case there is sensitive information echo "[Configuring] '$key' in '$file'" # If config exists in file, replace it. Otherwise, append to file. if grep -E -q "^#?$key=" "$file"; then sed -r -i " [email protected]^#?$key=.*@ [email protected]" "$file" #note that no config values may contain an '@' char else echo "$key=$value" >> "$file" fi } # KAFKA_VERSION + KAFKA_HOME + grep -rohe KAFKA[A-Z0-0_]* /opt/kafka/bin | sort | uniq | tr '\n' '|' # Define the initialization configuration to exclude , These configurations already exist in the configuration file , So there is no need to change or add new ones EXCLUSIONS="|KAFKA_VERSION|KAFKA_HOME|KAFKA_DEBUG|KAFKA_GC_LOG_OPTS|KAFKA_HEAP_OPTS|KAFKA_JMX_OPTS|KAFKA_JVM_PERFORMANCE_OPTS|KAFKA_LOG|KAFKA_OPTS|" IFS=$'\n' for VAR in $(env) do env_var=$(echo "$VAR" | cut -d= -f1) if [[ "$EXCLUSIONS" = *"|$env_var|"* ]]; then echo "Excluding $env_var from broker config" continue fi if [[ $env_var =~ ^KAFKA_ ]]; then kafka_name=$(echo "$env_var" | cut -d_ -f2- | tr '[:upper:]' '[:lower:]' | tr _ .) updateConfig "$kafka_name" "${!env_var}" "$KAFKA_HOME/config/server.properties" fi if [[ $env_var =~ ^LOG4J_ ]]; then log4j_name=$(echo "$env_var" | tr '[:upper:]' '[:lower:]' | tr _ .) updateConfig "$log4j_name" "${!env_var}" "$KAFKA_HOME/config/log4j.properties" fi done # The main thing is to add the configuration here , According to SERVERS Value , Put together BOOTSTRAP_SERVERS The address of , And update the configuration to the configuration file PODNAME=$(hostname -s | awk -F'-' 'OFS="-"{$NF="";print}' |sed 's/-$//g') for ((i=0;i<$SERVERS;i++)) do BOOTSTRAP_SERVERS+="$PODNAME-$i.$(hostname -d):${KAFKA_PORT}," done BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS%?} echo ${BOOTSTRAP_SERVERS} > /opt/log.txt sed -i "s/bootstrap.servers.*$/bootstrap.servers=$BOOTSTRAP_SERVERS/g" $KAFKA_HOME/config/consumer.properties sed -i "s/bootstrap.servers.*$/bootstrap.servers=$BOOTSTRAP_SERVERS/g" $KAFKA_HOME/config/producer.properties ) # If other initialization configuration scripts are defined , Then execute if [[ -n "$CUSTOM_INIT_SCRIPT" ]] ; then eval "$CUSTOM_INIT_SCRIPT" fi exec "$KAFKA_HOME/bin/kafka-server-start.sh" "$KAFKA_HOME/config/server.properties" ``` #### 2. modify Dockerfile Dockerfile No other changes have been made , It's just a revised version of start-kafka.sh The instruction code is added to the image , And use bash Environment to execute the instruction code ( Otherwise, some commands will not be executed ): ```dockerfile FROM wurstmeister/kafka:2.13-2.6.0 ADD start-kafka.sh / CMD ["bash","start-kafka.sh"] ``` #### 3. Package image and upload private server Use the following command to repack the image and modify tag: ```bash docker build --tag 10.16.12.204/ops/kafka:custom-v2.13-2.6.0 -f Dockerfile . ``` Upload the image to the image warehouse : ```bash docker push 10.16.12.204/ops/kafka:custom-v2.13-2.6.0 ``` ### Four 、 Create a namespace The whole Kafka and Zookeeper Clusters must be in the same namespace , So use the following yaml Establishment of Archives ns-kafka The name space : ```yaml --- apiVersion: v1 kind: Namespace metadata: name: ns-kafka labels: name: ns-kafka ``` ### 5、 ... and 、 establish Secret Kubelet To pull an image from the image warehouse needs to be verified , So create one for verification Harbor In the warehouse Secret: ```bash kubectl create secret docker-registry harbor-secret --namespace=ns-kafka --docker-server=http://10.16.12.204 --docker-username=admin --docker-password=Harbor12345 ``` ### 6、 ... and 、 establish PV and PVC In the process of building clusters , Plan to make Kafka Clustering and Zookeeper Clusters use the same PV. Define in the front Pod When initializing the script, you can see ,Kafka and Zookeeper Data directory and log directory in , They are all in the directory named after their own host name , So even if you use the same PV, You can also distinguish between directories . establish PV and PVC Of yaml The contents of the file are as follows : ```yaml --- apiVersion: v1 kind: PersistentVolume metadata: name: kafka-data-pv spec: accessModes: - ReadWriteMany capacity: storage: 500Gi local: path: /opt/ops_ceph_data/kafka_data nodeAffinity: required: nodeSelectorTerms: - matchExpressions: - key: kafka-cluster operator: In values: - "true" persistentVolumeReclaimPolicy: Retain --- kind: PersistentVolumeClaim apiVersion: v1 metadata: name: kafka-data-pvc namespace: ns-kafka spec: accessModes: - ReadWriteMany resources: requests: storage: 500Gi ``` > One thing to declare is , The storage I currently use is cephfs, And mount it to K8S Of each node of /opt/ops_ceph_data Under the catalogue , So it's building PV The storage type used is local. ### 7、 ... and 、 establish Labels Because of the establishment above PV The specified storage type is local, This PV Only when you meet the specified requirements Label Schedule in the node of , So add a new one for all nodes in the cluster label: ```bash for i in 1 2 3 4 5; do kubectl label nodes k8s-node${i} kafka-cluster=true; done ``` ### 8、 ... and 、 establish Zookeeper Cluster #### 1. establish Service Established for Zookeeper Communicating with other nodes Service,yaml The contents of the file are as follows : ```yaml --- apiVersion: v1 kind: Service metadata: name: zk-inner-service namespace: ns-kafka labels: app: zk spec: selector: app: zk clusterIP: None ports: - name: server port: 2888 - name: leader-election port: 3888 --- apiVersion: v1 kind: Service metadata: name: zk-client-service namespace: ns-kafka labels: app: zk spec: selector: app: zk type: NodePort ports: - name: client port: 2181 nodePort: 31811 ``` #### 2. establish StatefulSet Zookeeper It belongs to stateful service , So use StatefulSet To deploy ,yaml The contents of the file are as follows : ```yaml --- apiVersion: apps/v1 kind: StatefulSet metadata: name: zk namespace: ns-kafka spec: selector: matchLabels: app: zk serviceName: "zk-inner-service" replicas: 3 updateStrategy: type: RollingUpdate podManagementPolicy: Parallel template: metadata: labels: app: zk spec: containers: - name: zk imagePullPolicy: Always image: 10.16.12.204/ops/zookeeper:custom-v3.6.2 resources: requests: memory: "500Mi" cpu: "0.5" ports: - containerPort: 2181 name: client - containerPort: 2888 name: server - containerPort: 3888 name: leader-election env: - name: SERVERS # Set SERVERS Variables , Be sure to match the number of copies value: "3" - name: ZOO_CONF_DIR # Set the directory of configuration files value: /opt/conf - name: ZOO_DATA_DIR # Set the directory of the data file value: /opt/data - name: ZOO_DATA_LOG_DIR # Set the directory of the data log file value: /opt/data_log volumeMounts: # Set the directory where you want to persist the data - name: zookeeper-data mountPath: /opt/data subPath: zookeeper-cluster-data/data - name: zookeeper-data mountPath: /opt/data_log subPath: zookeeper-cluster-data/data_log - name: data-conf mountPath: /etc/localtime imagePullSecrets: - name: harbor-secret volumes: - name: zookeeper-data persistentVolumeClaim: claimName: kafka-data-pvc - name: data-conf hostPath: path: /usr/share/zoneinfo/Asia/Shanghai ``` #### 3. Verify cluster state After the cluster is built , Look at zookeeper The current state of each node , Use the following command : ```bash [@k8s-master1 /]# for i in 0 1 2; do kubectl exec -it zk-$i -n ns-kafka -- zkServer.sh --config /opt/conf/zk-$i status; done ZooKeeper JMX enabled by default Using config: /opt/conf/zk-0/zoo.cfg Client port found: 2181. Client address: localhost. Client SSL: false. Mode: follower ZooKeeper JMX enabled by default Using config: /opt/conf/zk-1/zoo.cfg Client port found: 2181. Client address: localhost. Client SSL: false. Mode: leader ZooKeeper JMX enabled by default Using config: /opt/conf/zk-2/zoo.cfg Client port found: 2181. Client address: localhost. Client SSL: false. Mode: follower ``` You can see that the current cluster is a leader, Two follower. Next, verify the message synchronization of each node in the cluster , First, in the zk-0 Create a message on the node : ```bash [@k8s-master1 /]# kubectl exec -it zk-0 -n ns-kafka -- zkCli.sh [zk: localhost:2181(CONNECTED) 0] create /testMessage Hello Created /testMessage ``` View this message on the other two nodes : ```bash [@k8s-master1 /]# kubectl exec -it zk-1 -n ns-kafka -- zkCli.sh [zk: localhost:2181(CONNECTED) 0] get /testMessage Hello [@k8s-master1 /]# kubectl exec -it zk-2 -n ns-kafka -- zkCli.sh [zk: localhost:2181(CONNECTED) 0] get /testMessage Hello ``` You can see the message normally , Represents that the cluster is currently performing normally . ### Nine 、 establish Kafka Cluster #### 1. establish Service Established for Kafka Communication Service,yaml The contents of the file are as follows : ```yaml --- apiVersion: v1 kind: Service metadata: name: kafka-service namespace: ns-kafka labels: app: kafka spec: ports: - port: 9092 name: server clusterIP: None selector: app: kafka ``` #### 2. establish StatefulSet Kafka It belongs to stateful service , So use StatefulSet To deploy ,yaml The contents of the file are as follows : ```yaml --- apiVersion: apps/v1 kind: StatefulSet metadata: name: kafka namespace: ns-kafka spec: selector: matchLabels: app: kafka serviceName: "kafka-service" replicas: 3 updateStrategy: type: RollingUpdate podManagementPolicy: Parallel template: metadata: labels: app: kafka spec: imagePullSecrets: - name: harbor-secret containers: - name: kafka imagePullPolicy: Always image: 10.16.12.204/ops/kafka:custom-v2.13-2.6.0 resources: requests: memory: "500Mi" cpu: "0.5" env: - name: SERVERS # Make sure that SERVERS The set value is consistent with the number of copies value: "3" - name: KAFKA_LISTENERS value: "PLAINTEXT://:9092" - name: KAFKA_ZOOKEEPER_CONNECT # Set Zookeeper Connection address value: "zk-inner-service.ns-kafka.svc.cluster.local:2181" - name: KAFKA_PORT value: "9092" - name: KAFKA_MESSAGE_MAX_BYTES value: "20000000" - name: BROKER_ID_COMMAND # This variable is used to generate a broker id value: "hostname | awk -F'-' '{print $NF}'" volumeMounts: - name: kafka-log # Just put kafka Of log Directory persistent storage mountPath: /kafka subPath: kafka-cluster-log - name: data-conf mountPath: /etc/localtime volumes: - name: kafka-log persistentVolumeClaim: claimName: kafka-data-pvc - name: data-conf hostPath: path: /usr/share/zoneinfo/Asia/Shanghai ``` #### 3. Verify cluster state ##### 3.1 stay Zookeeper In view of broker ```bash [@k8s-master1 ~]# kubectl exec -it zk-0 -n ns-kafka -- zkCli.sh Connecting to localhost:2181 [zk: localhost:2181(CONNECTED) 0] ls / [admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper] [zk: localhost:2181(CONNECTED) 1] ls /brokers [ids, seqid, topics] [zk: localhost:2181(CONNECTED) 2] ls /brokers/ids [0, 1, 2] [zk: localhost:2181(CONNECTED) 3] get /brokers/ids/0 {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-0.kafka-service.ns-kafka.svc.cluster.local:9092"],"jmx_port":-1,"port":9092,"host":"kafka-0.kafka-service.ns-kafka.svc.cluster.local","version":4,"timestamp":"1604644074102"} [zk: localhost:2181(CONNECTED) 4] get /brokers/ids/1 {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-1.kafka-service.ns-kafka.svc.cluster.local:9092"],"jmx_port":-1,"port":9092,"host":"kafka-1.kafka-service.ns-kafka.svc.cluster.local","version":4,"timestamp":"1604644074079"} [zk: localhost:2181(CONNECTED) 5] get /brokers/ids/2 {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-2.kafka-service.ns-kafka.svc.cluster.local:9092"],"jmx_port":-1,"port":9092,"host":"kafka-2.kafka-service.ns-kafka.svc.cluster.local","version":4,"timestamp":"1604644074009"} ``` You can see 3 One broker It's all in zookeeper Registered in . ##### 3.2 establish Topic stay kafka-0 Create a node named Message Of topic,3 A dividing slot ,3 Copies : ```bash [@k8s-master1 ~]# kubectl exec -it kafka-0 -n ns-kafka -- /bin/bash bash-4.4# kafka-topics.sh --create --topic Message --zookeeper zk-inner-service.ns-kafka.svc.cluster.local:2181 --partitions 3 --replication-factor 3 Created topic Message. ``` stay zk-1 Check whether this exists in the node Topic: ```bash [@k8s-master1 ~]# kubectl exec -it zk-1 -n ns-kafka -- zkCli.sh Connecting to localhost:2181 [zk: localhost:2181(CONNECTED) 0] ls / [admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper] [zk: localhost:2181(CONNECTED) 1] ls /brokers [ids, seqid, topics] [zk: localhost:2181(CONNECTED) 3] ls /brokers/topics [Message] ``` You can see Zookeeper There is already this Topic 了 . ##### 3.3 Simulate producers and consumers First stay kafka-1 On the simulation of producer Message Write a message in : ```bash [@k8s-master1 ~]# kubectl exec -it kafka-1 -n ns-kafka -- /bin/bash bash-4.4# kafka-console-producer.sh --topic Message --broker-list kafka-0.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-1.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-2.kafka-service.ns-kafka.svc.cluster.local:9092 >This is a test message >Welcome to Kafka ``` And then in kafka-2 In order to simulate consumers' consumption of this information : ```bash [@k8s-master1 ~]# kubectl exec -it kafka-2 -n ns-kafka -- /bin/bash bash-4.4# kafka-console-consumer.sh --topic Message --bootstrap-server kafka-0.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-1.kafka-service.ns-kafka.svc.cluster.local:9092,kafka-2.kafka-service.ns-kafka.svc.cluster.local:9092 --from-beginning This is a test message Welcome to Kafka ``` It can produce and consume information normally , representative Kafka Cluster execution is normal . ### Ten 、FAQ #### 1. How to be in yaml The file specifies that Topic stay yaml The file specifies the following env, Can be in Pod Start auto build Topic: ```yaml env: - name: KAFKA_CREATE_TOPICS value: "Topic1:1:3,Topic2:1:1:compact" ``` The content above represents Topic1 There will be 1 A dividing slot ,3 Copies ,Topic2 There will be 1 A dividing slot ,1 Copies and copies of cleanup.policy Set to compact. > Automatic creation Topic Be sure to set KAFKA_CREATE_TOPICS Variables , And then it's going to be create_topic.sh Instruction code ( There is... In the image ) Automatically create according to the variable content . #### 2. For Topic Set compaction Don't take effect Please refer to the website :https://github.com/wurstmeister/kafka-docker/wiki#topic-compaction-does-
版权声明
本文为[itread01]所创,转载请带上原文链接,感谢

  1. 【计算机网络 12(1),尚学堂马士兵Java视频教程
  2. 【程序猿历程,史上最全的Java面试题集锦在这里
  3. 【程序猿历程(1),Javaweb视频教程百度云
  4. Notes on MySQL 45 lectures (1-7)
  5. [computer network 12 (1), Shang Xuetang Ma soldier java video tutorial
  6. The most complete collection of Java interview questions in history is here
  7. [process of program ape (1), JavaWeb video tutorial, baidu cloud
  8. Notes on MySQL 45 lectures (1-7)
  9. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  10. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  11. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  12. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  13. 【递归,Java传智播客笔记
  14. [recursion, Java intelligence podcast notes
  15. [adhere to painting for 386 days] the beginning of spring of 24 solar terms
  16. K8S系列第八篇(Service、EndPoints以及高可用kubeadm部署)
  17. K8s Series Part 8 (service, endpoints and high availability kubeadm deployment)
  18. 【重识 HTML (3),350道Java面试真题分享
  19. 【重识 HTML (2),Java并发编程必会的多线程你竟然还不会
  20. 【重识 HTML (1),二本Java小菜鸟4面字节跳动被秒成渣渣
  21. [re recognize HTML (3) and share 350 real Java interview questions
  22. [re recognize HTML (2). Multithreading is a must for Java Concurrent Programming. How dare you not
  23. [re recognize HTML (1), two Java rookies' 4-sided bytes beat and become slag in seconds
  24. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  25. RPC 1: how to develop RPC framework from scratch
  26. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  27. RPC 1: how to develop RPC framework from scratch
  28. 一次性捋清楚吧,对乱糟糟的,Spring事务扩展机制
  29. 一文彻底弄懂如何选择抽象类还是接口,连续四年百度Java岗必问面试题
  30. Redis常用命令
  31. 一双拖鞋引发的血案,狂神说Java系列笔记
  32. 一、mysql基础安装
  33. 一位程序员的独白:尽管我一生坎坷,Java框架面试基础
  34. Clear it all at once. For the messy, spring transaction extension mechanism
  35. A thorough understanding of how to choose abstract classes or interfaces, baidu Java post must ask interview questions for four consecutive years
  36. Redis common commands
  37. A pair of slippers triggered the murder, crazy God said java series notes
  38. 1、 MySQL basic installation
  39. Monologue of a programmer: despite my ups and downs in my life, Java framework is the foundation of interview
  40. 【大厂面试】三面三问Spring循环依赖,请一定要把这篇看完(建议收藏)
  41. 一线互联网企业中,springboot入门项目
  42. 一篇文带你入门SSM框架Spring开发,帮你快速拿Offer
  43. 【面试资料】Java全集、微服务、大数据、数据结构与算法、机器学习知识最全总结,283页pdf
  44. 【leetcode刷题】24.数组中重复的数字——Java版
  45. 【leetcode刷题】23.对称二叉树——Java版
  46. 【leetcode刷题】22.二叉树的中序遍历——Java版
  47. 【leetcode刷题】21.三数之和——Java版
  48. 【leetcode刷题】20.最长回文子串——Java版
  49. 【leetcode刷题】19.回文链表——Java版
  50. 【leetcode刷题】18.反转链表——Java版
  51. 【leetcode刷题】17.相交链表——Java&python版
  52. 【leetcode刷题】16.环形链表——Java版
  53. 【leetcode刷题】15.汉明距离——Java版
  54. 【leetcode刷题】14.找到所有数组中消失的数字——Java版
  55. 【leetcode刷题】13.比特位计数——Java版
  56. oracle控制用户权限命令
  57. 三年Java开发,继阿里,鲁班二期Java架构师
  58. Oracle必须要启动的服务
  59. 万字长文!深入剖析HashMap,Java基础笔试题大全带答案
  60. 一问Kafka就心慌?我却凭着这份,图灵学院vip课程百度云