消費組和消費者 1. 消費組和消費者是一對多的關係。 2. 同一個消費組的消費者可以消費多個分區,且是獨占的。 3. 消費者的分區分配策略由介面 定義,內置三種分配策略 、`RoundRobinAssignor StickyAssignor`,支持自定義策略。 4. 不同消費組可以消費相同的分區,互 ...
消費組和消費者
- 消費組和消費者是一對多的關係。
- 同一個消費組的消費者可以消費多個分區,且是獨占的。
- 消費者的分區分配策略由介面
PartitionAssignor
定義,內置三種分配策略RangeAssignor
、RoundRobinAssignor
、StickyAssignor
,支持自定義策略。 - 不同消費組可以消費相同的分區,互不幹擾。
消費者協調器和組協調器
- 客戶端的消費者協調器
ConsumerCoordinator
和服務端的組協調器GroupCoordinator
通過心跳不斷保持通信。 - 消費者進行消費之前,需要確保協調器是 ready 的。
- 選擇具有最少請求的節點
Node
,即具有最少的InFlightRequests
的節點。 - 向該節點發送獲取協調器節點的請求,發送流程類似發送拉取請求。
- 向找到的協調器節點發送加入組請求,此時會禁止心跳線程。
- 加入組響應處理器
JoinGroupResponseHandler
對響應進行處理,響應包含generationId
、memberId
、leaderId
、protocol
。 - 如果是 leader 消費者,即
memberId=leaderId
,則需要根據分配策略protocol
計算分區分配。 - 將分區分配結果封裝到同步組請求,再向協調器節點發送同步組請求。
- 同步組響應處理器
SyncGroupResponseHandler
對上述請求的響應進行處理。 - 如果第5步判斷不是 follower 消費者,同樣需要向協調器發送同步組請求,只是請求中不需要封裝分區分配結果,而是從組協調器獲取。
- 加入組成功後,啟動心跳線程。
- 更新本地緩存的分區分配,此處會調用消費者再平衡監聽器。
- 選擇具有最少請求的節點
消費者狀態
- UNJOINED:消費者初始狀態為
UNJOINED
,表示未加入消費組。 - REBALANCING:消費者向協調器發送加入組請求之前,狀態變更為
REBALANCING
,表示再平衡狀態 - STABLE:消費者監聽到消息成功返回,狀態變更為
STABLE
,表示穩定狀態,如果是失敗的消息,狀態重置為UNJOINED
心跳線程
- 消費者加入消費組之後會啟動心跳線程,並保持和組協調器的通信。
- 如果消費者狀態不是
STABLE
,則不發送心跳。 - 如果組協調器未知,則等待一段時間重試。
- 如果心跳會話超時,則標記協調器節點未知。
- 如果心跳輪詢超時,則發送離開組請求。
- 如果暫不需要發送心跳,則等待一段時間重試。
- 發送心跳,註冊響應監聽器,接收到響應後,設置接收時間,併進行下一輪的心跳。
偏移量
拉取偏移量
- 如果有指定的分區,消費者協調器從組協調器拉取一組分區和已提交偏移量的映射關係,緩存到
SubscriptionState
。 - 設置偏移量重置策略:
LATEST
,EARLIEST
,NONE
。 - 非同步地更新消費的偏移量位置。
提交偏移量
- 消費者協調器獲取當前的協調器節點。
- 向該節點發送提交偏移量請求,返回
Future
。
加入組流程
消費者加入組流程的源碼分析
boolean updateAssignmentMetadataIfNeeded(final long timeoutMs) {
final long startMs = time.milliseconds();
if (!coordinator.poll(timeoutMs)) { // 獲取協調器
return false;
}
// 更新偏移量
return updateFetchPositions(remainingTimeAtLeastZero(timeoutMs, time.milliseconds() - startMs));
}
// 獲取協調器
public boolean poll(final long timeoutMs) {
final long startTime = time.milliseconds();
long currentTime = startTime;
long elapsed = 0L;
if (subscriptions.partitionsAutoAssigned()) { // 是自動分配主題類型
// 更新心跳的上一次的輪詢時間
pollHeartbeat(currentTime);
if (coordinatorUnknown()) { // 協調器未知
// 確保協調器已經 ready
if (!ensureCoordinatorReady(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
return false;
}
}
if (rejoinNeededOrPending()) { // 需要加入消費組
// 加入組、同步組
if (!ensureActiveGroup(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
return false;
}
currentTime = time.milliseconds();
}
} else { // 指定分區類型
if (metadata.updateRequested() && !client.hasReadyNodes(startTime)) {// 如果沒有準備就緒的節點
// 阻塞等待元數據更新
final boolean metadataUpdated = client.awaitMetadataUpdate(remainingTimeAtLeastZero(timeoutMs, elapsed));
if (!metadataUpdated && !client.hasReadyNodes(time.milliseconds())) {
return false; // 更新元數據失敗
}
currentTime = time.milliseconds();
}
}
maybeAutoCommitOffsetsAsync(currentTime); // 非同步自動提交偏移量
return true;
}
// 確保協調器已經 ready
protected synchronized boolean ensureCoordinatorReady(final long timeoutMs) {
final long startTimeMs = time.milliseconds();
long elapsedTime = 0L;
while (coordinatorUnknown()) { // 如果協調器未知
final RequestFuture<Void> future = lookupCoordinator(); // 向當前請求隊列最少的節點,發送獲取協調器的請求
client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
if (!future.isDone()) {
break; // 響應未完成,退出
}
}
return !coordinatorUnknown();
}
// 加入組、同步組
boolean ensureActiveGroup(long timeoutMs, long startMs) {
startHeartbeatThreadIfNeeded(); // 啟動心跳線程
return joinGroupIfNeeded(joinTimeoutMs, joinStartMs);
}
boolean joinGroupIfNeeded(final long timeoutMs, final long startTimeMs) {
long elapsedTime = 0L;
while (rejoinNeededOrPending()) {
// 發送加入組請求
final RequestFuture<ByteBuffer> future = initiateJoinGroup();
client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
if (!future.isDone()) {
// we ran out of time
return false;
}
if (future.succeeded()) { // 加入成功,回調處理響應,更新緩存的分區分配
ByteBuffer memberAssignment = future.value().duplicate();
onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);
}
}
return true;
}
// 發送加入組請求
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
if (joinFuture == null) {
disableHeartbeatThread(); // 暫停心跳線程
state = MemberState.REBALANCING; // 狀態改為 REBALANCING
joinFuture = sendJoinGroupRequest(); // 向協調器發送加入組請求
joinFuture.addListener(new RequestFutureListener<ByteBuffer>() { // 響應監聽器
@Override
public void onSuccess(ByteBuffer value) { // 成功
synchronized (AbstractCoordinator.this) {
state = MemberState.STABLE; // 狀態改為 STABLE
rejoinNeeded = false; // 不需要加入了
if (heartbeatThread != null)
heartbeatThread.enable(); // 啟動暫停了的心跳
}
}
@Override
public void onFailure(RuntimeException e) { // 失敗
synchronized (AbstractCoordinator.this) {
state = MemberState.UNJOINED; // 狀態改為 UNJOINED
}
}
});
}
return joinFuture;
}
// 向協調器發送加入組請求
RequestFuture<ByteBuffer> sendJoinGroupRequest() {
JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
groupId,
this.sessionTimeoutMs,
this.generation.memberId,
protocolType(),
metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);
int joinGroupTimeoutMs = Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + 5000);
return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
.compose(new JoinGroupResponseHandler()); // 非同步回調響應處理類
}
// 非同步回調響應處理類
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
@Override
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
Errors error = joinResponse.error();
if (error == Errors.NONE) {
synchronized (AbstractCoordinator.this) {
if (state != MemberState.REBALANCING) { // 如果是 REBALANCING,狀態異常
future.raise(new UnjoinedGroupException());
} else {
AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(), joinResponse.memberId(), joinResponse.groupProtocol());
if (joinResponse.isLeader()) { // 當前消費組是 leader
onJoinLeader(joinResponse).chain(future);
} else { // 當消費者是 follower
onJoinFollower().chain(future);
}
}
}
}
}
}
// 發送 leader 消費者同步組請求
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
try {
// 根據響應的分配策略,給消費者分配分區
Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(), joinResponse.members());
SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);
return sendSyncGroupRequest(requestBuilder);
} catch (RuntimeException e) {
return RequestFuture.failure(e);
}
}
// 發送 follower 消費者同步組請求
private RequestFuture<ByteBuffer> onJoinFollower() {
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId,
Collections.<String, ByteBuffer>emptyMap()); // 發送不帶分配信息的請求
return sendSyncGroupRequest(requestBuilder);
}