zookeeper源碼(08)請求處理及數據讀寫流程

来源:https://www.cnblogs.com/xugf/p/18020590
-Advertisement-
Play Games

ServerCnxnFactory 用於接收客戶端連接、管理客戶端session、處理客戶端請求。 ServerCnxn抽象類 代表一個客戶端連接對象: 從網路讀寫數據 數據編解碼 將請求轉發給上層組件或者從上層組件接收響應 管理連接狀態,比如:enableRecv、sessionTimeout、s ...


ServerCnxnFactory

用於接收客戶端連接、管理客戶端session、處理客戶端請求。

ServerCnxn抽象類

代表一個客戶端連接對象:

  • 從網路讀寫數據
  • 數據編解碼
  • 將請求轉發給上層組件或者從上層組件接收響應
  • 管理連接狀態,比如:enableRecv、sessionTimeout、stale、invalid等
  • 保存當前的packetsReceived、packetsSent、lastCxid、lastZxid等
  • 繼承了Watcher介面,也可以作為監聽器

兩個實現類:

  • NIOServerCnxn - 基於NIO
  • NettyServerCnxn - 基於Netty

NIOServerCnxnFactory

基於NIO的非阻塞、多線程的ServerCnxnFactory實現類,多線程之間通過queue通信:

  • 1個accept線程,用來接收客戶端連接,交給selector線程處理
  • 1-N個selector線程,每個線程會select 1/N個連接,多個selector線程的原因是,由於有大量連接,select()可能會成為性能瓶頸
  • 0-M個socket IO worker線程,做socket讀寫,如果配置為0則selector線程來做IO
  • 1個清理線程,用於關閉空閑連接

線程數量分配示例:32核的機器,1accept線程,1個清理線程,4個selector線程,64個worker線程。

configure方法

  • 不支持ssl

  • 創建ConnectionExpirerThread線程

  • 根據CPU核數確定各種線程的數量

    int numCores = Runtime.getRuntime().availableProcessors();
    // 32 cores sweet spot seems to be 4 selector threads
    numSelectorThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((float) numCores / 2), 1));
    
    // 64
    numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
    
  • 創建SelectorThread線程

  • 創建ServerSocketChannel、啟動監聽、設置非阻塞

  • 創建AcceptThread線程

start方法

啟動acceptThread、selectorThreads、workerPool、expirerThread線程。

acceptThread線程

1個accept線程,用來接收客戶端連接,交給selector線程處理:

  1. select查找acceptable的key

  2. doAccept接受連接

    if (key.isAcceptable()) {
        if (!doAccept()) {
            pauseAccept(10);
        }
    }
    
  3. 給sc(SocketChannel)設置非阻塞、驗證遠程IP連接數不超過maxClientCnxns(60)、獲取SelectorThread開始select讀寫事件

    // Round-robin assign this connection to a selector thread
    if (!selectorIterator.hasNext()) {
        selectorIterator = selectorThreads.iterator();
    }
    SelectorThread selectorThread = selectorIterator.next();
    // 使用隊列緩存SocketChannel
    if (!selectorThread.addAcceptedConnection(sc)) {
        throw new IOException("Unable to add connection to selector queue");
    }
    

selectorThread線程

run方法select讀寫事件、接受客戶連接、為key註冊"感興趣"的事件:

  • run方法

    public void run() {
        try {
            while (!stopped) {
                try {
                    select(); // select讀寫事件
                    processAcceptedConnections(); // 接受客戶連接
                    processInterestOpsUpdateRequests();
                } catch (RuntimeException e) {
                } catch (Exception e) {
                }
            }
        }
        // ...
    }
    
  • 接受客戶連接會註冊OP_READ、創建NIOServerCnxn、綁定到key上面

    private void processAcceptedConnections() {
        SocketChannel accepted;
        while (!stopped && (accepted = acceptedQueue.poll()) != null) {
            SelectionKey key = null;
            try {
                key = accepted.register(selector, SelectionKey.OP_READ);
                NIOServerCnxn cnxn = createConnection(accepted, key, this);
                key.attach(cnxn); // 綁定到key上
                addCnxn(cnxn); // 維護連接層會話
            } catch (IOException e) {
                //  略
            }
        }
    }
    
  • select到讀寫事件會交給handleIO方法處理

    private void handleIO(SelectionKey key) {
        IOWorkRequest workRequest = new IOWorkRequest(this, key);
        NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
    
        // Stop selecting this key while processing on its connection
        cnxn.disableSelectable();
        key.interestOps(0); // 重置感興趣的事件,IO處理完成之後會重新註冊讀寫事件
        touchCnxn(cnxn); // 維護連接層會話,刷新過期時間
        workerPool.schedule(workRequest); // workRequest.doWork方法做非同步讀寫
    }
    
  • 為key註冊"感興趣"的事件

    private void processInterestOpsUpdateRequests() {
        SelectionKey key;
        while (!stopped && (key = updateQueue.poll()) != null) {
            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
            if (cnxn.isSelectable()) {
                key.interestOps(cnxn.getInterestOps());
            }
        }
    }
    

workRequest.doWork方法

workRequest是IOWorkRequest類型對象,doWork會read數據並傳遞給上層組件:

public void doWork() throws InterruptedException {
    // 略

    if (key.isReadable() || key.isWritable()) {
        cnxn.doIO(key); // 在workerPool線程上執行

        // 略
        touchCnxn(cnxn); // 維護連接層會話,刷新過期時間
    }

    // 略
}

數據包使用 len body 方式傳輸,read的過程不介紹了,cnxn在read到完整的數據之後會調用readConnectRequest或readRequest方法將數據傳遞給上層組件:

// 應用層建立連接
private void readConnectRequest() throws IOException, ClientCnxnLimitException {
    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
    // ConnectRequest封裝:
    // protocolVersion(0), lastZxidSeen(0), timeOut(3s), sessionId(0), passwd(16位byte), readOnly(F)
    ConnectRequest request = protocolManager.deserializeConnectRequest(bia);
    zkServer.processConnectRequest(this, request);
    initialized = true;
}

protected void readRequest() throws IOException {
    RequestHeader h = new RequestHeader();
    // 請求頭,封裝客戶端xid和type由客戶端傳遞過來
    ByteBufferInputStream.byteBuffer2Record(incomingBuffer, h);
    // 轉ByteBufferRequestRecord對象,封裝請求位元組流
    // readRecord將位元組流反序列化為指定的Record實現類對象
    RequestRecord request = RequestRecord.fromBytes(incomingBuffer.slice());
    zkServer.processPacket(this, h, request);
}

NettyServerCnxnFactory

基於Netty的ServerCnxnFactory實現。

CnxnChannelHandler類

核心的網路層處理器,此處記錄重要代碼:

class CnxnChannelHandler extends ChannelDuplexHandler {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final Channel channel = ctx.channel();
        // 略
        // 創建NettyServerCnxn綁定到channel
        NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this);
        ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);

        // 略
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            try {
                NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
                if (cnxn == null) {
                    LOG.error("channelRead() on a closed or closing NettyServerCnxn");
                } else {
                    // 讀取請求數據
                    cnxn.processMessage((ByteBuf) msg);
                }
            } catch (Exception ex) {
                throw ex;
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
}

cnxn讀取請求數據

void processMessage(ByteBuf buf) {
    if (throttled.get()) {
        // 略
    } else {
        if (queuedBuffer != null) {
            appendToQueuedBuffer(buf.retainedDuplicate());
            processQueuedBuffer();
        } else {
            receiveMessage(buf); // 解碼邏輯在此方法中
            // Have to check !closingChannel, because an error in
            // receiveMessage() could have led to close() being called.
            if (!closingChannel && buf.isReadable()) {
                if (queuedBuffer == null) {
                    queuedBuffer = channel.alloc().compositeBuffer();
                }
                appendToQueuedBuffer(buf.retainedSlice(buf.readerIndex(), buf.readableBytes()));
            }
        }
    }
}

read到完整的數據之後會將數據傳遞給上層組件:

if (initialized) {
    RequestHeader h = new RequestHeader();
    ByteBufferInputStream.byteBuffer2Record(bb, h);
    RequestRecord request = RequestRecord.fromBytes(bb.slice());
    zks.processPacket(this, h, request);
} else {
    // 應用層建立連接
    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb));
    ConnectRequest request = protocolManager.deserializeConnectRequest(bia);
    zks.processConnectRequest(this, request);
    initialized = true;
}

ZooKeeperServer處理方法

processConnectRequest方法處理連接請求

ZooKeeperServer的processConnectRequest方法用來處理連接請求:

public void processConnectRequest(
        ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException {

    long sessionId = request.getSessionId(); // 預設0
    int tokensNeeded = 1;
    // 略

    // ro驗證
    if (!request.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) {
        String msg = "Refusing session request for not-read-only client";
        throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
    }
    // 客戶端zxid驗證
    if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
        String msg = "Refusing session request for client "
                     + cnxn.getRemoteSocketAddress()
                     + " as it has seen zxid 0x"
                     + Long.toHexString(request.getLastZxidSeen())
                     + " our last zxid is 0x"
                     + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
                     + " client must try another server";
        throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
    }
    int sessionTimeout = request.getTimeOut(); // 客戶端預設30000
    byte[] passwd = request.getPasswd();
    int minSessionTimeout = getMinSessionTimeout(); // 預設tickTime * 2
    if (sessionTimeout < minSessionTimeout) {
        sessionTimeout = minSessionTimeout;
    }
    int maxSessionTimeout = getMaxSessionTimeout(); // 預設tickTime * 20
    if (sessionTimeout > maxSessionTimeout) {
        sessionTimeout = maxSessionTimeout;
    }
    cnxn.setSessionTimeout(sessionTimeout); // 設置超時時長

    // We don't want to receive any packets until we are sure that the session is setup
    cnxn.disableRecv();

    if (sessionId == 0) {
        // 創建新session
        long id = createSession(cnxn, passwd, sessionTimeout);
    } else {
        validateSession(cnxn, sessionId);
        // 殺掉舊的session和連接
        if (serverCnxnFactory != null) {
            serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
        }
        if (secureServerCnxnFactory != null) {
            secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
        }
        // add新session
        cnxn.setSessionId(sessionId);
        // 返回connect響應
        reopenSession(cnxn, sessionId, passwd, sessionTimeout);
        ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
    }
}

// 重點看一下創建新session
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
    if (passwd == null) {
        passwd = new byte[0];
    }
    long sessionId = sessionTracker.createSession(timeout); // 創建Session返回sessionId
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd); // passwd會賦值給request.passwd
    CreateSessionTxn txn = new CreateSessionTxn(timeout);
    cnxn.setSessionId(sessionId);
    // 給業務層處理器提交createSession請求
    // RequestRecord.fromRecord(txn)返回SimpleRequestRecord對象,封裝Record對象
    Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
    submitRequest(si);
    return sessionId;
}

processPacket方法處理業務請求

ZooKeeperServer的processPacket方法用來處理業務請求:

public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException {

    cnxn.incrOutstandingAndCheckThrottle(h);

    if (h.getType() == OpCode.auth) {
        // 略
        return;
    } else if (h.getType() == OpCode.sasl) {
        // 略
    } else {
        if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
            return;
        } else {
            Request si = new Request(
                cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
            int length = request.limit();
            // 大請求驗證
            if (isLargeRequest(length)) { // 預設返回false
                checkRequestSizeWhenMessageReceived(length);
                si.setLargeRequestSize(length);
            }
            si.setOwner(ServerCnxn.me);
            // 提交給業務層處理器
            submitRequest(si);
        }
    }
}

submitRequest流程

  1. 先把request提交給requestThrottler組件
  2. requestThrottler是一個限流(預設不啟用)組件,內部使用隊列緩存request,非同步線程消費隊列,將request提交給業務處理器
  3. 直到submitRequest方法,業務處理才離開workerPool線程
if (request != null) {
    if (request.isStale()) {
        ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
    }
    final long elapsedTime = Time.currentElapsedTime() - request.requestThrottleQueueTime;
    ServerMetrics.getMetrics().REQUEST_THROTTLE_QUEUE_TIME.add(elapsedTime);
    // 預設不限流
    if (shouldThrottleOp(request, elapsedTime)) {
      request.setIsThrottled(true);
      ServerMetrics.getMetrics().THROTTLED_OPS.add(1);
    }
    // 提交
    zks.submitRequestNow(request);
}

submitRequestNow方法將請求提交給業務層處理器:

public void submitRequestNow(Request si) {
    // 略
    try {
        touch(si.cnxn); // 刷新session過期時間
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            setLocalSessionFlag(si);
            firstProcessor.processRequest(si); // 提交給業務層處理器
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            // Update request accounting/throttling limits
            requestFinished(si);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        requestFinished(si);
    } catch (RequestProcessorException e) {
        requestFinished(si);
    }
}

Leader客戶端業務層處理器鏈

在之前的文章已經介紹,leader使用LeaderZooKeeperServer作為服務實現類。

本章節介紹"leader處理客戶端請求"的流程。

處理器鏈

// 構建處理器鏈
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor toBeAppliedProcessor =
        new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
    commitProcessor = new CommitProcessor(
        toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
    commitProcessor.start();
    ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
    proposalProcessor.initialize();
    prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
    prepRequestProcessor.start();
    firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);

    setupContainerManager();
}
  • FinalRequestProcessor - 處理與請求相關的事務,並提供查詢服務,給客戶端發響應,位於RequestProcessor鏈末尾

  • ToBeAppliedRequestProcessor - 維護toBeApplied列表,之後必須是FinalRequestProcessor且processRequest必須同步處理

  • CommitProcessor - 等待commit完成之後調用下游RequestProcessor處理器

  • ProposalRequestProcessor - 發起proposal並將Request轉發給內部的SyncRequestProcessor和AckRequestProcessor

    public ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {
        this.zks = zks;
        this.nextProcessor = nextProcessor;
        // 內部有維護一個SyncRequestProcessor和AckRequestProcessor
        AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
        syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
    
        forwardLearnerRequestsToCommitProcessorDisabled = Boolean.getBoolean(
                FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED);
    }
    
  • PrepRequestProcessor - 通常位於RequestProcessor鏈開頭,為更新請求關聯的事務做設置

  • LeaderRequestProcessor - 負責執行本地會話升級,只有直接提交給leader的Request才能通過這個處理器

LeaderRequestProcessor

public void processRequest(Request request) throws RequestProcessorException {
    // 略

    // 預設不支持localSession
    Request upgradeRequest = null;
    try {
        upgradeRequest = lzks.checkUpgradeSession(request);
    } catch (KeeperException ke) {
        // 略
    } catch (IOException ie) {
        // 略
    }
    // 此處upgradeRequest==null
    if (upgradeRequest != null) {
        nextProcessor.processRequest(upgradeRequest);
    }
    // 調用下游processor
    nextProcessor.processRequest(request);
}

PrepRequestProcessor

事務設置:

  • 使用隊列緩存request
  • 消費線程從隊列拉request設置事務

run方法

public void run() {
    try {
        while (true) {
            ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
            Request request = submittedRequests.take();
            ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME
                .add(Time.currentElapsedTime() - request.prepQueueStartTime);
            // 略
            if (Request.requestOfDeath == request) {
                break;
            }

            request.prepStartTime = Time.currentElapsedTime();
            pRequest(request);
        }
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
}

protected void pRequest(Request request) throws RequestProcessorException {
    request.setHdr(null);
    request.setTxn(null);

    if (!request.isThrottled()) {
      pRequestHelper(request);
    }

    request.zxid = zks.getZxid(); // zxid
    long timeFinishedPrepare = Time.currentElapsedTime();
    ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(timeFinishedPrepare - request.prepStartTime);
    nextProcessor.processRequest(request); // 調用下游processor
    ServerMetrics.getMetrics().PROPOSAL_PROCESS_TIME.add(Time.currentElapsedTime() - timeFinishedPrepare);
}

pRequestHelper方法

private void pRequestHelper(Request request) {
    try {
        switch (request.type) {
        case OpCode.createContainer:
        case OpCode.create:
        case OpCode.create2:
            // 創建節點請求封裝path、data、acl、flag
            CreateRequest create2Request = request.readRequestRecord(CreateRequest::new);
            // zks.getNextZxid()獲取遞增zxid
            pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request);
            break;
        case OpCode.createTTL:
            // 創建ttl請求封裝path、data、acl、flag、ttl
            CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest);
            break;
        case OpCode.deleteContainer:
            // 封裝path
            DeleteContainerRequest deleteContainerRequest =
                request.readRequestRecord(DeleteContainerRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest);
            break;
        case OpCode.delete:
            // 刪除節點請求封裝path、version
            DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
            break;
        case OpCode.setData:
            // 設置節點數據請求封裝path、data、version
            SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
            break;
        case OpCode.reconfig:
            // reconfig請求封裝joiningServers、leavingServers、newMembers、curConfigId
            ReconfigRequest reconfigRequest = request.readRequestRecord(ReconfigRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest);
            break;
        case OpCode.setACL:
            // 設置acl請求封裝path、acl、version
            SetACLRequest setAclRequest = request.readRequestRecord(SetACLRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest);
            break;
        case OpCode.check:
            // check請求封裝path、version
            CheckVersionRequest checkRequest = request.readRequestRecord(CheckVersionRequest::new);
            pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest);
            break;
        case OpCode.multi:
            // 遍歷 逐個pRequest2Txn
            // pRequest2Txn(op.getType(), zxid, request, subrequest)
            // 封裝MultiTxn
            break;

        // create/close session don't require request record
        case OpCode.createSession:
        case OpCode.closeSession:
            if (!request.isLocalSession()) { // 非本地會話
                pRequest2Txn(request.type, zks.getNextZxid(), request, null);
            }
            break;

        // All the rest don't need to create a Txn - just verify session
        case OpCode.sync:
        // sync,exists,getData,getACL,getChildren,getAllChildrenNumber,getChildren2,ping
        // setWatches,setWatches2,checkWatches,removeWatches,getEphemerals,multiRead,addWatch
        case OpCode.whoAmI:
            zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
            break;
        default:
            break;
        }
    } catch (KeeperException e) {
        // 略
    } catch (Exception e) {
        // 略
    }
}

pRequest2Txn方法流程

代碼量大,僅對重點的業務類型做簡單分析。

該方法首先會為request設置TxnHeader信息:

if (request.getHdr() == null) {
    request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type));
}

TxnHeader封裝事務請求頭:

public class TxnHeader implements Record {
  private long clientId; // 會話ID
  private int cxid; // 客戶端xid
  private long zxid; // 服務端xid
  private long time;
  private int type; // 操作類型
}

pRequest2Txn - create相關操作

create/create2/createTTL/createContainer操作:

  1. 從flags創建createMode並驗證ttl和ephemeral

  2. 驗證acl

    zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo, path, listACL);
    
  3. 生成順序節點path

    int parentCVersion = parentRecord.stat.getCversion();
    if (createMode.isSequential()) {
        // 形如/users/admin0000000001
        path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
    }
    validatePath(path, request.sessionId);
    
  4. request.setTxn

    int newCversion = parentRecord.stat.getCversion() + 1;
    if (type == OpCode.createContainer) {
        request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
    } else if (type == OpCode.createTTL) {
        request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
    } else {
        request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));
    }
    
  5. 獲取ephemeralOwner

    TxnHeader hdr = request.getHdr();
    long ephemeralOwner = 0;
    if (createMode.isContainer()) {
        ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER;
    } else if (createMode.isTTL()) {
        ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl);
    } else if (createMode.isEphemeral()) {
        ephemeralOwner = request.sessionId;
    }
    
  6. addChangeRecord

    StatPersisted s = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner);
    parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
    parentRecord.childCount++;
    parentRecord.stat.setCversion(newCversion);
    parentRecord.stat.setPzxid(request.getHdr().getZxid());
    parentRecord.precalculatedDigest = precalculateDigest(
            DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
    addChangeRecord(parentRecord); // 維護outstandingChanges集
    ChangeRecord nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL);
    nodeRecord.data = data;
    nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.ADD, path, nodeRecord.data, s);
    setTxnDigest(request, nodeRecord.precalculatedDigest);
    addChangeRecord(nodeRecord); // 維護outstandingChanges集
    

pRequest2Txn - delete操作

  1. 驗證acl和version

    checkAndIncVersion(nodeRecord.stat.getVersion(), deleteRequest.getVersion(), path);
    
    // stat.version與delete.version需要一致
    
  2. 驗證沒有子節點,有子節點無法刪除

  3. 創建DeleteTxn

    request.setTxn(new DeleteTxn(path));
    
  4. addChangeRecord

    parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
    parentRecord.childCount--;
    parentRecord.stat.setPzxid(request.getHdr().getZxid());
    parentRecord.precalculatedDigest = precalculateDigest(
            DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
    addChangeRecord(parentRecord); // 維護outstandingChanges集
    
    nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null);
    nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path);
    setTxnDigest(request, nodeRecord.precalculatedDigest);
    addChangeRecord(nodeRecord); // 維護outstandingChanges集
    

pRequest2Txn - setData操作

  1. 驗證acl、獲取newVersion

  2. 創建SetDataTxn

    request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
    
  3. addChangeRecord

    nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
    nodeRecord.stat.setVersion(newVersion);
    nodeRecord.stat.setMtime(request.getHdr().getTime());
    nodeRecord.stat.setMzxid(zxid);
    nodeRecord.data = setDataRequest.getData();
    nodeRecord.precalculatedDigest = precalculateDigest(
            DigestOpCode.UPDATE, path, nodeRecord.data, nodeRecord.stat);
    setTxnDigest(request, nodeRecord.precalculatedDigest);
    addChangeRecord(nodeRecord);
    

pRequest2Txn - setACL操作

  1. 驗證acl、獲取newVersion

  2. 創建SetACLTxn

    request.setTxn(new SetACLTxn(path, listACL, newVersion));
    
  3. addChangeRecord

pRequest2Txn - createSession操作

CreateSessionTxn createSessionTxn = request.readRequestRecord(CreateSessionTxn::new);
request.setTxn(createSessionTxn);
// only add the global session tracker but not to ZKDb
zks.sessionTracker.trackSession(request.sessionId, createSessionTxn.getTimeOut());
zks.setOwner(request.sessionId, request.getOwner());

pRequest2Txn - closeSession操作

long startTime = Time.currentElapsedTime();
synchronized (zks.outstandingChanges) {
    // 獲取所有臨時節點
    Set<String> es = zks.getZKDatabase().getEphemerals(request.sessionId);
    for (ChangeRecord c : zks.outstandingChanges) {
        if (c.stat == null) {
            // Doing a delete
            es.remove(c.path);
        } else if (c.stat.getEphemeralOwner() == request.sessionId) {
            es.add(c.path);
        }
    }
    for (String path2Delete : es) {
        if (digestEnabled) {
            parentPath = getParentPathAndValidate(path2Delete);
            parentRecord = getRecordForPath(parentPath);
            parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
            parentRecord.stat.setPzxid(request.getHdr().getZxid());
            parentRecord.precalculatedDigest = precalculateDigest(
                    DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
            addChangeRecord(parentRecord);
        }
        nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path2Delete, null, 0, null);
        nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path2Delete);
        addChangeRecord(nodeRecord);
    }
    if (ZooKeeperServer.isCloseSessionTxnEnabled()) {
        request.setTxn(new CloseSessionTxn(new ArrayList<String>(es)));
    }
    zks.sessionTracker.setSessionClosing(request.sessionId);
}

ProposalRequestProcessor

processRequest方法

public void processRequest(Request request) throws RequestProcessorException {
    if (request instanceof LearnerSyncRequest) { // 處理sync命令,後續補充sync命令分析
        zks.getLeader().processSync((LearnerSyncRequest) request);
    } else {
        if (shouldForwardToNextProcessor(request)) {
            nextProcessor.processRequest(request); // 調用下游processor(CommitProcessor)
        }
        if (request.getHdr() != null) { // 事務消息需要發proposal、寫磁碟
            // We need to sync and get consensus on any transactions
            try {
                zks.getLeader().propose(request); // 給follower發proposal
            } catch (XidRolloverException e) {
                throw new RequestProcessorException(e.getMessage(), e);
            }
            // 該對象的nextProcessor是AckRequestProcessor
            syncProcessor.processRequest(request);
        }
    }
}

發proposal

發起一個proposal併發給所有成員:

public Proposal propose(Request request) throws XidRolloverException {
    // zxid的低32位滿了,強制重新選舉,生成新一輪epoch和zxid
    if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
        String msg =
            "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
        shutdown(msg);
        throw new XidRolloverException(msg);
    }

    // 序列化
    byte[] data = request.getSerializeData();
    proposalStats.setLastBufferSize(data.length);
    // 封裝數據包
    QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);

    // 封裝Proposal對象
    Proposal p = new Proposal();
    p.packet = pp;
    p.request = request;

    synchronized (this) {
        p.addQuorumVerifier(self.getQuorumVerifier());

        if (request.getHdr().getType() == OpCode.reconfig) {
            // 此處會把lastSeenQuorumVerifier寫入zoo.cfg.dynamic.next文件
            self.setLastSeenQuorumVerifier(request.qv, true);
        }

        if (self.getQuorumVerifier().getVersion() < self.getLastSeenQuorumVerifier().getVersion()) {
            p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }

        lastProposed = p.packet.getZxid();
        // 緩存到outstandingProposals中,processAck時會根據quorum狀態確定是否提交
        outstandingProposals.put(lastProposed, p);
        // 給follower發數據
        sendPacket(pp);
    }
    ServerMetrics.getMetrics().PROPOSAL_COUNT.add(1);
    return p;
}

syncProcessor.processRequest方法

processRequest方法將request放入queuedRequests隊列,非同步線程消費做業務處理:

  1. 從queuedRequests拉request

  2. 寫txnlog

    zks.getZKDatabase().append(si);
    
  3. 滾動txnlog文件、生成snapshot文件

    // 預設當logCount超過了50000或logSize超過2GB時觸發
    if (shouldSnapshot()) {
        resetSnapshotStats();
        // 滾動txnlog文件
        zks.getZKDatabase().rollLog();
        // 生成snapshot文件
        if (!snapThreadMutex.tryAcquire()) {
            LOG.warn("Too busy to snap, skipping");
        } else {
            // 非同步線程生成snapshot文件
            new ZooKeeperThread("Snapshot Thread") {
                public void run() {
                    try {
                        zks.takeSnapshot();
                    } catch (Exception e) {
                    } finally {
                        snapThreadMutex.release();
                    }
                }
            }.start();
        }
    }
    
  4. 之後會把request傳遞給nextProcessor(AckRequestProcessor對象)

AckRequestProcessor

public void processRequest(Request request) {
    QuorumPeer self = leader.self;
    if (self != null) {
        request.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
        leader.processAck(self.getMyId(), request.zxid, null);
    }
}

processAck

Keep a count of acks that are received by the leader for a particular proposal.

public synchronized void processAck(long sid, long zxid, SocketAddress followerAddr) {

    // 略

    if ((zxid & 0xffffffffL) == 0) {
        // We no longer process NEWLEADER ack with this method. However,
        // the learner sends an ack back to the leader after it gets
        // UPTODATE, so we just ignore the message.
        return;
    }

    if (outstandingProposals.size() == 0) {
        return;
    }
    // 說明zxid的數據已經提交
    if (lastCommitted >= zxid) {
        // The proposal has already been committed
        return;
    }
    Proposal p = outstandingProposals.get(zxid);
    if (p == null) {
        return;
    }

    // 略

    p.addAck(sid); // 添加ack

    boolean hasCommitted = tryToCommit(p, zxid, followerAddr);

    // reconfig類型命令的特殊處理,略
}

tryToCommit方法會判斷quorum狀態,即超過半數ack,如果到了quorum狀態:

  1. 從outstandingProposals集移除

  2. 加入到toBeApplied集

  3. 給follower發COMMIT

    public void commit(long zxid) {
        synchronized (this) {
            lastCommitted = zxid;
        }
        QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
        sendPacket(qp); // 發給follower
    }
    
  4. 提交

    zk.commitProcessor.commit(p.request);
    
    // 會進入commitProcessor的committedRequests隊列
    

CommitProcessor

processRequest方法

本地寫磁碟之後即調用此方法:

  1. 把request提交到queuedRequests隊列
  2. 寫請求提交到queuedWriteRequests隊列
public void processRequest(Request request) {
    request.commitProcQueueStartTime = Time.currentElapsedTime();
    queuedRequests.add(request); // 所有請求
    // If the request will block, add it to the queue of blocking requests
    if (needCommit(request)) { // 寫請求
        queuedWriteRequests.add(request);
        numWriteQueuedRequests.incrementAndGet();
    } else {
        numReadQueuedRequests.incrementAndGet();
    }
    wakeup();
}

commit方法

follower對proposal到了quorum狀態後,會使用這個方法提交事務,然後會將事務寫到ZKDatabase中。

public void commit(Request request) {
    request.commitRecvTime = Time.currentElapsedTime();
    ServerMetrics.getMetrics().COMMITS_QUEUED.add(1);
    committedRequests.add(request); // 進committedRequests隊列
    wakeup();
}

run方法

對比queuedRequests、queuedWriteRequests、committedRequests這幾個隊列,將提交成功的請求或讀請求轉發給下游的ToBeAppliedRequestProcessor處理器。

ToBeAppliedRequestProcessor

維護toBeApplied列表:清理已提交成功的request數據。

FinalRequestProcessor

處理與請求相關的事務,並提供查詢服務,給客戶端發響應,位於RequestProcessor鏈末尾。

  1. 執行事務
  2. 區分OpCode執行對應邏輯,發迴響應
  3. 使用cnxn把響應返回給客戶端

執行事務

if (!request.isThrottled()) {
  rc = applyRequest(request);
}

// ProcessTxnResult rc = zks.processTxn(request);

createSession操作

zks.finishSessionInit(request.cnxn, true);

closeSession操作

給客戶端發closeConn數據包:

cnxn.sendCloseSession();

create相關操作

create、create2、createTTL、createContainer操作,創建對應的Response對象。

delete相關操作

略。

setData操作

返回SetDataResponse響應。

setACL操作

返回SetACLResponse響應。

getData操作

private Record handleGetDataRequest(
        Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
    GetDataRequest getDataRequest = (GetDataRequest) request;
    String path = getDataRequest.getPath();
    DataNode n = zks.getZKDatabase().getNode(path);
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    // 檢查許可權
    zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null);
    Stat stat = new Stat();
    // 查詢數據、addWatcher
    byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
    return new GetDataResponse(b, stat);
}

setWatches相關操作

setWatches、setWatches2操作:

SetWatches2 setWatches = request.readRequestRecord(SetWatches2::new);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase().setWatches(relativeZxid,
        setWatches.getDataWatches(),
        setWatches.getExistWatches(),
        setWatches.getChildWatches(),
        setWatches.getPersistentWatches(),
        setWatches.getPersistentRecursiveWatches(),
        cnxn);

addWatch操作

AddWatchRequest addWatcherRequest = request.readRequestRecord(AddWatchRequest::new);
zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());

removeWatches操作

RemoveWatchesRequest removeWatches = request.readRequestRecord(RemoveWatchesRequest::new);
WatcherType type = WatcherType.fromInt(removeWatches.getType());
path = removeWatches.getPath();
boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn);

getChildren相關操作

getChildren、getChildren2操作:

GetChildren2Request getChildren2Request = request.readRequestRecord(GetChildren2Request::new);
Stat stat = new Stat();
path = getChildren2Request.getPath();
DataNode n = zks.getZKDatabase().getNode(path);

zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n),
             ZooDefs.Perms.READ, request.authInfo, path, null);
List<String> children = zks.getZKDatabase()
                           .getChildren(path, stat, getChildren2Request.getWatch() ? cnxn : null);
rsp = new GetChildren2Response(children, stat);

Follower處理Leader數據

處理器鏈

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(
        finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
    ((FollowerRequestProcessor) firstProcessor).start();

    syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
    syncProcessor.start();
}

處理器鏈:

FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor

commitProcessor和syncProcessor處理leader的proposal和commit請求。

processPacket方法

在"zookeeper源碼(08)leader、follower和observer"中已經介紹,Follower使用processPacket方法處理來自leader的數據包:

protected void processPacket(QuorumPacket qp) throws Exception {
    switch (qp.getType()) {
    case Leader.PING:
        ping(qp);
        break;
    case Leader.PROPOSAL: // 提案
        ServerMetrics.getMetrics().LEARNER_PROPOSAL_RECEIVED_COUNT.add(1);
        TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData());
        TxnHeader hdr = logEntry.getHeader();
        Record txn = logEntry.getTxn();
        TxnDigest digest = logEntry.getDigest();
        // 略
        lastQueued = hdr.getZxid();

        // 略

        // 記錄log數據
        // 使用syncProcessor持久化log數據,之後給leader發ack
        fzk.logRequest(hdr, txn, digest);
        // 略
        if (om != null) {
            // 略
        }
        break;
    case Leader.COMMIT: // 提交
        ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
        fzk.commit(qp.getZxid()); // 使用commitProcessor提交log
        if (om != null) {
            // 略
        }
        break;
    case Leader.COMMITANDACTIVATE: // Similar to COMMIT, only for a reconfig operation
        // get the new configuration from the request
        Request request = fzk.pendingTxns.element();
        SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();
        QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));

        // get new designated leader from (current) leader's message
        ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
        long suggestedLeaderId = buffer.getLong();
        final long zxid = qp.getZxid();
        boolean majorChange = self.processReconfig(qv, suggestedLeaderId, zxid, true);
        // commit (writes the new config to ZK tree (/zookeeper/config)
        fzk.commit(zxid);

        // 略
        break;
    case Leader.UPTODATE:
        // leader告知follower已處於最新狀態,可以開始響應客戶端
        // 正常情況下不應該再出現該類型請求
        break;
    case Leader.REVALIDATE:
        if (om == null || !om.revalidateLearnerSession(qp)) {
            revalidate(qp);
        }
        break;
    case Leader.SYNC:
        fzk.sync(); // sync命令
        break;
    default:
        LOG.warn("Unknown packet type");
        break;
    }
}

處理PROPOSAL

syncProcessor.processRequest方法

processRequest方法將request放入queuedRequests隊列,非同步線程消費做業務處理。

在本地持久化之後,調用下游處理器(SendAckRequestProcessor對象)。

SendAckRequestProcessor

public void processRequest(Request si) {
    if (si.type != OpCode.sync) {
        // 確認zxid已持久化
        QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null);
        try {
            si.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
            learner.writePacket(qp, false);
        } catch (IOException e) {
            // learner.sock.close();
        }
    }
}

public void flush() throws IOException {
    try {
        learner.writePacket(null, true);
    } catch (IOException e) {
        // learner.sock.close();
    }
}

處理COMMIT

提交給commitProcessor處理器,該處理器會繼續向下游(FinalRequestProcessor)傳遞。

FinalRequestProcessor

上文已經介紹,此處省略。

Observer處理Leader數據

處理器鏈

protected void setupRequestProcessors() {
    // We might consider changing the processor behaviour of
    // Observers to, for example, remove the disk sync requirements.
    // Currently, they behave almost exactly the same as followers.
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(
        finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
    ((ObserverRequestProcessor) firstProcessor).start();

    if (syncRequestProcessorEnabled) {
        syncProcessor = new SyncRequestProcessor(this, null);
        syncProcessor.start();
    }
}

processPacket方法

protected void processPacket(QuorumPacket qp) throws Exception {
    TxnLogEntry logEntry;
    TxnHeader hdr;
    TxnDigest digest;
    Record txn;
    switch (qp.getType()) {
    case Leader.PING:
        ping(qp);
        break;
    case Leader.PROPOSAL:
        LOG.warn("Ignoring proposal");
        break;
    case Leader.COMMIT:
        LOG.warn("Ignoring commit");
        break;
    case Leader.UPTODATE:
        LOG.error("Received an UPTODATE message after Observer started");
        break;
    case Leader.REVALIDATE:
        revalidate(qp);
        break;
    case Leader.SYNC:
        ((ObserverZooKeeperServer) zk).sync();
        break;
    case Leader.INFORM:
        ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
        logEntry = SerializeUtils.deserializeTxn(qp.getData());
        hdr = logEntry.getHeader();
        txn = logEntry.getTxn();
        digest = logEntry.getDigest();
        Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
        request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
        request.setTxnDigest(digest);
        ObserverZooKeeperServer obs = (ObserverZooKeeperServer) zk;
        obs.commitRequest(request);
        break;
    case Leader.INFORMANDACTIVATE: // 處理reconfig請求
        // get new designated leader from (current) leader's message
        ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
        long suggestedLeaderId = buffer.getLong();

        byte[] remainingdata = new byte[buffer.remaining()];
        buffer.get(remainingdata);
        logEntry = SerializeUtils.deserializeTxn(remainingdata);
        hdr = logEntry.getHeader();
        txn = logEntry.getTxn();
        digest = logEntry.getDigest();
        QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) txn).getData(), UTF_8));

        request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
        request.setTxnDigest(digest);
        obs = (ObserverZooKeeperServer) zk;

        boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);

        obs.commitRequest(request);

        if (majorChange) {
            throw new Exception("changes proposed in reconfig");
        }
        break;
    default:
        LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
        break;
    }
}

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言 策略模式定義了一系列演算法,並將每個演算法封裝起來,使它們可以互相替換,且演算法的變換不會影響使用演算法的客戶。 在項目開發中,我們經常要根據不同的場景,採取不同的措施,也就是不同的策略。假設我們需要對a、b這兩個整數進行計算,根據條件的不同,需要執行不同的計算方式。我們可以把所有的操作都封裝在同一個 ...
  • 在編程的世界里,我們經常需要對數據進行迴圈處理,常用的兩種方法就是:for迴圈和foreach迴圈。想象你站在一條裝滿寶貝的傳送帶前,你要親手檢查每一件寶貝。使用for迴圈就像是你親手控制傳送帶的速度和方向,而使用foreach迴圈則是傳送帶自動運轉,你只需專註於寶貝本身。 ...
  • Java 方法重載 方法重載 允許在同一個類中定義多個具有相同名稱的方法,但 參數列表 必須不同。 語法: returnType methodName(parameter1, parameter2, ..., parameterN) { // 方法體 } 示例: public class Main ...
  • TLDR 修飾變數的時候,可以把 constexpr 對象當作加強版的 const 對象:const 對象表明值不會改變,但不一定能夠在編譯期取得結果;constexpr 對象不僅值不會改變,而且保證能夠在編譯期取得結果。如果一個 const 變數能夠在編譯期求值,將其改為 constexpr 能夠 ...
  • 老周一般很少玩游戲,在某寶上買了一堆散件,計劃在過年期間自己做個機械臂耍耍。頭腦中划過一道紫藍色的閃電,想起用游戲手柄來控制機械臂。機械臂是由樹莓派(大草莓)負責控制,然後客戶端通過 Socket UDP 來發送信號。優先考慮在 PC 和手機上測試,就順便折騰一下 XInput API。當然,讀取手 ...
  • 有這樣一個帶有搜索功能的用戶界面需求: 搜索流程如下所示: 這個需求涉及兩個實體: “評分(Rating)、用戶名(Username)”數據與User實體相關 “創建日期(create date)、觀看次數(number of views)、標題(title)、正文(body)”與Story實體相關 ...
  • 常見內置數值類型 數值類型是不可變類型(immutable type),它包括布爾類型、整數、浮點數與複數。 類型 英文名 構造方式 對應關鍵字 構造函數 布爾 Boolean var = True bool bool() 整數 Integer var = 5 int int() 浮點數 Float ...
  • Python 常見內置數據類型 在Python中,常用的類型是這些: Python 中查看數據類型的函數(function)為type()。 >>>text = "Is test a string type object?" >>>print(type(text)) <class 'str'> Py ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...