wiki:Twistedの短いソースコード集

Version 5 (modified by nakiki, 13 years ago) (diff)

--

簡単な関数の実行

from twisted.internet import reactor

def printMsg():
  print 'test message'
  reactor.stop()

reactor.callLater(0, printMsg)
reactor.run()

TCPクライアント.接続だけ

class MyProtocol(protocol.Protocol):
  made = 0
  closed = 0

  def connectionMade(self):
    self.made = 1
    print self.transport.getPeer().host
    self.transport.loseConnection()

  def connectionLost(self, reason):
    self.closed = 1
    print 'connection lost'

class TCPClient(protocol.ClientFactory):
  def buildProtocol(self, addr):
    self.protocol = MyProtocol()
    return self.protocol

client = TCPClient()
reactor.connectTCP('www.accense.com', 80, client)
while not client.protocol or not client.protocol.closed:
  reactor.iterate()

TCPクライアント

from twisted.internet import reactor, protocol

class MyProtocol(protocol.Protocol):
  made = 0
  closed = 0

  def connectionMade(self):
    self.made = 1
    print self.transport.getPeer().host
    self.transport.write('GET / HTTP/1.0\r\n\r\n')

  def connectionLost(self, reason):
    self.closed = 1

  def dataReceived(self, data):
    print data
    self.transport.loseConnection()

class TCPClient(protocol.ClientFactory):
  def buildProtocol(self, addr):
    self.protocol = MyProtocol()
    return self.protocol

client = TCPClient()
reactor.connectTCP('127.0.0.1', 12345, client)
while not client.protocol or not client.protocol.closed:
  reactor.iterate()

TCPサーバ

from twisted.internet import reactor, protocol

class MyProtocol(protocol.Protocol):
  def connectionLost(self, reason):
    pass

  def dataReceived(self, data):
    print data
    self.transport.write('test message\n')
    self.transport.loseConnection()

class TCPServer(protocol.ServerFactory):
  def buildProtocol(self, addr):
    return MyProtocol()

reactor.listenTCP(12345, TCPServer())
reactor.run()