twisted網路框架的三個基礎模塊:Protocol, ProtocolFactory, Transport.這三個模塊是構成twisted伺服器端與客戶端程式的基本。Protocol:Protocol對象實現協議內容,即通信的內容協議ProtocolFactory: 是工廠模式的體現,在這裡面生 ...
twisted網路框架的三個基礎模塊:Protocol, ProtocolFactory, Transport.這三個模塊是構成twisted伺服器端與客戶端程式的基本。
Protocol:Protocol對象實現協議內容,即通信的內容協議
ProtocolFactory: 是工廠模式的體現,在這裡面生成協議
Transport: 是用來收發數據,伺服器端與客戶端的數據收發與處理都是基於這個模塊
在windows中安裝twisted需要先安裝pywin32,自己去下載下就行。隨後pip install twisted就會幫我們安裝twisted以及zope。
我們結合一張圖,以及一段程式來理解下twisted的基礎實現:
然後我們首先看看伺服器端程式:
# coding=utf-8 from twisted.internet.protocol import Protocol from twisted.internet.protocol import Factory from twisted.internet.endpoints import TCP4ServerEndpoint from twisted.internet import reactor clients = [] class Spreader(Protocol): def __init__(self, factory): self.factory = factory def connectionMade(self): self.factory.numProtocols = self.factory.numProtocols + 1 self.transport.write( "歡迎來到Spread Site, 你是第%s個客戶端用戶!\n" % (self.factory.numProtocols) ) print "new connect: %d" % (self.factory.numProtocols) clients.append(self) def connectionLost(self, reason): self.factory.numProtocols = self.factory.numProtocols - 1 clients.remove(self) print "lost connect: %d" % (self.factory.numProtocols) def dataReceived(self, data): if data == "close": self.transport.loseConnection() for client in clients: if client != self: client.transport.write(data) else: print data class SpreadFactory(Factory): def __init__(self): self.numProtocols = 0 def buildProtocol(self, addr): return Spreader(self) endpoint = TCP4ServerEndpoint(reactor, 8007) endpoint.listen(SpreadFactory()) reactor.run()
創建一個TCP的IPv4版本的終結點,隨後就開始監聽listen, 在這裡我們傳入協議工廠對象作為參數, 先看看我們自定義的工廠類SpreadFactory, 它派生自Factory, 我們看看這個類的源碼(此時你需要有道詞典了:) ):
@implementer(interfaces.IProtocolFactory, interfaces.ILoggingContext) @_oldStyle class Factory: """ This is a factory which produces protocols. By default, buildProtocol will create a protocol of the class given in self.protocol. """ # put a subclass of Protocol here: protocol = None numPorts = 0 noisy = True @classmethod def forProtocol(cls, protocol, *args, **kwargs): """ Create a factory for the given protocol. It sets the C{protocol} attribute and returns the constructed factory instance. @param protocol: A L{Protocol} subclass @param args: Positional arguments for the factory. @param kwargs: Keyword arguments for the factory. @return: A L{Factory} instance wired up to C{protocol}. """ factory = cls(*args, **kwargs) factory.protocol = protocol return factory def logPrefix(self): """ Describe this factory for log messages. """ return self.__class__.__name__ def doStart(self): """Make sure startFactory is called. Users should not call this function themselves! """ if not self.numPorts: if self.noisy: _loggerFor(self).info("Starting factory {factory!r}", factory=self) self.startFactory() self.numPorts = self.numPorts + 1 def doStop(self): """Make sure stopFactory is called. Users should not call this function themselves! """ if self.numPorts == 0: # this shouldn't happen, but does sometimes and this is better # than blowing up in assert as we did previously. return self.numPorts = self.numPorts - 1 if not self.numPorts: if self.noisy: _loggerFor(self).info("Stopping factory {factory!r}", factory=self) self.stopFactory() def startFactory(self): """This will be called before I begin listening on a Port or Connector. It will only be called once, even if the factory is connected to multiple ports. This can be used to perform 'unserialization' tasks that are best put off until things are actually running, such as connecting to a database, opening files, etcetera. """ def stopFactory(self): """This will be called before I stop listening on all Ports/Connectors. This can be overridden to perform 'shutdown' tasks such as disconnecting database connections, closing files, etc. It will be called, for example, before an application shuts down, if it was connected to a port. User code should not call this function directly. """ def buildProtocol(self, addr): """ Create an instance of a subclass of Protocol. The returned instance will handle input on an incoming server connection, and an attribute "factory" pointing to the creating factory. Alternatively, L{None} may be returned to immediately close the new connection. Override this method to alter how Protocol instances get created. @param addr: an object implementing L{twisted.internet.interfaces.IAddress} """ p = self.protocol() p.factory = self return p
在這裡很重要的一個函數就是buildProtocol, 此函數就是在工廠模式中創建協議的.我們是基於基類Factory來實現這個函數的, 下麵我們看看派生自Protocol的協議類Spread,Spread的__Init__參數中,我們給它傳入的是自定義的SpreadFactory, 然後我們看下基類Protocol的源代碼
@implementer(interfaces.IProtocol, interfaces.ILoggingContext) class Protocol(BaseProtocol): """ This is the base class for streaming connection-oriented protocols. If you are going to write a new connection-oriented protocol for Twisted, start here. Any protocol implementation, either client or server, should be a subclass of this class. The API is quite simple. Implement L{dataReceived} to handle both event-based and synchronous input; output can be sent through the 'transport' attribute, which is to be an instance that implements L{twisted.internet.interfaces.ITransport}. Override C{connectionLost} to be notified when the connection ends. Some subclasses exist already to help you write common types of protocols: see the L{twisted.protocols.basic} module for a few of them. """ def logPrefix(self): """ Return a prefix matching the class name, to identify log messages related to this protocol instance. """ return self.__class__.__name__ def dataReceived(self, data): """Called whenever data is received. Use this method to translate to a higher-level message. Usually, some callback will be made upon the receipt of each complete protocol message. @param data: a string of indeterminate length. Please keep in mind that you will probably need to buffer some data, as partial (or multiple) protocol messages may be received! I recommend that unit tests for protocols call through to this method with differing chunk sizes, down to one byte at a time. """ def connectionLost(self, reason=connectionDone): """Called when the connection is shut down. Clear any circular references here, and any external references to this Protocol. The connection has been closed. @type reason: L{twisted.python.failure.Failure} """
而Protocol又是派生自BaseProtocol的,繼續看這個類的源代碼:
@_oldStyle class BaseProtocol: """ This is the abstract superclass of all protocols. Some methods have helpful default implementations here so that they can easily be shared, but otherwise the direct subclasses of this class are more interesting, L{Protocol} and L{ProcessProtocol}. """ connected = 0 transport = None def makeConnection(self, transport): """Make a connection to a transport and a server. This sets the 'transport' attribute of this Protocol, and calls the connectionMade() callback. """ self.connected = 1 self.transport = transport self.connectionMade() def connectionMade(self): """Called when a connection is made. This may be considered the initializer of the protocol, because it is called when the connection is completed. For clients, this is called once the connection to the server has been established; for servers, this is called after an accept() call stops blocking and a socket has been received. If you need to send any greeting or initial message, do it here. """ connectionDone=failure.Failure(error.ConnectionDone()) connectionDone.cleanFailure()
可以看到,我們自定義的Spread不過是實現了基類的函數。接下來我們滾一邊實現邏輯:
首先,我們定義一個列表clients,以便存儲多個客戶端的連接。當伺服器端接收到了客戶端的連接後,調用connectionMade函數,同時,我們給使用Transport客戶端發送消息, 通知客戶端我們已收到連接。當客戶端連接失去的時候,我們調用ConnectionLost, 同時移除列表中的客戶端連接, dataReceived函數來接受數據,當客戶端傳來"close"命令時,我們就主動關閉連接, 否則,我們就把data輸出來。
看看客戶端的代碼:
# coding=utf-8 from twisted.internet.protocol import Protocol, ClientFactory from twisted.internet import reactor import threading import time import sys import datetime class Echo(Protocol): def __init__(self): self.connected = False def connectionMade(self): self.connected = True def connectionLost(self, reason): self.connected = False def dataReceived(self, data): print data.decode("utf-8") class EchoClientFactory(ClientFactory): def __init__(self): self.protocol = None def startedConnecting(self, connector): print "Start to Connect..." def buildProtocol(self, addr): print "Connected..." self.protocol = Echo() return self.protocol def clientConnectionLost(self, connector, reason): print "Lost connection. Reason: ", reason def clientConnectionFailed(self, connector, reason): print "Connection is failed, Reason: ", reason bStop = False def routine(factory): while not bStop: if factory.protocol and factory.protocol.connected: factory.protocol.transport.write("hello, I'm %s %s" % ( sys.argv[0], datetime.datetime.now() )) print sys.argv[0], datetime.datetime.now() time.sleep(5) host = '127.0.0.1' port = 8007 factory = EchoClientFactory() reactor.connectTCP(host, port, factory) threading.Thread(target=routine, args=(factory,)).start() reactor.run() bStop = True
一開始我們建立TCP連接, 傳入主機地址, 埠, 協議工廠對象作為參數,隨後reactor.run掛起運行。
下麵我們看看ClientFactory基類,因為我們自定義的協議工廠EchoClientFactory派生自它。源碼:
class ClientFactory(Factory): """A Protocol factory for clients. This can be used together with the various connectXXX methods in reactors. """ def startedConnecting(self, connector): """Called when a connection has been started. You can call connector.stopConnecting() to stop the connection attempt. @param connector: a Connector object. """ def clientConnectionFailed(self, connector, reason): """Called when a connection has failed to connect. It may be useful to call connector.connect() - this will reconnect. @type reason: L{twisted.python.failure.Failure} """ def clientConnectionLost(self, connector, reason): """Called when an established connection is lost. It may be useful to call connector.connect() - this will reconnect. @type reason: L{twisted.python.failure.Failure} """
同樣的,我們自定義的EchoClientFactory不過就是實現了基類中沒有實現的函數,其中最重要的還是buildProtocol, 它為我們生成一個協議,下麵看下我們自定義的協議類Echo, 基類源代碼與上面的是一樣的.
客戶端的協議函數與伺服器端的協議函數是一樣的,在這裡就不多說。
客戶端的twisted模塊講完了,隨後我們創建了一個線程去和伺服器端通信, 並且定時發送, 當然,在這裡我們為了以防萬一,需要判斷是否已經與伺服器取得了連接,隨後才發送消息.
大概講了下基礎部分,所有的代碼都是來自《python高效開發實戰》里的代碼,在這裡也向大家推薦這本書,學習twisted還有兩個不錯的教程,在最後我會發百度網盤共用。之所以寫這篇基礎的,就是為了能夠理解高效開發實戰里的最後一個項目: 用Twisted開發跨平臺物聯網消息網關。因為第一次實習就接觸到了物聯網通信,在工作時,滾了一遍項目的源代碼(java寫的,但我畢竟也是學了C#, .net兩年的人, 看懂項目源碼沒壓力, mvc orm都是與.net中的EF, MVC差不多, 不過就是語法有點不同),正好和書上的這個項目差不多,書上將伺服器與客戶端的通信協議指令都講得很清楚。因此這是一本不容錯過的好書, 也是學習, 精通twisted的最好途徑
最後就是運行測試:
伺服器端:
客戶端:
twisted教程: http://pan.baidu.com/s/1dEBPGhN