Python:Twistedの短いソースコード
簡単な関数の実行
from twisted.internet import reactor
def printMsg():
    """Print a fixed test message, then stop the reactor."""
    # Parenthesised form prints the same text on Python 2 and also parses on Python 3.
    print('test message')
    reactor.stop()
# Schedule printMsg with a delay of 0 so it is called once the reactor is running.
reactor.callLater(0, printMsg)
# Start the event loop; printMsg calls reactor.stop(), which ends the run.
reactor.run()
TCPクライアント(接続だけ)
# This example uses both `reactor` and `protocol`; import them here so the
# snippet is runnable on its own.
from twisted.internet import reactor, protocol


class MyProtocol(protocol.Protocol):
    """Protocol that prints the peer's host on connect and then disconnects."""

    # Flags flipped from 0 to 1 by the connection callbacks below; the
    # polling loop of this example reads `closed` to know when to finish.
    made = 0
    closed = 0

    def connectionMade(self):
        """Mark the connection as made, print the peer host and close it."""
        self.made = 1
        print(self.transport.getPeer().host)
        self.transport.loseConnection()

    def connectionLost(self, reason):
        """Mark the connection as closed and report it.

        `reason` is accepted as part of the callback signature but not used.
        """
        self.closed = 1
        print('connection lost')
class TCPClient(protocol.ClientFactory):
    """Client factory that remembers the protocol it built and any connect failure."""

    # Set to True by clientConnectionFailed so the polling loop can give up
    # instead of waiting for a protocol that will never be built.
    failed = False

    def buildProtocol(self, addr):
        """Create the MyProtocol for a new connection and keep it on the factory.

        `addr` is accepted as part of the factory interface but not used.
        """
        proto = MyProtocol()
        # Mirror the default Factory.buildProtocol, which gives the protocol
        # a back-reference to the factory that created it.
        proto.factory = self
        self.protocol = proto
        return proto

    def clientConnectionFailed(self, connector, reason):
        """Record and report that the connection attempt did not succeed."""
        self.failed = True
        print('connection failed: %s' % reason.getErrorMessage())


client = TCPClient()
reactor.connectTCP('www.accense.com', 80, client)
# Pump the reactor by hand until the connection has been closed, or until the
# attempt has failed (in that case buildProtocol is never called, so waiting
# on client.protocol alone would loop forever).
while not client.failed and (not client.protocol or not client.protocol.closed):
    reactor.iterate()
TCPクライアント
from twisted.internet import reactor, protocol
class MyProtocol(protocol.Protocol):
    """Protocol that sends a bare HTTP/1.0 GET and prints the first data received."""

    # Flags flipped from 0 to 1 by the connection callbacks below.
    made = 0
    closed = 0

    def connectionMade(self):
        """Mark the connection as made, print the peer host and send the request."""
        self.made = 1
        peer = self.transport.getPeer()
        print(peer.host)
        self.transport.write('GET / HTTP/1.0\r\n\r\n')

    def connectionLost(self, reason):
        """Mark the connection as closed; `reason` is not used."""
        self.closed = 1

    def dataReceived(self, data):
        """Print the received data and close the connection right away."""
        print(data)
        self.transport.loseConnection()
class TCPClient(protocol.ClientFactory):
    """Client factory that remembers the protocol it built and any connect failure."""

    # Set to True by clientConnectionFailed so the polling loop can give up
    # instead of waiting for a protocol that will never be built.
    failed = False

    def buildProtocol(self, addr):
        """Create the MyProtocol for a new connection and keep it on the factory.

        `addr` is accepted as part of the factory interface but not used.
        """
        proto = MyProtocol()
        # Mirror the default Factory.buildProtocol, which gives the protocol
        # a back-reference to the factory that created it.
        proto.factory = self
        self.protocol = proto
        return proto

    def clientConnectionFailed(self, connector, reason):
        """Record and report that the connection attempt did not succeed."""
        self.failed = True
        print('connection failed: %s' % reason.getErrorMessage())


client = TCPClient()
reactor.connectTCP('127.0.0.1', 12345, client)
# Pump the reactor by hand until the connection has been closed, or until the
# attempt has failed (e.g. nothing is listening on the port); in that case
# buildProtocol is never called, so waiting on client.protocol alone would
# loop forever.
while not client.failed and (not client.protocol or not client.protocol.closed):
    reactor.iterate()
TCPサーバ
from twisted.internet import reactor, protocol
class MyProtocol(protocol.Protocol):
    """Server-side protocol: print incoming data, reply once, then disconnect."""

    def connectionLost(self, reason):
        """Nothing to clean up when the connection goes away."""
        pass

    def dataReceived(self, data):
        """Print the received data, send a fixed reply and close the connection."""
        print(data)
        transport = self.transport
        transport.write('test message\n')
        transport.loseConnection()
class TCPServer(protocol.ServerFactory):
    """Server factory that builds a fresh MyProtocol for every connection."""

    def buildProtocol(self, addr):
        """Return a new MyProtocol for the incoming connection.

        `addr` is accepted as part of the factory interface but not used.
        """
        proto = MyProtocol()
        # Mirror the default Factory.buildProtocol, which gives the protocol
        # a back-reference to the factory that created it.
        proto.factory = self
        return proto
# Listen on TCP port 12345; TCPServer builds one MyProtocol per connection.
reactor.listenTCP(12345, TCPServer())
# Run the event loop; nothing in this example calls reactor.stop().
reactor.run()
webページの参照
from twisted.internet import reactor
from twisted.web import client
def printPage(data):
    """Callback: print the fetched page body, then stop the reactor."""
    print(data)
    reactor.stop()
def printError(failure):
    """Errback: print the failure's error message, then stop the reactor."""
    message = failure.getErrorMessage()
    print('Error: %s' % message)
    reactor.stop()
# Start fetching the page; the returned Deferred fires with the page body.
d = client.getPage('http://www.accense.com/index.html')
# The errback is attached after the callback, so it also receives a failure
# raised inside printPage. Both handlers stop the reactor.
d.addCallback(printPage).addErrback(printError)
reactor.run()