python基于mysql实现的简单队列以及跨进程锁实例详解_python

通常在我们进行多进程应用开发的过程中,不可避免的会遇到多个进程访问同一个资源(临界资源)的状况,这时候必须通过加一个全局性的锁,来实现资源的同步访问(即:同一时间里只能有一个进程访问资源)。

举个例子如下:

假设我们用mysql来实现一个任务队列,实现的过程如下:

1. 在Mysql中创建Job表,用于储存队列任务,如下:

create table jobs(
  id auto_increment not null primary key,
  message text not null,
  job_status not null default 0
);

message 用来存储任务信息,job_status用来标识任务状态,假设只有两种状态,0:在队列中, 1:已出队列 
 
2. 有一个生产者进程,往job表中放新的数据,进行排队:

insert into jobs(message) values('msg1');

3.假设有多个消费者进程,从job表中取排队信息,要做的操作如下:

select * from jobs where job_status=0 order by id asc limit 1;
update jobs set job_status=1 where id = ?; -- id为刚刚取得的记录id

4. 如果没有跨进程的锁,两个消费者进程有可能同时取到重复的消息,导致一个消息被消费多次。这种情况是我们不希望看到的,于是,我们需要实现一个跨进程的锁。

=========================分割线=======================================

说到跨进程的锁实现,我们主要有几种实现方式:

(1)信号量
(2)文件锁fcntl
(3)socket(端口号绑定)
(4)signal
这几种方式各有利弊,总体来说前2种方式可能多一点,这里我就不详细说了,大家可以去查阅资料。
 
查资料的时候发现mysql中有锁的实现,适用于对于性能要求不是很高的应用场景,大并发的分布式访问可能会有瓶颈.
 
对此用python实现了一个demo,如下:
 
文件名:glock.py

#!/usr/bin/env python2.7
#
# -*- coding:utf-8 -*-
#
#  Desc  :
#
import logging, time
import MySQLdb
class Glock:
  def __init__(self, db):
    self.db = db
  def _execute(self, sql):
    cursor = self.db.cursor()
    try:
      ret = None
      cursor.execute(sql)
      if cursor.rowcount != 1:
        logging.error("Multiple rows returned in mysql lock function.")
        ret = None
      else:
        ret = cursor.fetchone()
      cursor.close()
      return ret
    except Exception, ex:
      logging.error("Execute sql \"%s\" failed! Exception: %s", sql, str(ex))
      cursor.close()
      return None
  def lock(self, lockstr, timeout):
    sql = "SELECT GET_LOCK('%s', %s)" % (lockstr, timeout)
    ret = self._execute(sql) 

    if ret[0] == 0:
      logging.debug("Another client has previously locked '%s'.", lockstr)
      return False
    elif ret[0] == 1:
      logging.debug("The lock '%s' was obtained successfully.", lockstr)
      return True
    else:
      logging.error("Error occurred!")
      return None
  def unlock(self, lockstr):
    sql = "SELECT RELEASE_LOCK('%s')" % (lockstr)
    ret = self._execute(sql)
    if ret[0] == 0:
      logging.debug("The lock '%s' the lock is not released(the lock was not established by this thread).", lockstr)
      return False
    elif ret[0] == 1:
      logging.debug("The lock '%s' the lock was released.", lockstr)
      return True
    else:
      logging.error("The lock '%s' did not exist.", lockstr)
      return None
#Init logging
def init_logging():
  sh = logging.StreamHandler()
  logger = logging.getLogger()
  logger.setLevel(logging.DEBUG)
  formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s')
  sh.setFormatter(formatter)
  logger.addHandler(sh)
  logging.info("Current log level is : %s",logging.getLevelName(logger.getEffectiveLevel()))
def main():
  init_logging()
  db = MySQLdb.connect(host='localhost', user='root', passwd='')
  lock_name = 'queue' 

  l = Glock(db) 

  ret = l.lock(lock_name, 10)
  if ret != True:
    logging.error("Can't get lock! exit!")
    quit()
  time.sleep(10)
  logging.info("You can do some synchronization work across processes!")
  ##TODO
  ## you can do something in here ##
  l.unlock(lock_name)
if __name__ == "__main__":
  main() 

在main函数里:

l.lock(lock_name, 10) 中,10是表示timeout的时间是10秒,如果10秒还获取不了锁,就返回,执行后面的操作。
 
在这个demo中,在标记TODO的地方,可以将消费者从job表中取消息的逻辑放在这里。即分割线以上的.

2.假设有多个消费者进程,从job表中取排队信息,要做的操作如下:

select * from jobs where job_status=0 order by id asc limit 1;
update jobs set job_status=1 where id = ?; -- id为刚刚取得的记录id

这样,就能保证多个进程访问临界资源时同步进行了,保证数据的一致性。
 
测试的时候,启动两个glock.py, 结果如下:

[@tj-10-47 test]# ./glock.py
2014-03-14 17:08:40,277 -glock:glock.py-L70-INFO: Current log level is : DEBUG
2014-03-14 17:08:40,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully.
2014-03-14 17:08:50,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes!
2014-03-14 17:08:50,299 -glock:glock.py-L56-DEBUG: The lock 'queue' the lock was released. 

可以看到第一个glock.py是 17:08:50解锁的,下面的glock.py是在17:08:50获取锁的,可以证实这样是完全可行的。

[@tj-10-47 test]# ./glock.py
2014-03-14 17:08:46,873 -glock:glock.py-L70-INFO: Current log level is : DEBUG
2014-03-14 17:08:50,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully.
2014-03-14 17:09:00,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes!
2014-03-14 17:09:00,300 -glock:glock.py-L56-DEBUG: The lock 'queue' the lock was released.
[@tj-10-47 test]#

以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索python
, mysql
, 队列
跨进程锁
进程间通信 消息队列、java mq消息队列详解、java 消息队列实例、ucos消息队列实例、python 多进程 队列,以便于您获取更多的相关知识。

时间: 2024-07-28 17:22:59

python基于mysql实现的简单队列以及跨进程锁实例详解_python的相关文章

python 排序算法总结及实例详解_python

总结了一下常见集中排序的算法 归并排序 归并排序也称合并排序,是分治法的典型应用.分治思想是将每个问题分解成个个小问题,将每个小问题解决,然后合并. 具体的归并排序就是,将一组无序数按n/2递归分解成只有一个元素的子项,一个元素就是已经排好序的了.然后将这些有序的子元素进行合并. 合并的过程就是 对 两个已经排好序的子序列,先选取两个子序列中最小的元素进行比较,选取两个元素中最小的那个子序列并将其从子序列中 去掉添加到最终的结果集中,直到两个子序列归并完成. 代码如下: #!/usr/bin/p

Python中的闭包实例详解_python

一般来说闭包这个概念在很多语言中都有涉及,本文主要谈谈python中的闭包定义及相关用法.Python中使用闭包主要是在进行函数式开发时使用.详情分析如下: 一.定义 python中的闭包从表现形式上定义(解释)为:如果在一个内部函数里,对在外部作用域(但不是在全局作用域)的变量进行引用,那么内部函数就被认为是闭包(closure).这个定义是相对直白的,好理解的,不像其他定义那样学究味道十足(那些学究味道重的解释,在对一个名词的解释过程中又充满了一堆让人抓狂的其他陌生名词,不适合初学者).下面

python的迭代器与生成器实例详解_python

本文以实例详解了python的迭代器与生成器,具体如下所示: 1. 迭代器概述:  迭代器是访问集合元素的一种方式.迭代器对象从集合的第一个元素开始访问,直到所有的元素被访问完结束.迭代器只能往前不会后退,不过这也没什么,因为人们很少在迭代途中往后退.  1.1 使用迭代器的优点  对于原生支持随机访问的数据结构(如tuple.list),迭代器和经典for循环的索引访问相比并无优势,反而丢失了索引值(可以使用内建函数enumerate()找回这个索引值).但对于无法随机访问的数据结构(比如se

利用Python破解验证码实例详解_python

一.前言 本实验将通过一个简单的例子来讲解破解验证码的原理,将学习和实践以下知识点:       Python基本知识       PIL模块的使用 二.实例详解 安装 pillow(PIL)库: $ sudo apt-get update $ sudo apt-get install python-dev $ sudo apt-get install libtiff5-dev libjpeg8-dev zlib1g-dev \ libfreetype6-dev liblcms2-dev lib

Python文件操作类操作实例详解_python

本文讲述了Python文件操作类的操作实例,详细代码如下: #!/usr/bin/env python #!/usr/bin/env python #coding:utf-8 # Purpose: 文件操作类 #声明一个字符串文本 poem=''' Programming is fun测试 When the work is done if you wanna make your work also fun: use Python! ''' #创建一个file类的实例,模式可以为:只读模式('r'

python time模块用法实例详解_python

本文详细讲述了python的内嵌time模块的用法.分享给大家供大家参考之用.具体分析如下:   一.简介 time模块提供各种操作时间的函数 说明:一般有两种表示时间的方式: 第一种是时间戳的方式(相对于1970.1.1 00:00:00以秒计算的偏移量),时间戳是惟一的 第二种以数组的形式表示即(struct_time),共有九个元素,分别表示,同一个时间戳的struct_time会因为时区不同而不同 year (four digits, e.g. 1998) month (1-12) da

python 根据正则表达式提取指定的内容实例详解_python

python 根据正则表达式提取指定的内容 正则表达式是极其强大的,利用正则表达式来提取想要的内容是很方便的事.   下面演示了在python里,通过正则表达式来提取符合要求的内容. 实例代码: import re # 正则表达式是极其强大的,利用正则表达式来提取想要的内容是很方便的事. # 下面演示了在python里,通过正则表达式来提取符合要求的内容.有几个要注意 # 的地方就是: # [1] 要用()将需要的内容包含起来 # [2] 编号为0的group是整个符合正则表达式的内容,编号为1

python验证码识别的实例详解_python

其实关于验证码识别涉及很多方面的内容,入手难度大,但是入手后,可拓展性又非常广泛,可玩性极强,成就感也很足,对这感兴趣的朋友们下面跟着小编一起来学习学习吧. 依赖 sudo apt-get install python-imaging sudo apt-get install tesseract-ocr pip install pytesseract 利用google ocr来识别验证码 from PIL import Image import pytesseract image = Image

Python基础之函数用法实例详解_python

本文以实例形式较为详细的讲述了Python函数的用法,对于初学Python的朋友有不错的借鉴价值.分享给大家供大家参考之用.具体分析如下: 通常来说,Python的函数是由一个新的语句编写,即def,def是可执行的语句--函数并不存在,直到Python运行了def后才存在. 函数是通过赋值传递的,参数通过赋值传递给函数 def语句将创建一个函数对象并将其赋值给一个变量名,def语句的一般格式如下: def <name>(arg1,arg2,arg3,--,argN): <stateme