前提 Tomcat 10.1.x Tomcat線程池介紹 Tomcat線程池,源於JAVA JDK自帶線程池。由於JAVA JDK線程池策略,比較適合處理 CPU 密集型任務,但是對於 I/O 密集型任務,如資料庫查詢,rpc 請求調用等,不是很友好,所以Tomcat在其基礎上進行了擴展。 任務處理 ...
雲伺服器(Linux)安裝部署Kafka
前期準備
kafka的安裝需要依賴於jdk,需要在伺服器上提前安裝好該環境,這裡使用用jdk1.8。
下載安裝包
官網地址:
較新的版本已自帶Zookeeper,無需額外下載。這裡使用3.2.0做演示。
註意要下載Binary downloads標簽下的tgz包,Source download標簽下的包為源碼。無法直接運行,需要編譯。
上傳安裝包到雲伺服器
使用ssh連接工具將kafka_2.12-3.2.0.tgz這個包上傳到雲伺服器上的一個目錄。
打開命令行,進入到放有壓縮包的目錄,執行
tar -zxvf kafka_2.12-3.2.0.tgz
配置kafka
然後使用cd命令進入到/kafka_2.12-3.2.0/config/下,使用
vi server.properties
編輯配置文件。
刪除listeners和advertised前方的#號,改成如下配置:
listeners=PLAINTEXT://雲伺服器內網ip:9092(本地訪問用本地ip)
# 如果要提供外網訪問則必須配置此項
advertised.listeners=PLAINTEXT://雲伺服器公網ip:9092(若要遠程訪問需配置此項為雲伺服器的公網ip)
# zookeeper連接地址,集群配置格式為ip:port,ip:port,ip:port
zookeeper.connect=雲伺服器公網ip:2181
開放雲伺服器埠
在雲伺服器控制台內進入安全組頁面,添加兩條新的入站規則,tcp/9092和tcp/2181
開放linux防火牆埠
先查看使用的防火牆類型iptables/firewalld
iptables操作命令
1.打開/關閉/重啟防火牆
開啟防火牆(重啟後永久生效):chkconfig iptables on
關閉防火牆(重啟後永久生效):chkconfig iptables off
開啟防火牆(即時生效,重啟後失效):service iptables start
關閉防火牆(即時生效,重啟後失效):service iptables stop
重啟防火牆:service iptables restartd
2.查看打開的埠
/etc/init.d/iptables status
3.開啟埠
iptables -A INPUT -p tcp --dport 8080 -j ACCEPT
4.保存並重啟防火牆
/etc/rc.d/init.d/iptables save
/etc/init.d/iptables restart
Centos7預設安裝了firewalld,如果沒有安裝的話,可以使用 yum install firewalld firewalld-config進行安裝。
操作指令如下:
1.啟動防火牆
systemctl start firewalld
2.禁用防火牆
systemctl stop firewalld
3.設置開機啟動
systemctl enable firewalld
4.停止並禁用開機啟動
sytemctl disable firewalld
5.重啟防火牆
firewall-cmd --reload
6.查看狀態
systemctl status firewalld或者 firewall-cmd --state
7.在指定區域打開埠(記得重啟防火牆)
firewall-cmd --zone=public --add-port=80/tcp(永久生效再加上 --permanent)
打開tcp/9092和tcp/2181這兩個埠後,重啟防火牆,並查看開放的埠確實生效。
啟動kafka服務
cd命令進入kafka_2.12-3.2.0目錄下,執行
bin/zookeeper-server-start.sh config/zookeeper.properties
啟動zookeeper,不加-daemon方便排除啟動錯誤,新建一個shell視窗,進入該目錄再執行
bin/kafka-server-start.sh config/server.properties
啟動kafka,若列印日誌未報錯,若未出現error日誌,說明啟動成功。
測試單機連通性
查詢kafka下所有的topic
bin/kafka-topics.sh --list --zookeeper ip:port
因為kafka使用zookeeper作為配置中心,一些topic信息需要查詢該kafka對應的zookeeper
創建topic
bin/kafka-topics.sh --create --zookeeper ip:port --replication-factor 1 --partitions 1 --topic test
開啟生產者
bin/kafka-console-producer.sh --broker-list cos100:9092 --topic test
開啟消費者
bin/kafka-console-consumer.sh --bootstrap-server cos100:9092 --topic test
Springboot連接kafak
在pom.xml文件中引入kafka依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version>
</dependency>
在application.yml配置文件中配置kafka
server:
port: 8080
spring:
kafka:
bootstrap-servers: 雲伺服器外網ip地址:9092
producer: # 生產者
retries: 3 # 設置大於0的值,則客戶端會將發送失敗的記錄重新發送
batch-size: 16384
buffer-memory: 33554432
acks: 1
# 指定消息key和消息體的編解碼方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 當每一條記錄被消費者監聽器(ListenerConsumer)處理之後提交
# RECORD
# 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後提交
# BATCH
# 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後,距離上次提交時間大於TIME時提交
# TIME
# 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後,被處理record數量大於等於COUNT時提交
# COUNT
# TIME | COUNT 有一個條件滿足時提交
# COUNT_TIME
# 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之後, 手動調用Acknowledgment.acknowledge()後提交
# MANUAL
# 手動調用Acknowledgment.acknowledge()後立即提交,一般使用這種
# MANUAL_IMMEDIATE
ack-mode: manual_immediate
生產者
@RestController
public class KafkaController {
private final static String TOPIC_NAME = "test-topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/send")
public String send(@RequestParam("msg") String msg) {
kafkaTemplate.send(TOPIC_NAME, "key", msg);
return String.format("消息 %s 發送成功!", msg);
}
}
消費者
@Component
public class DemoConsumer {
/**
* @param record record
* @KafkaListener(groupId = "testGroup", topicPartitions = {
* @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
* @TopicPartition(topic = "topic2", partitions = "0",
* partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
* },concurrency = "6")
* //concurrency就是同組下的消費者個數,就是併發消費數,必須小於等於分區總數
*/
@KafkaListener(topics = "test-topic", groupId = "testGroup1")
public void listentestGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println("testGroup1 message: " + value);
System.out.println("testGroup1 record: " + record);
//手動提交offset,一般是提交一個banch,冪等性防止重覆消息
// === 每條消費完確認性能不好!
ack.acknowledge();
}
//配置多個消費組
@KafkaListener(topics = "test--topic", groupId = "testGroup2")
public void listentestGroup2(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println("testGroup2 message: " + value);
System.out.println("testGroup2 record: " + record);
//手動提交offset
ack.acknowledge();
}
}
使用swagger測試發送消息
控制台列印消息