zookeeper源碼(10)node增刪改查及監聽

来源:https://www.cnblogs.com/xugf/p/18065356
-Advertisement-
Play Games

本文將從leader處理器入手,詳細分析node的增刪改查流程及監聽器原理。 回顧數據讀寫流程 leader ZookeeperServer.processPacket封裝Request並提交給業務處理器 LeaderRequestProcessor做本地事務升級 PrepRequestProces ...


本文將從leader處理器入手,詳細分析node的增刪改查流程及監聽器原理。

回顧數據讀寫流程

leader

  1. ZookeeperServer.processPacket封裝Request並提交給業務處理器
  2. LeaderRequestProcessor做本地事務升級
  3. PrepRequestProcessor做事務準備
  4. ProposalRequestProcessor事務操作發proposal給follower節點,持久化到log文件
  5. CommitProcessor讀請求直接轉發給下游處理器,事務操作等待到了quorum狀態轉發給下游處理器
  6. ToBeAppliedRequestProcessor清理toBeApplied集
  7. FinalRequestProcessor將事務寫到ZKDatabase中,給客戶端發響應

follower

  1. 處理PROPOSAL:使用SyncRequestProcessor處理器持久化,之後SendAckRequestProcessor給leader發ack
  2. 處理COMMIT:提交給CommitProcessor處理器,之後FinalRequestProcessor將事務寫到ZKDatabase中

創建node

涉及create、create2、createContainer、createTTL等命令。

PrepRequestProcessor事務準備

反序列化請求參數

switch (request.type) {
case OpCode.createContainer:
case OpCode.create:
case OpCode.create2:
    CreateRequest create2Request = request.readRequestRecord(CreateRequest::new);
    pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request);
    break;
case OpCode.createTTL:
    // 預設不支持ttl
    CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new);
    pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest);
    break;
// ...

CreateRequest封裝創建node的參數:

public class CreateRequest implements Record {
  private String path;
  private byte[] data;
  private java.util.List<org.apache.zookeeper.data.ACL> acl;
  private int flags;
}

CreateTTLRequest封裝創建node加ttl的參數:

public class CreateTTLRequest implements Record {
  private String path;
  private byte[] data;
  private java.util.List<org.apache.zookeeper.data.ACL> acl;
  private int flags;
  private long ttl;
}

事務準備

protected void pRequest2Txn(int type, long zxid, Request request, Record record)
        throws KeeperException, IOException, RequestProcessorException {
    // ...

    switch (type) {
      case OpCode.create:
      case OpCode.create2:
      case OpCode.createTTL:
      case OpCode.createContainer: {
        pRequest2TxnCreate(type, request, record);
        break;
      }
    // ...
    }
}

private void pRequest2TxnCreate(
        int type, Request request, Record record) throws IOException, KeeperException {
    int flags;
    String path;
    List<ACL> acl;
    byte[] data;
    long ttl;
    if (type == OpCode.createTTL) {
        CreateTTLRequest createTtlRequest = (CreateTTLRequest) record;
        // 給flags等參數賦值
    } else {
        CreateRequest createRequest = (CreateRequest) record;
        // 給flags等參數賦值
        ttl = -1;
    }
    // CreateMode:
    // PERSISTENT, PERSISTENT_SEQUENTIAL, EPHEMERAL, EPHEMERAL_SEQUENTIAL,
    // CONTAINER, PERSISTENT_WITH_TTL, PERSISTENT_SEQUENTIAL_WITH_TTL
    CreateMode createMode = CreateMode.fromFlag(flags);
    // 驗證臨時節點、ttl參數、檢查session
    // 預設不支持ttl
    validateCreateRequest(path, createMode, request, ttl);
    String parentPath = validatePathForCreate(path, request.sessionId); // 父節點path

    List<ACL> listACL = fixupACL(path, request.authInfo, acl); // 請求攜帶的許可權
    ChangeRecord parentRecord = getRecordForPath(parentPath); // 得到父節點
    // 驗證CREATE許可權
    zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo, path, listACL);
    int parentCVersion = parentRecord.stat.getCversion();
    if (createMode.isSequential()) { // 順序節點
        // 例如/users/admin0000000001
        path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
    }
    validatePath(path, request.sessionId);
    // 略
    boolean ephemeralParent = 
        EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL;
    // 父節點不可以是臨時節點

    int newCversion = parentRecord.stat.getCversion() + 1; // 父節點的childVersion++
    // 檢查位元組限額
    zks.checkQuota(path, null, data, OpCode.create);
    // 不同類型創建對應的Txn對象
    if (type == OpCode.createContainer) {
        request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
    } else if (type == OpCode.createTTL) {
        request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
    } else {
        request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));
    }

    TxnHeader hdr = request.getHdr();
    long ephemeralOwner = 0;
    if (createMode.isContainer()) {
        ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER;
    } else if (createMode.isTTL()) {
        ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl);
    } else if (createMode.isEphemeral()) {
        ephemeralOwner = request.sessionId; // 臨時節點使用sessionId
    }
    // czxid(created),mzxid(modified),ctime,mtime,version,cversion(childVersion),
    // aversion(aclVersion),ephemeralOwner,pzxid(lastModifiedChildren)
    StatPersisted s = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner);

    // 父節點
    parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
    parentRecord.childCount++;
    parentRecord.stat.setCversion(newCversion);
    parentRecord.stat.setPzxid(request.getHdr().getZxid());
    parentRecord.precalculatedDigest = precalculateDigest(
            DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
    addChangeRecord(parentRecord);
    // 新增節點
    ChangeRecord nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL);
    nodeRecord.data = data;
    nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.ADD, path, nodeRecord.data, s);
    setTxnDigest(request, nodeRecord.precalculatedDigest);
    addChangeRecord(nodeRecord);
}

protected void addChangeRecord(ChangeRecord c) {
    synchronized (zks.outstandingChanges) {
        zks.outstandingChanges.add(c);
        zks.outstandingChangesForPath.put(c.path, c);
    }
}

outstandingChanges保存未提交的事務變化,比如在生成順序節點時需要使用cversion值,但是在事務提交到ZKDatabase之前,庫裡面的值是舊的,所以在上面的代碼中,是從outstandingChanges查找節點,給cversion++後再生成順序節點。

在事務提交之後,才會清理outstandingChanges集。

ProposalRequestProcessor發Proposal

public void processRequest(Request request) throws RequestProcessorException {
    if (request instanceof LearnerSyncRequest) { // sync命令流程,暫不分析
        zks.getLeader().processSync((LearnerSyncRequest) request);
    } else {
        if (shouldForwardToNextProcessor(request)) {
            nextProcessor.processRequest(request); // 提交給下游處理器
        }
        if (request.getHdr() != null) { // 事務操作需要發proposal並寫磁碟
            try {
                zks.getLeader().propose(request);
            } catch (XidRolloverException e) {
                throw new RequestProcessorException(e.getMessage(), e);
            }
            // 把事務log寫到文件中
            // 之後通過AckRequestProcessor處理器給leader ack
            syncProcessor.processRequest(request);
        }
    }
}

CommitProcessor提交事務

public void processRequest(Request request) {
    request.commitProcQueueStartTime = Time.currentElapsedTime();
    queuedRequests.add(request); // 所有請求隊列
    if (needCommit(request)) { // 需要提交的請求進入到寫隊列
        queuedWriteRequests.add(request);
        numWriteQueuedRequests.incrementAndGet();
    } else {
        numReadQueuedRequests.incrementAndGet();
    }
    wakeup();
}

run方法對比queuedRequests、queuedWriteRequests、committedRequests這幾個隊列,將提交成功的請求或讀請求轉發給下游的ToBeAppliedRequestProcessor處理器。

FinalRequestProcessor應用事務

該處理器位於處理器鏈的末尾,負責將事務應用到ZKDatabase、查詢數據、返迴響應。

applyRequest

該方法將事務應用到ZKDatabase中:

private ProcessTxnResult applyRequest(Request request) {
    // 應用事務
    ProcessTxnResult rc = zks.processTxn(request);

    // closeSession

    // metrics

    return rc;
}

zks.processTxn負責處理session、處理事務、清理outstandingChanges集。重點看一下處理事務的步驟。

processTxn

public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
    ProcessTxnResult rc = new ProcessTxnResult();

    try {
        rc.clientId = header.getClientId();
        rc.cxid = header.getCxid();
        rc.zxid = header.getZxid();
        rc.type = header.getType();
        rc.err = 0;
        rc.multiResult = null;
        switch (header.getType()) {
        case OpCode.create:
            CreateTxn createTxn = (CreateTxn) txn;
            rc.path = createTxn.getPath();
            createNode(
                createTxn.getPath(),
                createTxn.getData(),
                createTxn.getAcl(),
                createTxn.getEphemeral() ? header.getClientId() : 0,
                createTxn.getParentCVersion(),
                header.getZxid(),
                header.getTime(),
                null);
            break;
        case OpCode.create2:
            CreateTxn create2Txn = (CreateTxn) txn;
            rc.path = create2Txn.getPath();
            Stat stat = new Stat();
            createNode(
                create2Txn.getPath(),
                create2Txn.getData(),
                create2Txn.getAcl(),
                create2Txn.getEphemeral() ? header.getClientId() : 0,
                create2Txn.getParentCVersion(),
                header.getZxid(),
                header.getTime(),
                stat);
            rc.stat = stat;
            break;
        case OpCode.createTTL:
            CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
            rc.path = createTtlTxn.getPath();
            stat = new Stat();
            createNode(
                createTtlTxn.getPath(),
                createTtlTxn.getData(),
                createTtlTxn.getAcl(),
                EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()),
                createTtlTxn.getParentCVersion(),
                header.getZxid(),
                header.getTime(),
                stat);
            rc.stat = stat;
            break;
        case OpCode.createContainer:
            CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
            rc.path = createContainerTxn.getPath();
            stat = new Stat();
            createNode(
                createContainerTxn.getPath(),
                createContainerTxn.getData(),
                createContainerTxn.getAcl(),
                EphemeralType.CONTAINER_EPHEMERAL_OWNER,
                createContainerTxn.getParentCVersion(),
                header.getZxid(),
                header.getTime(),
                stat);
            rc.stat = stat;
            break;
        // ...
        }
    }
    // ...
}

createNode

public void createNode(final String path, byte[] data, List<ACL> acl, long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat) throws NoNodeException, NodeExistsException {
    int lastSlash = path.lastIndexOf('/');
    String parentName = path.substring(0, lastSlash);
    String childName = path.substring(lastSlash + 1);
    StatPersisted stat = createStat(zxid, time, ephemeralOwner);
    DataNode parent = nodes.get(parentName); // 父節點必須存在
    if (parent == null) {
        throw new NoNodeException();
    }
    synchronized (parent) {
        Long acls = aclCache.convertAcls(acl);

        Set<String> children = parent.getChildren();
        if (children.contains(childName)) { // 節點不能存在
            throw new NodeExistsException();
        }

        nodes.preChange(parentName, parent);
        if (parentCVersion == -1) { // childVersion++
            parentCVersion = parent.stat.getCversion();
            parentCVersion++;
        }

        if (parentCVersion > parent.stat.getCversion()) {
            parent.stat.setCversion(parentCVersion);
            parent.stat.setPzxid(zxid);
        }
        // 創建node
        DataNode child = new DataNode(data, acls, stat);
        parent.addChild(childName);
        nodes.postChange(parentName, parent);
        nodeDataSize.addAndGet(getNodeSize(path, child.data));
        nodes.put(path, child); // 維護NodeHashMap
        // 處理臨時節點
        EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner);
        if (ephemeralType == EphemeralType.CONTAINER) {
            containers.add(path);
        } else if (ephemeralType == EphemeralType.TTL) {
            ttls.add(path);
        } else if (ephemeralOwner != 0) {
            HashSet<String> list = ephemerals.computeIfAbsent(ephemeralOwner, k -> new HashSet<>());
            synchronized (list) {
                list.add(path);
            }
        }
        // 返回節點stat
        if (outputStat != null) {
            child.copyStat(outputStat);
        }
    }
    // now check if its one of the zookeeper node child 略

    // 觸發NodeCreated監聽
    dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid);
    // 觸發父節點的NodeChildrenChanged監聽
    childWatches.triggerWatch(
        parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid);
}

返迴響應

case OpCode.create: {
    lastOp = "CREA";
    rsp = new CreateResponse(rc.path); // 創建Response
    err = Code.get(rc.err); // processTxn的err
    requestPathMetricsCollector.registerRequest(request.type, rc.path);
    break;
}
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer: {
    lastOp = "CREA";
    rsp = new Create2Response(rc.path, rc.stat); // 創建Response
    err = Code.get(rc.err); // processTxn的err
    requestPathMetricsCollector.registerRequest(request.type, rc.path);
    break;
}

最後會使用cnxn把響應返回給客戶端:

ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
cnxn.sendResponse(hdr, rsp, "response");

EphemeralType

  • VOID
  • NORMAL
  • CONTAINER
  • TTL

ephemeralOwner標識znode是臨時的,以及哪個會話創建了該節點。通過zookeeper.extendedTypesEnabled屬性可以啟用ttl節點等擴展功能。ephemeralOwner的"特殊位"用於表示啟用了哪個功能,而ephemeral Owner的剩餘位是特定於功能的。

當zookeeper.extendedTypesEnabled為true時,將啟用擴展類型。擴展ephemeralOwner填充高8位(0xff00000000000000L),高8位之後的兩個位元組用於表示ephemeralOwner擴展特征,剩餘5個位元組由該功能指定,可用於所需的任何目的。

目前,唯一擴展功能是TTL節點,擴展特征值為0。對於TTL節點,ephemeralOwner具有0xff的高8位,接下來2個位元組是0,後面的5個位元組是以毫秒為單位的ttl值。因此ttl值為1毫秒的ephemeralOwner是0xff00000000000001。

要添加新的擴展功能:

  • 向枚舉添加新名稱
  • 在ttl之後,定義常量extended_BIT_xxxx,即0x0001
  • 通過靜態初始值設定項向extendedFeatureMap添加映射

註意:從技術上講,容器節點也是擴展類型,但由於它是在該功能之前實現的,因此被特別表示。根據定義,只有高位集(0x8000000000000000L)的臨時所有者是容器節點(無論是否啟用擴展類型)。

ttl節點

  • 預設不開啟,使用
  • Added in 3.5.3
  • 創建PERSISTENT或PERSISTENT_SEQUENTIAL節點時,可以設置以毫秒為單位的ttl。如果znode沒有在ttl內修改,並且沒有子節點,它將在將來的某個時候成為伺服器刪除的候選節點

container節點

  • Added in 3.5.3
  • 當container節點的最後一個子節點被刪除時,該container節點將成為伺服器在未來某個時候刪除的候選節點

Stat類

封裝節點屬性,欄位如下:

  • czxid The zxid of the change that caused this znode to be created.
  • mzxid The zxid of the change that last modified this znode.
  • pzxid The zxid of the change that last modified children of this znode.
  • ctime The time in milliseconds from epoch when this znode was created.
  • mtime The time in milliseconds from epoch when this znode was last modified.
  • version The number of changes to the data of this znode.
  • cversion The number of changes to the children of this znode.
  • aversion The number of changes to the ACL of this znode.
  • ephemeralOwner The session id of the owner of this znode if the znode is an ephemeral node. If it is not an ephemeral node, it will be zero.
  • dataLength The length of the data field of this znode.
  • numChildren The number of children of this znode.

刪除node

涉及delete、deleteContainer等命令。

PrepRequestProcessor事務準備

反序列化請求參數

private void pRequestHelper(Request request) {
    try {
        switch (request.type) {
        // ...
        case OpCode.deleteContainer:
            DeleteContainerRequest deleteContainerRequest =
                request.readRequestRecord(DeleteContainerRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest);
            break;
        case OpCode.delete:
            DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
            break;
        }
        // ...
    }
}

DeleteContainerRequest類:

public class DeleteContainerRequest implements Record {
    private String path;
}

DeleteRequest類:

public class DeleteRequest implements Record {
  private String path;
  private int version;
}

事務準備

protected void pRequest2Txn(int type, long zxid, Request request,
             Record record) throws KeeperException, IOException, RequestProcessorException {
    // 略

    switch (type) {
    // 略
    case OpCode.deleteContainer: {
        DeleteContainerRequest txn = (DeleteContainerRequest) record;
        String path = txn.getPath();
        String parentPath = getParentPathAndValidate(path);
        ChangeRecord nodeRecord = getRecordForPath(path); // 獲取待刪除節點
        if (nodeRecord.childCount > 0) { // 有子節點不允許刪除
            throw new KeeperException.NotEmptyException(path);
        }
        if (EphemeralType.get(nodeRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL) {
            throw new KeeperException.BadVersionException(path);
        }
        ChangeRecord parentRecord = getRecordForPath(parentPath); // 獲取父節點
        request.setTxn(new DeleteTxn(path));
        // addChangeRecord 略
        break;
    }
    case OpCode.delete:
        zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
        DeleteRequest deleteRequest = (DeleteRequest) record;
        String path = deleteRequest.getPath();
        String parentPath = getParentPathAndValidate(path);
        ChangeRecord parentRecord = getRecordForPath(parentPath); // 獲取父節點
        // 檢查DELETE許可權
        zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.DELETE, request.authInfo, path, null);
        ChangeRecord nodeRecord = getRecordForPath(path); // 獲取待刪除節點
        checkAndIncVersion(nodeRecord.stat.getVersion(), deleteRequest.getVersion(), path); // 檢查version
        if (nodeRecord.childCount > 0) { // 有子節點不允許刪除
            throw new KeeperException.NotEmptyException(path);
        }
        request.setTxn(new DeleteTxn(path));
        // addChangeRecord 略
        break;
    }
}

FinalRequestProcessor應用事務

processTxn

public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
    ProcessTxnResult rc = new ProcessTxnResult();

    try {
        rc.clientId = header.getClientId();
        rc.cxid = header.getCxid();
        rc.zxid = header.getZxid();
        rc.type = header.getType();
        rc.err = 0;
        rc.multiResult = null;
        switch (header.getType()) {
        // ...
        case OpCode.delete:
        case OpCode.deleteContainer:
            DeleteTxn deleteTxn = (DeleteTxn) txn;
            rc.path = deleteTxn.getPath();
            deleteNode(deleteTxn.getPath(), header.getZxid());
            break;
        }
        // ...
    }
}

deleteNode

public void deleteNode(String path, long zxid) throws NoNodeException {
    int lastSlash = path.lastIndexOf('/');
    String parentName = path.substring(0, lastSlash);
    String childName = path.substring(lastSlash + 1);

    DataNode parent = nodes.get(parentName);
    if (parent == null) { // 獲取父節點且必須存在
        throw new NoNodeException();
    }
    synchronized (parent) {
        nodes.preChange(parentName, parent);
        parent.removeChild(childName);
        if (zxid > parent.stat.getPzxid()) {
            parent.stat.setPzxid(zxid); // The zxid of the change that last modified children of this znode
        }
        nodes.postChange(parentName, parent);
    }

    DataNode node = nodes.get(path); // 獲取刪除節點
    if (node == null) {
        throw new NoNodeException();
    }
    nodes.remove(path); // 從NodeHashMap刪除
    synchronized (node) { // 移除許可權
        aclCache.removeUsage(node.acl);
        nodeDataSize.addAndGet(-getNodeSize(path, node.data));
    }

    // 移除臨時節點、container、ttl等緩存
    synchronized (parent) {
        long owner = node.stat.getEphemeralOwner();
        EphemeralType ephemeralType = EphemeralType.get(owner);
        if (ephemeralType == EphemeralType.CONTAINER) {
            containers.remove(path);
        } else if (ephemeralType == EphemeralType.TTL) {
            ttls.remove(path);
        } else if (owner != 0) {
            Set<String> nodes = ephemerals.get(owner);
            if (nodes != null) {
                synchronized (nodes) {
                    nodes.remove(path);
                }
            }
        }
    }

    // 略

    // 觸發NodeDeleted監聽
    WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid);
    childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed);
    // 觸發父節點的NodeChildrenChanged監聽
    childWatches.triggerWatch(
        "".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid);
}

設置node數據

PrepRequestProcessor事務準備

反序列化請求參數

private void pRequestHelper(Request request) {
    try {
        switch (request.type) {
        // ...
        case OpCode.setData:
            SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
            break;
        // other case
        }
    }
    // ...
}

SetDataRequest類:

public class SetDataRequest implements Record {
  private String path;
  private byte[] data;
  private int version;
}

事務準備

protected void pRequest2Txn(int type, long zxid, Request request,
          Record record) throws KeeperException, IOException, RequestProcessorException {
    // 略

    switch (type) {
    // ...
    case OpCode.setData:
        zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
        SetDataRequest setDataRequest = (SetDataRequest) record;
        path = setDataRequest.getPath();
        validatePath(path, request.sessionId);
        nodeRecord = getRecordForPath(path); // 獲取節點對象
        // 檢查許可權
        zks.checkACL(request.cnxn, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo, path, null);
        // 檢查位元組限額
        zks.checkQuota(path, nodeRecord.data, setDataRequest.getData(), OpCode.setData);
        // version++
        int newVersion = checkAndIncVersion(
            nodeRecord.stat.getVersion(), setDataRequest.getVersion(), path);
        // 創建SetDataTxn
        request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
        // addChangeRecord
        break;
    // other case
    }
}

FinalRequestProcessor應用事務

processTxn

public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
    ProcessTxnResult rc = new ProcessTxnResult();

    try {
        rc.clientId = header.getClientId();
        rc.cxid = header.getCxid();
        rc.zxid = header.getZxid();
        rc.type = header.getType();
        rc.err = 0;
        rc.multiResult = null;
        switch (header.getType()) {
        // other case
        case OpCode.setData:
            SetDataTxn setDataTxn = (SetDataTxn) txn;
            rc.path = setDataTxn.getPath();
            rc.stat = setData(
                setDataTxn.getPath(),
                setDataTxn.getData(),
                setDataTxn.getVersion(),
                header.getZxid(),
                header.getTime());
            break;
        // other case
        }
    }
    // ...
}

setData

public Stat setData(String path, byte[] data, int version,
                    long zxid, long time) throws NoNodeException {
    Stat s = new Stat();
    DataNode n = nodes.get(path);
    if (n == null) { // 檢查節點存在
        throw new NoNodeException();
    }
    byte[] lastData;
    synchronized (n) {
        lastData = n.data;
        nodes.preChange(path, n);
        n.data = data; // 節點數據
        n.stat.setMtime(time); // 修改時間
        n.stat.setMzxid(zxid); // 修改zxid
        n.stat.setVersion(version); // 版本
        n.copyStat(s);
        nodes.postChange(path, n);
    }

    // 略

    // 觸發NodeDataChanged監聽
    dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid);
    return s;
}

查詢node數據

PrepRequestProcessor驗證session

經過該處理器時,只做session驗證。

之後的ProposalRequestProcessor、CommitProcessor、ToBeAppliedRequestProcessor都是直接通過,不做事務處理,直接交給FinalRequestProcessor處理器查詢數據、發送響應。

FinalRequestProcessor查詢數據

使用handleGetDataRequest方法查詢數據:

private Record handleGetDataRequest(
        Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
    GetDataRequest getDataRequest = (GetDataRequest) request;
    String path = getDataRequest.getPath();
    DataNode n = zks.getZKDatabase().getNode(path);
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    // 檢查許可權
    zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null);
    Stat stat = new Stat();
    // 查詢數據
    // 如果watcher參數不為null會給path添加一個監聽器
    byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
    return new GetDataResponse(b, stat);
}

GetDataRequest類:

public class GetDataRequest implements Record {
  private String path;
  private boolean watch;
}

節點監聽

addWatch命令

case OpCode.addWatch: {
    lastOp = "ADDW";
    AddWatchRequest addWatcherRequest = request.readRequestRecord(AddWatchRequest::new);
    // 最終使用DataTree的addWatch方法註冊監聽器
    // cnxn是ServerCnxn對象,實現了Watcher介面
    zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());
    rsp = new ErrorResponse(0);
    break;
}

DataTree的addWatch方法

public void addWatch(String basePath, Watcher watcher, int mode) {
    // PERSISTENT|PERSISTENT_RECURSIVE
    WatcherMode watcherMode = WatcherMode.fromZooDef(mode);
    // dataWatches和childWatches是WatchManager類型對象
    dataWatches.addWatch(basePath, watcher, watcherMode);
    if (watcherMode != WatcherMode.PERSISTENT_RECURSIVE) {
        childWatches.addWatch(basePath, watcher, watcherMode);
    }
}

WatcherMode枚舉

public enum WatcherMode {
    STANDARD(false, false),
    PERSISTENT(true, false), // persistent=0
    PERSISTENT_RECURSIVE(true, true), // persistentRecursive=1
    ;
}

PERSISTENT和PERSISTENT_RECURSIVE是3.6.0版本新增的特性。

Watcher介面

實現類需要實現process方法:

void process(WatchedEvent event);

WatchedEvent代表一個監聽事件:

public class WatchedEvent {
    // 當前zk伺服器的狀態
    private final KeeperState keeperState;
    // NodeCreated|NodeDeleted|NodeDataChanged|NodeChildrenChanged等
    private final EventType eventType;
    private final String path;
    private final long zxid;
}

重要的實現類:

  • NIOServerCnxn
  • NettyServerCnxn

WatchManager類

This class manages watches. It allows watches to be associated with a string and removes watchers and their watches in addition to managing triggers.

核心欄位:

// path -> Watcher集
private final Map<String, Set<Watcher>> watchTable = new HashMap<>();
// Watcher -> path->WatchStats(PERSISTENT|STANDARD + PERSISTENT|PERSISTENT_RECURSIVE等)
private final Map<Watcher, Map<String, WatchStats>> watch2Paths = new HashMap<>();

WatchStats類

public final class WatchStats {
    private static final WatchStats[] WATCH_STATS = new WatchStats[] {
            new WatchStats(0), // NONE
            new WatchStats(1), // STANDARD
            new WatchStats(2), // PERSISTENT
            new WatchStats(3), // STANDARD + PERSISTENT
            new WatchStats(4), // PERSISTENT_RECURSIVE
            new WatchStats(5), // STANDARD + PERSISTENT_RECURSIVE
            new WatchStats(6), // PERSISTENT + PERSISTENT_RECURSIVE
            new WatchStats(7), // STANDARD + PERSISTENT + PERSISTENT_RECURSIVE
    };

    public static final WatchStats NONE = WATCH_STATS[0];

    private final int flags;

    private WatchStats(int flags) {
        this.flags = flags;
    }

    private static int modeToFlag(WatcherMode mode) {
        // mode = STANDARD; return 1 << 0 = 1(0001)
        // mode = PERSISTENT; return 1 << 1 = 2(0010)
        // mode = PERSISTENT_RECURSIVE; return 1 << 2 = (0100)
        return 1 << mode.ordinal();
    }

    public WatchStats addMode(WatcherMode mode) {
        int flags = this.flags | modeToFlag(mode); // |計算保留多種狀態
        return WATCH_STATS[flags];
    }

    public WatchStats removeMode(WatcherMode mode) {
        int mask = ~modeToFlag(mode); // 取反
        int flags = this.flags & mask;
        if (flags == 0) {
            return NONE;
        }
        return WATCH_STATS[flags];
    }

    // ...
}

addWatch

public synchronized boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) {
    Set<Watcher> list = watchTable.get(path);
    if (list == null) {
        list = new HashSet<>(4);
        watchTable.put(path, list);
    }
    list.add(watcher); // 添加watchTable

    Map<String, WatchStats> paths = watch2Paths.get(watcher);
    if (paths == null) {
        // cnxns typically have many watches, so use default cap here
        paths = new HashMap<>();
        watch2Paths.put(watcher, paths);
    }

    WatchStats stats = paths.getOrDefault(path, WatchStats.NONE);
    WatchStats newStats = stats.addMode(watcherMode);

    if (newStats != stats) {
        paths.put(path, newStats);
        if (watcherMode.isRecursive()) {
            ++recursiveWatchQty;
        }
        return true;
    }

    return false;
}

triggerWatch

public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet supress) {
    WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);
    Set<Watcher> watchers = new HashSet<>();
    synchronized (this) {
        // path迭代器,從子節點path向前遍歷
        // 例如/apps/app1/name
        // next = /apps/app1/name, next = /apps/app1, next = /apps ...
        PathParentIterator pathParentIterator = getPathParentIterator(path);
        for (String localPath : pathParentIterator.asIterable()) {
            // 獲取遍歷Watcher集
            Set<Watcher> thisWatchers = watchTable.get(localPath);
            Iterator<Watcher> iterator = thisWatchers.iterator(); 
            while (iterator.hasNext()) {
                Watcher watcher = iterator.next();
                // 獲取watcher對應的WatchStats
                Map<String, WatchStats> paths = watch2Paths.getOrDefault(watcher, Collections.emptyMap());
                WatchStats stats = paths.get(localPath); // if stats==null continue
                if (!pathParentIterator.atParentPath()) {
                    watchers.add(watcher); // 加入watchers中
                    WatchStats newStats = stats.removeMode(WatcherMode.STANDARD);
                    if (newStats == WatchStats.NONE) { // STANDARD模式下會移除監聽器
                        iterator.remove();
                        paths.remove(localPath);
                    } else if (newStats != stats) {
                        paths.put(localPath, newStats);
                    }
                } else if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
                    // 遞歸模式下才將父節點加入watchers中
                    watchers.add(watcher);
                }
            }
            if (thisWatchers.isEmpty()) {
                watchTable.remove(localPath);
            }
        }
    }
    // 略

    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        w.process(e);
    }

    // 略

    return new WatcherOrBitSet(watchers);
}

NIOServerCnxn

上面查找到watchers之後會觸發process方法,看一下NIOServerCnxn的方法實現:

public void process(WatchedEvent event) {
    ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0);

    // 轉型成WatcherEvent才能通過網路傳輸
    WatcherEvent e = event.getWrapper();
    // 把事件推送給客戶端
    int responseSize = sendResponse(h, e, "notification", null, null, ZooDefs.OpCode.error);
    ServerMetrics.getMetrics().WATCH_BYTES.add(responseSize);
}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 系統選擇 目前市面上主流的桌面操作系統在大多數人眼裡只有Windows和MacOS,那為什麼我沒選擇它們兩呢? 首先,不選MacOS的原因,就是太貴。當然這是我的原因不是蘋果的原因,我最早使用Linux寫代碼的時候是2018年,那時候剛畢業上班不久,根本買不起Mac(雖然現在也覺得有點貴)。 在沒有 ...
  • 在這篇全面解析CDN的技術文章中,我們深入探討了CDN的基礎概念、核心架構、多樣化產品和在不同行業中的應用案例。文章揭示了CDN技術如何優化內容分發,提升用戶體驗,並展望了CDN面臨的挑戰和未來發展趨勢。 關註【TechLeadCloud】,分享互聯網架構、雲服務技術的全維度知識。作者擁有10+年互 ...
  • 寫在前面 在Java日常開發過程中,實現Excel文件的導入導出功能是一項常見的需求。 通過使用相關的Java庫,如Apache POI、EasyPoi或EasyExcel,可以輕鬆地實現Excel文件的讀寫操作。 而這篇文章將介紹如何在Java中使用Apache POI、EasyPoi 和Easy ...
  • 學了分塊,感覺這玩意好難啊,怎麼聽起來這麼簡單?【】【】分塊! 先推薦一個東西:loj 分塊全家桶! 首先,把一整個數組劈成 \(\sqrt n\) 塊是最優的!(當然如果你想寫一個 \(114514\) 塊的分塊也沒問題但他不優啊!) 長這樣: 這樣它的複雜度是: 預處理:\(O(n\sqrt n ...
  • 前言:配合狂神老師的教學視頻使用效果更佳: https://www.bilibili.com/video/BV1NE411Q7Nx/?spm_id_from=333.1007.top_right_bar_window_custom_collection.content.click&vd_source ...
  • 大家好,我是你們的老伙計秀才!今天帶來的是[深入淺出Java多線程]系列的第十篇內容:CAS。大家覺得有用請點贊,喜歡請關註!秀才在此謝過大家了!!! ...
  • 在Word中,表格是一個強大的工具,它可以幫助你更好地組織、呈現和分析信息。本文將介紹如何使用Python在Word中創建表格並填入數據、圖片,以及設置表格樣式等。 Python Word庫: 要使用Python在Word中創建或操作表格,需要先將Spire.Doc for Python這個第三方庫 ...
  • 在 Java 的java.util.concurrent包中,除了提供底層鎖、併發同步等工具類以外,還提供了一組原子操作類,大多以Atomic開頭,他們位於java.util.concurrent.atomic包下。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...