tornado和subprocess如何实现程序的非堵塞异步处理

tornado     是由Facebook开源的一个服务器“套装",适合于做python的web或者使用其本身提供的可扩展的功能,完成了不完整的wsgi协议,可用于做快速的web开发,封装了epoll性能较好。文章主要以分析tornado的网络部分即异步事件处理与上层的IOstream类提供的异步IO,其他的模块如web的tornado.web 以后慢慢留作分析。

源码组织:

|---__init__.py

---auth.py

---......

---epoll.c

---ioloop.py

---iostream.py

---...

tornado网络部分最核心的两个模块就是ioloop.py与iostream.py,我们主要分析的就是这两个部分。

ioloop.py 主要的是将底层的epoll或者说是其他的IO多路复用封装作异步事件来处理

iostream.py主要是对于下层的异步事件的进一步封装,为其封装了更上一层的buffer(IO)事件.

本栏目更多精彩内容:http://www.bianceng.cnhttp://www.bianceng.cn/Servers/zs/

这段时间一直在学习tornado的 异步的处理。主要是用ioloop实现多路复用。

下面是个例子,有tornado基础的朋友,一看就懂的~

import subprocess
import tornado.ioloop
import time
import fcntl
import functools
import os
class GenericSubprocess (object):
    def __init__ ( self, timeout=-1, **popen_args ):
        self.args = dict()
        self.args["stdout"] = subprocess.PIPE
        self.args["stderr"] = subprocess.PIPE
        self.args["close_fds"] = True
        self.args.update(popen_args)
        self.ioloop = None
        self.expiration = None
        self.pipe = None
        self.timeout = timeout
        self.streams = []
        self.has_timed_out = False
    def start(self):
        """Spawn the task.
        Throws RuntimeError if the task was already started."""
        if not self.pipe is None:
            raise RuntimeError("Cannot start task twice")
        self.ioloop = tornado.ioloop.IOLoop.instance()
        if self.timeout > 0:
            self.expiration = self.ioloop.add_timeout( time.time() + self.timeout, self.on_timeout )
        self.pipe = subprocess.Popen(**self.args)
        self.streams = [ (self.pipe.stdout.fileno(), []),
                             (self.pipe.stderr.fileno(), []) ]
        for fd, d in self.streams:
            flags = fcntl.fcntl(fd, fcntl.F_GETFL)| os.O_NDELAY
            fcntl.fcntl( fd, fcntl.F_SETFL, flags)
            self.ioloop.add_handler( fd,
                                     self.stat,
                                     self.ioloop.READ|self.ioloop.ERROR)
    def on_timeout(self):
        self.has_timed_out = True
        self.cancel()
    def cancel (self ) :
        """Cancel task execution
        Sends SIGKILL to the child process."""
        try:
            self.pipe.kill()
        except:
            pass
    def stat( self, *args ):
        '''Check process completion and consume pending I/O data'''
        self.pipe.poll()
        if not self.pipe.returncode is None:
            '''cleanup handlers and timeouts'''
            if not self.expiration is None:
                self.ioloop.remove_timeout(self.expiration)
            for fd, dest in  self.streams:
                self.ioloop.remove_handler(fd)
            '''schedulle callback (first try to read all pending data)'''
            self.ioloop.add_callback(self.on_finish)
        for fd, dest in  self.streams:
            while True:
                try:
                    data = os.read(fd, 4096)
                    if len(data) == 0:
                        break
                    dest.extend([data])
                except:
                    break
    @property
    def stdout(self):
        return self.get_output(0)
    @property
    def stderr(self):
        return self.get_output(1)
    @property
    def status(self):
        return self.pipe.returncode
    def get_output(self, index ):
        return "".join(self.streams[index][1])
    def on_finish(self):
        raise NotImplemented()
class Subprocess (GenericSubprocess):
    def __init__ ( self, callback, *args, **kwargs):
        self.callback = callback
        self.done_callback = False
        GenericSubprocess.__init__(self, *args, **kwargs)
    def on_finish(self):
        if not self.done_callback:
            self.done_callback = True
            '''prevent calling callback twice'''
            self.ioloop.add_callback(functools.partial(self.callback, self.status, self.stdout, self.stderr, self.has_timed_out))
if __name__ == "__main__":
    ioloop = tornado.ioloop.IOLoop.instance()
    def print_timeout( status, stdout, stderr, has_timed_out) :
        assert(status!=0)
        assert(has_timed_out)
        print "OK status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out)
    def print_ok( status, stdout, stderr, has_timed_out) :
        assert(status==0)
        assert(not has_timed_out)
        print "OK status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out)
    def print_error( status, stdout, stderr, has_timed_out):
        assert(status!=0)
        assert(not has_timed_out)
        print "OK status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out)
    def stop_test():
        ioloop.stop()
    t1 = Subprocess( print_timeout, timeout=3, args=[ "sleep","5"] )
    t2 = Subprocess( print_ok, timeout=3, args=[ "ip", "a" ] )
    t3 = Subprocess( print_ok, timeout=3, args=[ "sleepdsdasdas", "1" ] )
    t4 = Subprocess( print_error, timeout=3, args=[ "cat", "/etc/sdfsdfsdfsdfsdfsdfsdf" ] )
    t1.start()
    t2.start()
    try:
        t3.start()
        assert(false)
    except:
        print "OK"
    t4.start()
    ioloop.add_timeout(time.time() + 10, stop_test)
    ioloop.start()

本文出自 “峰云,就她了。” 博客,请务必保留此出处http://rfyiamcool.blog.51cto.com/1030776/1236330

以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索timeout
, iostream
, time out
, timed out
, tornado
, status
, stdout
, timeout expired
, tornado异步数据库
, Read timed out
, Self
, subprocess
, repr
stderr
tornado 异步实现、tornado 异步、tornado mysql 异步、tornado 异步请求、tornado 异步非阻塞,以便于您获取更多的相关知识。

时间: 2024-11-05 18:24:14

tornado和subprocess如何实现程序的非堵塞异步处理的相关文章

美国一半程序员非科班出身,他们咋做到的

美国的程序员交流网站Stack Overflow在去年的开发者调查中想要了解,码农们是否都有大学的计算机专业学位.有26086名程序员参与了这项调查,其中有近一半受访者的答案是,没有学位. 大量程序员非科班出身的原因有很多方面.一方面,美国大学的学费正日益高涨.私立四年制大学一年学费平均接近4.5万美元,其中还没有包括生活费.另一方面,网上课程和培训班吸引了很多人.有志于成为程序员的人士可以在做中学,而不必"浪费"几年时间去完成大学学业. 关于非科班出身的程序员,有几点建议可供参考:

c++编程-当程序输入非数字就跳出循环

问题描述 当程序输入非数字就跳出循环 题:当输入如1+3=的算数题时 程序一直进行 当输入非数字就跳出循环 请问怎么写这个程序 主要还是怎么输入非数字就跳出循环???? 解决方案 能用C语言来写吗 如果是的话 我想我可以写出来 mark 解决方案二: 可以通过if语句来判断,如果你要做简易的计算器,不用这么做的,你可以只输入加数和被加数就OK了,为什么 要每次都自己输入"+"和"="号呢? 解决方案三: C++中有自带的方法isdigit(c),用这个判断 解决方案

进度条-非阻塞异步处理。敬请高人指点,不胜感激。

问题描述 非阻塞异步处理.敬请高人指点,不胜感激. <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core"%>/c:set <!DOCTY

advisory lock 实现高并发非堵塞式 业务锁

标签 PostgreSQL , advisory lock , 锁 背景 某些业务会利用数据库来作为一种可靠的锁,例如任务调度系统,或者其他需要可靠的锁机制的系统. 通常他们可能会使用数据库的一条记录来实现锁的SLOT和状态信息. 例如 create table lock_test ( tid int primary key, -- 任务ID state int default 1, -- 任务状态,1表示初始状态,-1表示正在处理, 0表示处理结束 retry int default -1,

Android程序开发ListView+Json+异步网络图片加载+滚动翻页的例子(图片能缓存,图片不错乱)_Android

例子中用于解析Json的Gson请自己Google下载 主Activity: package COM.Example.Main; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import COM.Example.Main.R; import COM.Example.Main.stringG

Apache Artemis —— 非堵塞 Java 嵌入消息服务

Apache ActiveMQ Artemis 提供了一个非堵塞架构,实现了超高性能的 Java 对象消息服务器.其核心只依赖一个 netty.jar 文件.该项目的目的是为你的 Java 应用提供一个可嵌入的消息服务. 架构图: 特性: 支持 AMQP 协议 OpenWire 支持 5 个 ActiveMQ 客户端 STOMP 协议支持 HornetQ Core 协议支持 HornetQ 2.4,2.5 客户端 JMS 2.0 和 1.1 支持 通过共享存储和基于复制的非共享存储实现的高可用性

LINUX C 父进程建立多个子进程循环非堵塞回收列子

下面 代码主要用于复习,留于此 点击(此处)折叠或打开 /*************************************************************************   > File Name: fork5.c   > Author: gaopeng QQ:22389860 all right reserved   > Mail: gaopp_200217@163.com   > Created Time: Sun 02 Jul 2017 0

C#取得Web程序和非Web程序的根目录的N种取法总结_C#教程

非Web程序 1.AppDomain.CurrentDomain.BaseDirectory 2.Environment.CurrentDirectory 3.HttpRuntime.BinDirectory The path to the current application's/bin directory. Web程序 HttpCurrent.Context.Server.Mappath();

一个可以让.net程序在非WIN平台上运行的软件Mono_实用技巧

唉,最近只知道埋头写程序,消息实在是不灵通啊,今天偶然发一个开源项目,Mono,一个旨在让.net程序在linux,unix,mac os,solaris等平台的软件,爽啊,期待.... What is Mono?Mono provides the necessary software to develop and run .NET client and server applications on Linux, Solaris, Mac OS X, Windows, and Unix. Spo