An Introduction to Asynchronous Programming and Twisted (3)

Part 11: Your Poetry is Served

A Twisted Poetry Server

Now that we’ve learned so much about writing clients with Twisted, let’s turn around and re-implement our poetry server with Twisted too. And thanks to the generality of Twisted’s abstractions, it turns out we’ve already learned almost everything we need to know.

class PoetryProtocol(Protocol):

    def connectionMade(self):
        self.transport.write(self.factory.poem)
        self.transport.loseConnection()

class PoetryFactory(ServerFactory):

    protocol = PoetryProtocol
    def __init__(self, poem):
        self.poem = poem

def main():
    options, poetry_file = parse_args()
    poem = open(poetry_file).read()
    factory = PoetryFactory(poem)

    from twisted.internet import reactor
    port = reactor.listenTCP(options.port or 0, factory,
                             interface=options.iface)

    reactor.run()

可见server和client基本原理上是一致的, reactor loop侦听事件, 事件到达时使用protocol去处理, factory用于管理protocol, 继承自ServerFactory.

 

Part 12: A Poetry Transformation Server

这节中实现一个复杂些的server, 根据client发送不同的请求, 将poem做不同的转换并发回client, 这就需要一个协议使得client和server可以正常沟通.

Twisted includes support for several protocols we could use to solve this problem, including XML-RPCPerspective Broker, and AMP.

但是为了是我们的例子足够简单以至于容易理解, 我们使用自己的一个简单的协议,

<transform-name>.<text of the poem>

当server接收到从客户端发出的这样的request后, 根据transform-name将text of the poem进行相应的transform, 并发送回client.

class TransformProtocol(NetstringReceiver):

    def stringReceived(self, request):
        if '.' not in request: # bad request
            self.transport.loseConnection()
            return

        xform_name, poem = request.split('.', 1)

        self.xformRequestReceived(xform_name, poem)

    def xformRequestReceived(self, xform_name, poem):
        new_poem = self.factory.transform(xform_name, poem)

        if new_poem is not None:
            self.sendString(new_poem)

        self.transport.loseConnection()

class TransformFactory(ServerFactory):

    protocol = TransformProtocol

    def __init__(self, service):
        self.service = service

    def transform(self, xform_name, poem):
        thunk = getattr(self, 'xform_%s' % (xform_name,), None)

        if thunk is None: # no such transform
            return None

        try:
            return thunk(poem)
        except:
            return None # transform failed

    def xform_cummingsify(self, poem):
        return self.service.cummingsify(poem)

class TransformService(object):

    def cummingsify(self, poem):
        return poem.lower()

def main():
    service = TransformService()
    factory = PoetryFactory(service)

    from twisted.internet import reactor
    port = reactor.listenTCP(options.port or 0, factory,
                             interface=options.iface)
    reactor.run()

来看看这段代码,

首先, TransformProtocol继承自NetstringReceiver而非Protocol, NetstringReceiver是一种专门用来处理string的协议, 这儿可以使用和继承Twisted开发框架提供的各种协议来简化代码, 而不用每次从头开发, 这就是使用框架的好处.

在TransformProtocol中对于poem具体的transform逻辑上, 调用self.factory.transform, 把变数扔给factory, 而保持protocol的高度抽象, transform逻辑变化,添减, 都保持protocol不需要有任何改动.

其次, 在TransformFactory中, 使用python强大的getattr来避免使用大量的if…else.

但这儿只提供了cummingsify service, 如果要增加或删除service, TransformFactory和TransformService难免需要修改...

这段代码已经写的不错...不过缺少些Twisted的感觉...如果加上deferred的callback机制, 应该可以写出更highlevel的代码.

 

Part 13: Deferred All The Way Down

Introduction

Recall poetry client 5.1 from Part 10.The client used a Deferred to manage a callback chain that included a call to a poetry transformation engine. In client 5.1, the engine was implemented as a synchronous function call implemented in the client itself.

Client5.1中异步去获取poem, 然后调用callback函数cummingsify做transform, 现在我们在Part12中实现了TransformService, 即poem transform也要用异步的方式让服务器去完成.

这其实是个比较自然的想法, 由于reactor的特性, 任何callback都必须是unblock的, 但实际上, 很多callback处理是需要花费较长的时间的, 这个时候在callback内也必须异步处理, 来保证callback本身的unblock, 即callback本身也无法直接返回结果, 而只能返回deferred对象.

如下图, 当碰到这种inner deferred时,

The outer deferred needs to wait until the inner deferred is fired. Of course, the outer deferred can’t block either, so instead the outer deferred suspends the execution of the callback chain and returns control to the reactor

And how does the outer deferred know when to resume? Simple — by adding a callback/errback pair to the inner deferred. Thus, when the inner deferred is fired the outer deferred will resume executing its chain. If the inner deferred succeeds (i.e., it calls the callback added by the outer deferred), then the outer deferred calls its N+1 callback with the result. And if the inner deferred fails (calls the errback added by the outer deferred), the outer deferred calls the N+1errback with the failure.

 

 

下面这段代码给出了怎么样封装inner deferred来提供异步callback,

class TransformClientProtocol(NetstringReceiver):

    def connectionMade(self):
        self.sendRequest(self.factory.xform_name, self.factory.poem)

    def sendRequest(self, xform_name, poem):
        self.sendString(xform_name + '.' + poem)

    def stringReceived(self, s):
        self.transport.loseConnection()
        self.poemReceived(s)

    def poemReceived(self, poem):
        self.factory.handlePoem(poem)

class TransformClientFactory(ClientFactory):

    protocol = TransformClientProtocol

    def __init__(self, xform_name, poem):
        self.xform_name = xform_name
        self.poem = poem
        self.deferred = defer.Deferred()

    def handlePoem(self, poem):
        d, self.deferred = self.deferred, None
        d.callback(poem)

    def clientConnectionLost(self, _, reason):
        if self.deferred is not None:
            d, self.deferred = self.deferred, None
            d.errback(reason)

    clientConnectionFailed = clientConnectionLost

class TransformProxy(object):
    """
    I proxy requests to a transformation service.
    """

    def __init__(self, host, port):
        self.host = host
        self.port = port

    def xform(self, xform_name, poem):
        factory = TransformClientFactory(xform_name, poem)
        from twisted.internet import reactor
        reactor.connectTCP(self.host, self.port, factory)
        return factory.deferred

def cummingsify(poem):
    d = proxy.xform('cummingsify', poem)

    def fail(err):
        print >>sys.stderr, 'Cummingsify failed!'
        return poem

    return d.addErrback(fail)

最后这个函数就是封装好的异步callback, 大家可以和之前part10的callback对比一下...

def cummingsify(poem):
    print 'First callback, cummingsify'
    poem = engine.cummingsify(poem)
    return poem

def cummingsify_failed(err):
    if err.check(GibberishError):
        print 'Second errback, cummingsify_failed, use original poem'
        return err.value.args[0] #return original poem
    return err

再来看一下part10中的callback顺序图, 此时cummingsify为异步callback, cummingsify_failed被加到inner deferred中, 当这个inner deferred被fired时, outer deferred会根据inner deferred情况去调用, got_poem或poem_failed. 其中具体过程似乎是透明的...或者说我也不清楚

作者在这儿也没有讲清, 个人认为这儿如果能参照Part10给个完整的代码例子, 会更清晰一些...

Part 14: When a Deferred Isn’t

We’ll make a caching proxy server. When a client connects to the proxy, the proxy will either fetch the poem from the external server or return a cached copy of a previously retrieved poem. 
这儿可见, 如果是直接从cache返回的话可以直接同步处理, 如需要去external server获取的话就需要异步处理.

这样的有时需要同步, 有时需要异步的情况, 怎么办?

如下代码中, get_poem可能返回的是poem, 也有可能是deferred对象, 对于调用者怎么处理...

class ProxyService(object):

    poem = None # the cached poem

    def __init__(self, host, port):
        self.host = host
        self.port = port

    def get_poem(self):
        if self.poem is not None:
            print 'Using cached poem.'
            return self.poem

        print 'Fetching poem from server.'
        factory = PoetryClientFactory()
        factory.deferred.addCallback(self.set_poem)
        from twisted.internet import reactor
        reactor.connectTCP(self.host, self.port, factory)
        return factory.deferred

    def set_poem(self, poem):
        self.poem = poem
        return poem

class PoetryProxyProtocol(Protocol):

    def connectionMade(self):
        d = maybeDeferred(self.factory.service.get_poem)
        d.addCallback(self.transport.write)
        d.addBoth(lambda r: self.transport.loseConnection())

class PoetryProxyFactory(ServerFactory):

    protocol = PoetryProxyProtocol

    def __init__(self, service):
        self.service = service

使用maybeDeferred来解决这个问题, 这个函数会把poem也封装成一个already-fired deferred

    • If the function returns a deferred, maybeDeferred returns that same deferred, or
    • If the function returns a Failure, maybeDeferred returns a new deferred that has been fired (via .errback) with that Failure, or
    • If the function returns a regular value, maybeDeferred returns a deferred that has already been fired with that value as the result, or
    • If the function raises an exception, maybeDeferred returns a deferred that has already been fired (via .errback()) with that exception wrapped in a Failure.

An already-fired deferred may fire the new callback (or errback, depending on the state of the deferred) immediately, i.e., right when you add it.

或者使用succeed函数, The defer.succeed function is just a handy way to make an already-fired deferred given a result.

 

def get_poem(self):
    if self.poem is not None:
        print 'Using cached poem.'
        # return an already-fired deferred
        return succeed(self.poem)

    print 'Fetching poem from server.'
    factory = PoetryClientFactory()
    factory.deferred.addCallback(self.set_poem)
    from twisted.internet import reactor
    reactor.connectTCP(self.host, self.port, factory)
    return factory.deferred

本文章摘自博客园,原文发布日期:2011-09-15
时间: 2024-09-08 14:38:08

An Introduction to Asynchronous Programming and Twisted (3)的相关文章

An Introduction to Asynchronous Programming and Twisted (2)

Part 6: And Then We Took It Higher Part5中的client2.0, 在封装性上已经做的不错, 用户只需要了解和修改PoetryProtocol, PoetryClientFactory就可以完成一个应用. 其实此处, protocol的逻辑就是接受数据, 接受完以后通知factory处理, 这段逻辑已经可以作为common的框架代码, 用户无需改动. 真正需要用户每次根据上下文修改的是, 当数据接受完后的处理逻辑poem_finished(print? sa

An Introduction to Asynchronous Programming and Twisted (1)

之前看的时候, 总觉得思路不是很清晰, 其实Dave在这个模型问题上没有说清楚, 参考同步和异步, 阻塞和非阻塞, Reactor和Proactor 对于阻塞一定是同步的, 但是反之不一定, 对于多线程本质上也是阻塞的方式, 只不过是多个线程一起阻塞, 适用于CPU密集型的任务, 因为事情总要人做的, 无论什么模型都不能让做事情的实际时间变少.  对于非阻塞, 节省的是等待的时间, 所以适用于I/O密集型任务, 因为I/O往往需要等待  Dave谈的异步就是广义的异步, 其实是非阻塞同步  而T

An Introduction to Interactive Programming in Python (Part 1) -- Week 2_1 练习

# Practice Exercises for Functions # Solve each of the practice exercises below. # 1.Write a Python function miles_to_feet that takes a parameter miles and # returns the number of feet in miles miles. def miles_to_feet(miles): feet = miles * 5280 ret

An Introduction to Interactive Programming in Python (Part 1) -- Week 2_3 练习

Mini-project description - Rock-paper-scissors-lizard-Spock Rock-paper-scissors is a hand game that is played by two people. The players count to three in unison and simultaneously "throw" one of three hand signals that correspond to rock, paper

An Introduction to Interactive Programming in Python (Part 1) -- Week 2_2 练习

#Practice Exercises for Logic and Conditionals # Solve each of the practice exercises below. # 1.Write a Python function is_even that takes as input the parameter number (an integer) and # returns True if number is even and False if number is odd. #

网页前端设计资源:前端设计资源收藏夹

文章简介:今天在邮件中收到一个由 Dimi Navrotskyy在Github上发布的前端收藏夹,里面的资源太丰富了.我在想很多同学肯定喜欢.本想直接发个链接与大家分享,但时间久了,找起来辛苦.特意copy了一份发在w3cplus上. 今天在邮件中收到一个由 Dimi Navrotskyy在Github上发布的前端收藏夹,里面的资源太丰富了.我在想很多同学肯定喜欢.本想直接发个链接与大家分享,但时间久了,找起来辛苦.特意copy了一份发在w3cplus上.而且我在后面还增加了一份我自己整理的学习

iOS 各版本中的新特性(What&amp;#39;s New in iOS)- 目录翻译完成

iOS 各版本中的新特性(What's New in iOS) 太阳火神的美丽人生 (http://blog.csdn.net/opengl_es) 本文遵循"署名-非商业用途-保持一致"创作公用协议 转载请保留此句:太阳火神的美丽人生 -  本博客专注于 敏捷开发及移动和物联设备研究:iOS.Android.Html5.Arduino.pcDuino,否则,出自本博客的文章拒绝转载或再转载,谢谢合作. 介绍 Introduction文档组织结构 Organization of Thi

学习 NodeJS 第六天:主程 Ry 访谈录

这是一段 Oleg Podsechin 与 NodeJS 主程序员 Ryan Dahl 之间的访谈录.虽然不是 NodeJS 的教程,但是从访谈之中可以看出关于 NodeJS 的一些情况,帮助我们理解 NodeJS 之所以存在的前因后果.                                                                                                                                   

A simple IOCP Server/Client Class

  Download demo project v1.13 - 64.4 Kb Download source v1.13 - 121 Kb 1.1 Requirements This article expects the reader to be familiar with C++, TCP/IP, socket programming, MFC, and multithreading. The source code uses Winsock 2.0 and IOCP technology