记录一下分别使用docker和压缩包如何安装kafka
kafka的Docker安装
运行kafka
需要使用Zookeeper,所以你需要先启动Zookeeper
1
|
-v /etc/localtime:/etc/localtime
|
安装kafka
1
|
docker run -d --name kafka -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=192.168.1.14:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.14:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
|
参数意义:
#在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
#zookeeper集群的地址,可以是多个,多个之间用逗号分割
- -e KAFKA_ZOOKEEPER_CONNECT=192.168.1.14:2181
#发布到ZooKeeper供客户端使用的监听器
- -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.14:9092
#配置kafka的监听端口,指定hostname为0.0.0.0绑定所有接口
- -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
#容器时间同步虚拟机的时间
- -v /etc/localtime:/etc/localtime
kafka的安装包安装
准备安装包
Apache Kafka下载教程:Apache Kafka下载 - OrcHome
下载官网:https://kafka.apache.org/downloads.html
下载并且解压它
文件配置(单机演示模式)
kafka的配置文件在config/server.properties文件中,配置:Socket Server Settings,供外部程序连接
1
|
advertised.listeners=PLAINTEXT:
|
总结:advertised_listeners 是对外暴露的服务端口,真正建立连接用的是 listeners。
kafka listeners 和 advertised.listeners 的区别及应用_一一可可的博客-CSDN博客
kafka安装及配置过程 - zhaoshizi - 博客园
启动服务
注意:你的本地环境必须安装有Java 8+。
运行kafka需要使用Zookeeper,所以你需要先启动Zookeeper,如果你没有Zookeeper,你可以使用kafka自带打包和配置好的Zookeeper。
1
|
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
|
加-daemon参数,可以在后台启动Zookeeper,输出的信息在保存在执行目录的logs/zookeeper.out文件中。
启动kafka服务:
1
|
bin/kafka-server-start.sh -daemon config/server.properties
|
注:
kafka如果直接启动会出现问题,就是信息会打印在控制台,就会出现在控制台。
如果关闭窗口,那么kafka会随之关闭,因此需要加-daemon参数,当然,还有另一种方法,使用nohup也可以实现程序后台运行
1
|
nohup bin/zookeeper-server-start.sh config/zookeeper.properties 1>/dev/null 2>&1 &
|
停止kafka/zookeeper,可以使用kill/killall命令终止进程
java操作kafka非常的简单,然后kafka也提供了很多缺省值,一般情况下我们不需要修改太多的参数就能使用。下面我贴出代码。
pom.xml
1
|
<groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
|
Java程序测试
生产者
1
|
package com.swadian.kafka;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.common.serialization.StringSerializer;public static String topic = "test_topic";public static void main(String[] args) throws InterruptedException {Properties p = new Properties(); p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.14:9092"); p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);String msg = "Hello," + new Random().nextInt(100); ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg); kafkaProducer.send(record); System.out.println("消息发送成功:" + msg);
|
注意:
- kafka如果是集群,多个地址用逗号分割(,)
- Properties的put方法,第一个参数可以是字符串,如:p.put(“bootstrap.servers”,“192.168.1.14:9092”)
- kafkaProducer.send(record)可以通过返回的Future来判断是否已经发送到kafka,增强消息的可靠性。同时也可以使用send的第二个参数来回调,通过回调判断是否发送成功。
- p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);设置序列化类,可以写类的全路径
消费者
1
|
package com.swadian.kafka;import java.util.Collections;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.serialization.StringDeserializer;public static void main(String[] args) {Properties p = new Properties(); p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.14:9092"); p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); p.put(ConsumerConfig.GROUP_ID_CONFIG, "test_topic");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(p); kafkaConsumer.subscribe(Collections.singletonList(Producer.topic));ConsumerRecords<String, String> records = kafkaConsumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.println(String.format("topic:%s,offset:%d,消息:%s", record.topic(), record.offset(), record.value()));
|
注意:
- 订阅消息可以订阅多个主题
- ConsumerConfig.GROUP_ID_CONFIG表示消费者的分组,kafka根据分组名称判断是不是同一组消费者,同一组消费者去消费一个主题的数据的时候,数据将在这一组消费者上面轮询。
- 主题涉及到分区的概念,同一组消费者的个数不能大于分区数。因为:一个分区只能被同一群组的一个消费者消费。出现分区小于消费者个数的时候,可以动态增加分区。
- 注意和生产者的对比,Properties中的key和value是反序列化,而生产者是序列化。