Python与Redis的连接教程_python

今天在写zabbix storm job监控脚本的时候用到了python的redis模块,之前也有用过,但是没有过多的了解,今天看了下相关的api和源码,看到有ConnectionPool的实现,这里简单说下。
在ConnectionPool之前,如果需要连接redis,我都是用StrictRedis这个类,在源码中可以看到这个类的具体解释:
 
redis.StrictRedis Implementation of the Redis protocol.This abstract class provides a Python interface to all Redis commands and an
implementation of the Redis protocol.Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server

使用的方法:
 

 r=redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx)
 r.xxxx()

有了ConnectionPool这个类之后,可以使用如下方法
 

pool = redis.ConnectionPool(host=xxx, port=xxx, db=xxxx)
r = redis.Redis(connection_pool=pool)

这里Redis是StrictRedis的子类
简单分析如下:
在StrictRedis类的__init__方法中,可以初始化connection_pool这个参数,其对应的是一个ConnectionPool的对象:
 

class StrictRedis(object):
........
  def __init__(self, host='localhost', port=6379,
         db=0, password=None, socket_timeout=None,
         socket_connect_timeout=None,
         socket_keepalive=None, socket_keepalive_options=None,
         connection_pool=None, unix_socket_path=None,
         encoding='utf-8', encoding_errors='strict',
         charset=None, errors=None,
         decode_responses=False, retry_on_timeout=False,
         ssl=False, ssl_keyfile=None, ssl_certfile=None,
         ssl_cert_reqs=None, ssl_ca_certs=None):
     if not connection_pool:
       ..........
       connection_pool = ConnectionPool(**kwargs)
     self.connection_pool = connection_pool

在StrictRedis的实例执行具体的命令时会调用execute_command方法,这里可以看到具体实现是从连接池中获取一个具体的连接,然后执行命令,完成后释放连接:

 

  # COMMAND EXECUTION AND PROTOCOL PARSING
  def execute_command(self, *args, **options):
    "Execute a command and return a parsed response"
    pool = self.connection_pool
    command_name = args[0]
    connection = pool.get_connection(command_name, **options) #调用ConnectionPool.get_connection方法获取一个连接
    try:
      connection.send_command(*args) #命令执行,这里为Connection.send_command
      return self.parse_response(connection, command_name, **options)
    except (ConnectionError, TimeoutError) as e:
      connection.disconnect()
      if not connection.retry_on_timeout and isinstance(e, TimeoutError):
        raise
      connection.send_command(*args)
      return self.parse_response(connection, command_name, **options)
    finally:
      pool.release(connection) #调用ConnectionPool.release释放连接

在来看看ConnectionPool类:

class ConnectionPool(object):
    ...........
  def __init__(self, connection_class=Connection, max_connections=None,
         **connection_kwargs):  #类初始化时调用构造函数
    max_connections = max_connections or 2 ** 31
    if not isinstance(max_connections, (int, long)) or max_connections < 0: #判断输入的max_connections是否合法
      raise ValueError('"max_connections" must be a positive integer')
    self.connection_class = connection_class #设置对应的参数
    self.connection_kwargs = connection_kwargs
    self.max_connections = max_connections
    self.reset() #初始化ConnectionPool 时的reset操作
  def reset(self):
    self.pid = os.getpid()
    self._created_connections = 0 #已经创建的连接的计数器
    self._available_connections = []  #声明一个空的数组,用来存放可用的连接
    self._in_use_connections = set() #声明一个空的集合,用来存放已经在用的连接
    self._check_lock = threading.Lock()
.......
  def get_connection(self, command_name, *keys, **options): #在连接池中获取连接的方法
    "Get a connection from the pool"
    self._checkpid()
    try:
      connection = self._available_connections.pop() #获取并删除代表连接的元素,在第一次获取connectiong时,因为_available_connections是一个空的数组,
      会直接调用make_connection方法
    except IndexError:
      connection = self.make_connection()
    self._in_use_connections.add(connection)  #向代表正在使用的连接的集合中添加元素
    return connection
  def make_connection(self): #在_available_connections数组为空时获取连接调用的方法
    "Create a new connection"
    if self._created_connections >= self.max_connections:  #判断创建的连接是否已经达到最大限制,max_connections可以通过参数初始化
      raise ConnectionError("Too many connections")
    self._created_connections += 1  #把代表已经创建的连接的数值+1
    return self.connection_class(**self.connection_kwargs)   #返回有效的连接,默认为Connection(**self.connection_kwargs)
  def release(self, connection): #释放连接,链接并没有断开,只是存在链接池中
    "Releases the connection back to the pool"
    self._checkpid()
    if connection.pid != self.pid:
      return
    self._in_use_connections.remove(connection)  #从集合中删除元素
    self._available_connections.append(connection) #并添加到_available_connections 的数组中
  def disconnect(self): #断开所有连接池中的链接
    "Disconnects all connections in the pool"
    all_conns = chain(self._available_connections,
             self._in_use_connections)
    for connection in all_conns:
      connection.disconnect()

execute_command最终调用的是Connection.send_command方法,关闭链接为 Connection.disconnect方法,而Connection类的实现:
 

class Connection(object):
  "Manages TCP communication to and from a Redis server"
  def __del__(self):  #对象删除时的操作,调用disconnect释放连接
    try:
      self.disconnect()
    except Exception:
      pass

核心的链接建立方法是通过socket模块实现:

 
   

 def _connect(self):
    err = None
    for res in socket.getaddrinfo(self.host, self.port, 0,
                   socket.SOCK_STREAM):
      family, socktype, proto, canonname, socket_address = res
      sock = None
      try:
        sock = socket.socket(family, socktype, proto)
        # TCP_NODELAY
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        # TCP_KEEPALIVE
        if self.socket_keepalive:  #构造函数中默认 socket_keepalive=False,因此这里默认为短连接
          sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
          for k, v in iteritems(self.socket_keepalive_options):
            sock.setsockopt(socket.SOL_TCP, k, v)
        # set the socket_connect_timeout before we connect
        sock.settimeout(self.socket_connect_timeout) #构造函数中默认socket_connect_timeout=None,即连接为blocking的模式
        # connect
        sock.connect(socket_address)
        # set the socket_timeout now that we're connected
        sock.settimeout(self.socket_timeout) #构造函数中默认socket_timeout=None
        return sock
      except socket.error as _:
        err = _
        if sock is not None:
          sock.close()
.....

关闭链接的方法:
 

  def disconnect(self):
    "Disconnects from the Redis server"
    self._parser.on_disconnect()
    if self._sock is None:
      return
    try:
      self._sock.shutdown(socket.SHUT_RDWR) #先shutdown再close
      self._sock.close()
    except socket.error:
      pass
    self._sock = None

       
可以小结如下
1)默认情况下每创建一个Redis实例都会构造出一个ConnectionPool实例,每一次访问redis都会从这个连接池得到一个连接,操作完成后会把该连接放回连接池(连接并没有释放),可以构造一个统一的ConnectionPool,在创建Redis实例时,可以将该ConnectionPool传入,那么后续的操作会从给定的ConnectionPool获得连接,不会再重复创建ConnectionPool。
2)默认情况下没有设置keepalive和timeout,建立的连接是blocking模式的短连接。
3)不考虑底层tcp的情况下,连接池中的连接会在ConnectionPool.disconnect中统一销毁。

以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索python
python redis 教程、python连接redis、python redis 连接池、python redis关闭连接、python 连接redis集群,以便于您获取更多的相关知识。

时间: 2024-09-21 17:52:38

Python与Redis的连接教程_python的相关文章

Python读写Redis数据库操作示例_python

使用Python如何操作Redis呢?下面用实例来说明用Python读写Redis数据库.比如,我们插入一条数据,如下: 复制代码 代码如下: import redis class Database:      def __init__(self):          self.host = 'localhost'          self.port = 6379      def write(self,website,city,year,month,day,deal_number):    

Python程序语言快速上手教程_python

本来打算从网上找一篇入门教程,但因为Python很少是程序员的第一次接触程序所学的语言,所以网上现有的教程多不是很基础,还是决定自己写下这些. 如果没有程序基础的话,可能会觉得本文涵盖的内容有点多.对照大学里面常教的C语言的教学速度,本文大约有四五个课时的内容:对照网上程序类的视频 教程,大致相当于两三个小时的内容:对于翻一本程序书籍,大约相当于翻一个小时书.也因此,如果有深入学习的打算的话,为了效率还是推荐看书. 如果暂时不能理解本文中的一些内容也没关系,因为都是一些经常会用到的基础知识,在实

Python实现SMTP发送邮件详细教程_python

简介 Python发送邮件的教程本人在网站搜索的时候搜索出来了一大堆,但是都是说了一大堆原理然后就推出了实现代码,我测试用给出的代码进行发送邮件时都不成功,后面找了很久才找到原因,这都是没有一个详细的环境调试导致,所以今天特出一个详细的教程,一步一步从环境调试到代码实现整一个教程,希望对还在苦苦寻找解决方法却迟迟不能得到有效解决的人员一点帮助. SMTP协议 首先了解SMTP(简单邮件传输协议),邮件传送代理程序使用SMTP协议来发送电邮到接收者的邮件服务器.SMTP协议只能用来发送邮件,不能用

使用Python构建Hopfield网络的教程_python

 热的东西显然会变凉.房间会会人沮丧地变得凌乱.几乎同样,消息会失真.逆转这些情况的短期策略分别是重新加热. 做卫生和使用 Hopfield 网络.本文向您介绍了三者中的最后一个,它是一个只需要特定的参数就可以消除噪声的算法.net.py 是一个特别简单的 Python 实现,将向您展示它的基本部分如何结合到一起,以及为何 Hopfield 网络有时可以自失真的图案中 重新得到原图案.尽管这个实现有局限性,不过仍然可以让您获得关于 Hopfield 网络的很多有益且有启发作用的经验. 您寻求的是

在Docker上部署Python的Flask框架的教程_python

本文中,我将尝试展示用Docker开发python应用(主要是Web应用)的可行方法.虽然我本人专注于Python的Flask微框架,但本文目的是演示如何通过Docker更好地开发和共享应用程序,(由任何语言和框架开发的应用程序).Docker通过封装依赖项,大大减少了开发环境和正式产品的差距. 大多数Python开发人员在开发中使用virtualenv.它提供了一种易用的机制让应用程序使用自己专用的依赖项,这些依赖项可能与在其它应用程序或操作系统存在冲突(尤其是不同的Pyhton版本,还有不同

最基础的Python的socket编程入门教程_python

本文介绍使用Python进行Socket网络编程,假设读者已经具备了基本的网络编程知识和Python的基本语法知识,本文中的代码如果没有说明则都是运行在Python 3.4下. Python的socket功能封装在socket库中,要使用socket,记得先import socket,socket库的详细介绍参见官方文档.创建Socket 首先创建一个socket,使用socket库中得socket函数创建. import socket # create an INET, STREAM sock

简单的Python的curses库使用教程_python

curses 库 ( ncurses ) 提供了控制字符屏幕的独立于终端的方法.curses 是大多数类似于 UNIX 的系统(包括 Linux)的标准部分,而且它已经移植到 Windows 和其它系统.curses 程序将在纯文本系统上.xterm 和其它窗口化控制台会话中运行,这使这些应用程序具有良好的可移植性.介绍 curses Python 的标准 curses 提供了"玻璃电传"(glass teletype)(在 20 世纪 70 年代,原始 curses 库刚创建时,它叫

python中的多线程实例教程_python

本文以实例形式较为详细的讲述了Python中多线程的用法,在Python程序设计中有着比较广泛的应用.分享给大家供大家参考之用.具体分析如下: python中关于多线程的操作可以使用thread和threading模块来实现,其中thread模块在Py3中已经改名为_thread,不再推荐使用.而threading模块是在thread之上进行了封装,也是推荐使用的多线程模块,本文主要基于threading模块进行介绍.在某些版本中thread模块可能不存在,要使用dump_threading来代

状态机的概念和在Python下使用状态机的教程_python

什么是状态机? 关于状态机的一个极度确切的描述是它是一个有向图形,由一组节点和一组相应的转移函数组成.状态机通过响应一系列事件而"运行".每个事件都在属于"当前"节点的转移函数的控制范围内,其中函数的范围是节点的一个子集.函数返回"下一个"(也许是同一个)节点.这些节点中至少有一个必须是终态.当到达终态,状态机停止. 但一个抽象的数学描述(就像我刚给出的)并不能真正说明在什么情况下使用状态机可以解决实际编程问题.另一种策略就是将状态机定义成一种强