wiki:Twistedの短いソースコード集

Python:Twistedの短いソースコード

簡単な関数の実行

from twisted.internet import reactor

def printMsg():
  """Print a fixed test message, then stop the reactor."""
  # Parenthesized single-argument print behaves the same on Python 2 and 3.
  print('test message')
  reactor.stop()

# Schedule printMsg with a delay of 0 seconds, then start the reactor.
# printMsg calls reactor.stop() itself, which is what ends the run() call.
reactor.callLater(0, printMsg)
reactor.run()

TCPクライアント(接続のみ)

# This snippet needs both names; `protocol` was previously never imported here.
from twisted.internet import reactor, protocol

class MyProtocol(protocol.Protocol):
  """Protocol that prints the peer's host on connect and then disconnects."""

  # State flags; `closed` is polled by the code that drives the reactor.
  made = False
  closed = False

  def connectionMade(self):
    """Mark the connection as made, print the peer host, and close it."""
    self.made = True
    print(self.transport.getPeer().host)
    self.transport.loseConnection()

  def connectionLost(self, reason):
    """Mark the connection as closed and print a notice.

    `reason` is accepted but not inspected.
    """
    self.closed = True
    print('connection lost')

class TCPClient(protocol.ClientFactory):
  """Client factory that remembers the protocol it builds.

  The driver loop below polls `protocol.closed` and `failed` to decide when
  to stop iterating the reactor.
  """

  # Set when the connection attempt fails; in that case no protocol is ever
  # built, so `protocol.closed` alone could never end the loop.
  failed = False

  def buildProtocol(self, addr):
    """Create a MyProtocol, store it on `self.protocol`, and return it."""
    self.protocol = MyProtocol()
    return self.protocol

  def clientConnectionFailed(self, connector, reason):
    """Report a failed connection attempt and flag it for the driver loop."""
    print('connection failed: %s' % reason.getErrorMessage())
    self.failed = True

client = TCPClient()
reactor.connectTCP('www.accense.com', 80, client)
# Drive the reactor by hand until the connection has been closed, or until
# the connection attempt has failed (otherwise this loop would spin forever).
while not client.failed and not (client.protocol and client.protocol.closed):
  reactor.iterate()

TCPクライアント

from twisted.internet import reactor, protocol

class MyProtocol(protocol.Protocol):
  """Protocol that sends a minimal HTTP/1.0 GET and prints the first data received."""

  # State flags; `closed` is polled by the code that drives the reactor.
  made = False
  closed = False

  def connectionMade(self):
    """Mark the connection as made, print the peer host, and send the request."""
    self.made = True
    print(self.transport.getPeer().host)
    # Transports carry bytes: a b'' literal is the same value on Python 2 and
    # is required on Python 3, where a text string is rejected.
    self.transport.write(b'GET / HTTP/1.0\r\n\r\n')

  def connectionLost(self, reason):
    """Mark the connection as closed; `reason` is not inspected."""
    self.closed = True

  def dataReceived(self, data):
    """Print the received data and close the connection.

    The connection is closed after the first delivery, so any data that
    would have arrived in later deliveries is not printed.
    """
    print(data)
    self.transport.loseConnection()

class TCPClient(protocol.ClientFactory):
  """Client factory that remembers the protocol it builds.

  The driver loop below polls `protocol.closed` and `failed` to decide when
  to stop iterating the reactor.
  """

  # Set when the connection attempt fails; in that case no protocol is ever
  # built, so `protocol.closed` alone could never end the loop.
  failed = False

  def buildProtocol(self, addr):
    """Create a MyProtocol, store it on `self.protocol`, and return it."""
    self.protocol = MyProtocol()
    return self.protocol

  def clientConnectionFailed(self, connector, reason):
    """Report a failed connection attempt and flag it for the driver loop."""
    print('connection failed: %s' % reason.getErrorMessage())
    self.failed = True

client = TCPClient()
reactor.connectTCP('127.0.0.1', 12345, client)
# Drive the reactor by hand until the connection has been closed, or until
# the connection attempt has failed (otherwise this loop would spin forever).
while not client.failed and not (client.protocol and client.protocol.closed):
  reactor.iterate()

TCPサーバ

from twisted.internet import reactor, protocol

class MyProtocol(protocol.Protocol):
  """Server-side protocol: print what arrives, reply with a fixed line, disconnect."""

  def connectionLost(self, reason):
    """Do nothing when the connection goes away; `reason` is ignored."""

  def dataReceived(self, data):
    """Print the received data, send the fixed reply, and close the connection."""
    print(data)
    # Transports carry bytes: a b'' literal is the same value on Python 2 and
    # is required on Python 3, where a text string is rejected.
    self.transport.write(b'test message\n')
    self.transport.loseConnection()

class TCPServer(protocol.ServerFactory):
  """Server factory that returns a new MyProtocol on every buildProtocol call."""

  def buildProtocol(self, addr):
    """Return a fresh MyProtocol instance; `addr` is not used."""
    handler = MyProtocol()
    return handler

# Listen on TCP port 12345 with a TCPServer factory, then start the reactor.
# Nothing in this snippet stops the reactor, so run() keeps serving until
# the process is interrupted.
reactor.listenTCP(12345, TCPServer())
reactor.run()

Webページの取得

from twisted.internet import reactor
from twisted.web import client

def printPage(data):
  """Success callback: print the fetched page data, then stop the reactor."""
  # Parenthesized single-argument print behaves the same on Python 2 and 3.
  print(data)
  reactor.stop()

def printError(failure):
  """Error callback: print the failure's error message, then stop the reactor."""
  # Parenthesized single-argument print behaves the same on Python 2 and 3.
  print('Error: %s' % failure.getErrorMessage())
  reactor.stop()

# Request the page. The URL is passed as bytes: on Python 2 a b'' literal is
# the same value as before, and on Python 3 getPage expects bytes.
d = client.getPage(b'http://www.accense.com/index.html')
# Alternate URL to try instead (presumably a page that does not exist, to
# exercise printError -- confirm against the site):
#d = client.getPage(b'http://www.accense.com/index2.html')
# printPage handles success; printError handles a failed request (or an
# exception raised by printPage). Both stop the reactor, which ends run().
d.addCallback(printPage)
d.addErrback(printError)
reactor.run()