kafka的docker安装和压缩包安装

记录一下分别使用docker和压缩包如何安装kafka

kafka的Docker安装

安装zookeeper


  运行kafka 需要使用Zookeeper,所以你需要先启动Zookeeper

1
-v /etc/localtime:/etc/localtime

安装kafka


1
docker run  -d --name kafka -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=192.168.1.14:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.14:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092

  参数意义:

  #在kafka集群中,每个kafka都有一个BROKER_ID来区分自己

  • -e KAFKA_BROKER_ID=0

  #zookeeper集群的地址,可以是多个,多个之间用逗号分割

  • -e KAFKA_ZOOKEEPER_CONNECT=192.168.1.14:2181

  #发布到ZooKeeper供客户端使用的监听器

  • -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.14:9092

  #配置kafka的监听端口,指定hostname为0.0.0.0绑定所有接口

  • -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092

  #容器时间同步虚拟机的时间

  • -v /etc/localtime:/etc/localtime

kafka的安装包安装

准备安装包


  Apache Kafka下载教程:Apache Kafka下载 - OrcHome

  下载官网:https://kafka.apache.org/downloads.html

  下载并且解压它

文件配置(单机演示模式)


  kafka的配置文件在config/server.properties文件中,配置:Socket Server Settings,供外部程序连接

1
advertised.listeners=PLAINTEXT:

  总结:advertised_listeners 是对外暴露的服务端口,真正建立连接用的是 listeners。

  • 关于listeners配置

kafka listeners 和 advertised.listeners 的区别及应用_一一可可的博客-CSDN博客

  • 其他配置

kafka安装及配置过程 - zhaoshizi - 博客园

启动服务


  注意:你的本地环境必须安装有Java 8+。

  运行kafka需要使用Zookeeper,所以你需要先启动Zookeeper,如果你没有Zookeeper,你可以使用kafka自带打包和配置好的Zookeeper。

1
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

  加-daemon参数,可以在后台启动Zookeeper,输出的信息在保存在执行目录的logs/zookeeper.out文件中。

  启动kafka服务:

1
bin/kafka-server-start.sh -daemon config/server.properties

  注:

  kafka如果直接启动会出现问题,就是信息会打印在控制台,就会出现在控制台。

  如果关闭窗口,那么kafka会随之关闭,因此需要加-daemon参数,当然,还有另一种方法,使用nohup也可以实现程序后台运行

1
nohup bin/zookeeper-server-start.sh config/zookeeper.properties 1>/dev/null 2>&1 &

  停止kafka/zookeeper,可以使用kill/killall命令终止进程

  java操作kafka非常的简单,然后kafka也提供了很多缺省值,一般情况下我们不需要修改太多的参数就能使用。下面我贴出代码。

  pom.xml

1
<groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>

Java程序测试

生产者


1
package com.swadian.kafka;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.common.serialization.StringSerializer;public static String topic = "test_topic";public static void main(String[] args) throws InterruptedException {Properties p = new Properties();        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.14:9092");        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);String msg = "Hello," + new Random().nextInt(100);                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);                kafkaProducer.send(record);                System.out.println("消息发送成功:" + msg);

  注意:

  1. kafka如果是集群,多个地址用逗号分割(,)
  2. Properties的put方法,第一个参数可以是字符串,如:p.put(“bootstrap.servers”,“192.168.1.14:9092”)
  3. kafkaProducer.send(record)可以通过返回的Future来判断是否已经发送到kafka,增强消息的可靠性。同时也可以使用send的第二个参数来回调,通过回调判断是否发送成功。
  4. p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);设置序列化类,可以写类的全路径

消费者


1
package com.swadian.kafka;import java.util.Collections;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.serialization.StringDeserializer;public static void main(String[] args) {Properties p = new Properties();        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.14:9092");        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        p.put(ConsumerConfig.GROUP_ID_CONFIG, "test_topic");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(p);        kafkaConsumer.subscribe(Collections.singletonList(Producer.topic));ConsumerRecords<String, String> records = kafkaConsumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.println(String.format("topic:%s,offset:%d,消息:%s",                         record.topic(), record.offset(), record.value()));

  注意:

  1. 订阅消息可以订阅多个主题
  2. ConsumerConfig.GROUP_ID_CONFIG表示消费者的分组,kafka根据分组名称判断是不是同一组消费者,同一组消费者去消费一个主题的数据的时候,数据将在这一组消费者上面轮询。
  3. 主题涉及到分区的概念,同一组消费者的个数不能大于分区数。因为:一个分区只能被同一群组的一个消费者消费。出现分区小于消费者个数的时候,可以动态增加分区。
  4. 注意和生产者的对比,Properties中的key和value是反序列化,而生产者是序列化。