PHP前端开发

在Python中构建缓存

百变鹏仔 4天前 #Python
文章标签 缓存

缓存。有用的东西。如果您不熟悉它,这是一种将数据保存在内存(或磁盘)中以便快速检索的方法。考虑查询数据库以获取某些信息。我们可以只执行一次并将结果保存在缓存中,而不是每次应用程序请求数据时都执行此操作。对数据的后续调用将从缓存中返回副本,而不是进行数据库查询。理论上,这可以提高应用程序的性能。

让我们构建一个简单的缓存以在 python 程序中使用。

缓存api

我将首先创建一个名为 simplecache 的新模块,并在其中定义一个 cache 类。我还不会实现任何东西,我只是想定义我的缓存将使用的 api。

class cache:    """ a simple caching class that works with an in-memory or file based    cache. """    def __init__(self, filename=none):        """ construct a new in-memory or file based cache."""        pass    def update(self, key, item, ttl=60):        """ add or update an item in the cache using the supplied key. optional        ttl specifies how many seconds the item will live in the cache for. """        pass    def get(self, key):        """ get an item from the cache using the specified key. """        pass    def remove(self, key):        """ remove an item from the cache using the specified key. """        pass    def purge(self, all=false):        """ remove expired items from the cache, or all items if flag is set. """        pass    def close(self):        """ close the underlying connection used by the cache. """        pass

到目前为止一切顺利。我们可以通过 __init__ 方法告诉缓存创建一个新的内存中或基于文件的缓存。我们可以使用 update 将项目添加到缓存中 - 如果该项目已存在,则会覆盖该项目。我们可以使用 key get 来获取一个项目。最后,我们可以通过 key remove 删除项目,或者使用 purge 清空过期项目的缓存(可以选择允许清除所有项目)。

在哪里缓存数据?

那么这个类要在哪里缓存数据呢? sqlite 附带 python 标准库,非常适合此类事情。事实上,sqlite 的建议用例之一就是缓存。它允许我们创建内存中或基于文件的 sql 数据库,这也是我们的用例所涵盖的。让我们设计一个可以保存缓存数据的 sql 表。

create table 'cache' (    'key' text not null unique,    'item' blob not null,    'createdon' text not null,    'timetolive' text not null,    primary key("key")));

为了解决这个问题,我们有一个名为 cache 的表,它有四个字段。 key 是一个字符串,将充当表上的唯一主键。接下来我们的 item 字段是二进制数据的 blob - 我在这里想,我们将序列化添加到缓存的对象,然后再将它们保存到数据库中。最后两个字段用于确定缓存中项目的生命周期 - createdon 是添加项目时的时间戳,timetolive 是我们需要保留该项目的时间长度。

立即学习“Python免费学习笔记(深入)”;

构建缓存

让我们首先将 sqlite 库导入到我们的模块中。

import sqlite3

然后我们需要将注意力转向 __init__ 方法。我们有两种情况需要支持:一种是给定文件名,另一种是没有。

def __init__(self, filename=none):    """ construct a new in-memory or file based cache."""    if filename is none:        self._connection = sqlite3.connect(":memory:")    else:        self._connection = sqlite3.connect(filename)    self._create_schema()        

我们可以打开一个连接并在班级的整个生命周期中保留它。因此,在调用内部方法 _create_schema 之前,我们设置 self._connection 属性来保存连接实例(稍后会详细介绍)。

引导架构

如果我们使用内存数据库,那么我们的模式还不存在。然而,对于基于文件的数据库,情况可能并非如此。这可能是已经使用我们的架构设置的现有文件。让我们看看处理这个过程的代码。

def _create_schema(self):    table_name = "cache"    cursor = self._connection.cursor()    result = cursor.execute(        "select name from sqlite_master where type = 'table' and name = ?",        (table_name,))    cache_exists = result.fetchone() is not none    if cache_exists:        return        sql = """        create table 'cache' (        'key' text not null unique,        'item' blob not null,        'createdon' text not null,        'timetolive' text not null,        primary key('key'))    """    cursor.execute(sql)    cursor.close()

首先我们定义表名并打开游标来执行一些数据库操作。接下来我们检查主表中是否存在我们的表。如果存在,我们退出该方法,否则我们执行 create table... 语句以在数据库中构建我们的表。

我们现在可以使用内存数据库或基于文件的数据库实例化我们的 cache 类来缓存对象。

添加到缓存

现在让我们将注意力转向向缓存添加项目。回想一下我们 api 中的 update 方法将对此负责。如果缓存中已存在某个键,我们将用提供的任何内容替换其条目。

def update(self, key, item, ttl=60):    """ add or update an item in the cache using the supplied key. optional    ttl specifies how many seconds the item will live in the cache for. """    sql = "select key from 'cache' where key = ?"    cursor = self._connection.cursor()    result = cursor.execute(sql, (key,))    row = result.fetchone()    if row is not none:        sql = "delete from 'cache' where key = ?"        cursor.execute(sql, (key,))        connection.commit()    sql = "insert into 'cache' values(?, ?, datetime(), ?)"    pickled_item = pickle.dumps(item)    cursor.execute(sql, (key, pickled_item, ttl))    self._connection.commit()    cursor.close()

首先我们获得内存数据库连接。然后我们测试缓存中是否存在提供的键值。如果存在,则将其从缓存中删除。接下来,我们对提供的项目值进行 pickle,将其转换为二进制 blob。然后我们将密钥、选取的项目和 ttl 插入缓存中。

从缓存中获取数据

我们从缓存中获取项目的行为大多是直接的。我们查找具有指定键的条目并取消关联的项目。当 ttl 值过期时,此过程会变得复杂。也就是说,创建日期加上 ttl 小于当前时间。如果是这种情况,则该项目已过期,应从缓存中删除。

这种方法存在一个哲学问题。有一种思想认为方法应该返回一个值(读取)执行数据突变(写入)。在这里,我们可能同时做这两件事。我们故意引入一个副作用(删除过期的项目)。我认为在这种情况下这是可以的,但其他程序员可能会反对。

def get(self, key):    """ get an item from the cache using the specified key. """    sql = "select item, createdon, timetolive from 'cache' where key = ?"    cursor = self._connection.cursor()    result = cursor.execute(sql, (key,))    row = result.fetchone()    if row is none:        return    item = pickle.loads(row[0])    expiry_date = datetime.datetime.fromisoformat(row[1]) + datetime.timedelta(seconds=int(row[2]))    now = datetime.datetime.now()    if expiry_date <= now:        sql = "delete from 'cache' where key = ?"        cursor.execute(sql, (key,))        self._connection.commit()        item = none    cursor.close()    return item

删除并清除缓存中的项目

删除项目很简单 - 事实上我们已经在其他两种方法中完成了(两次)。这是重构的理想选择(我们稍后会讨论)。现在,我们将直接实现该方法。

def remove(self, key):    """ remove an item from the cache using the specified key. """    sql = "delete from 'cache' where key = ?"    cursor = self._connection.cursor()    cursor.execute(sql, (key,))    self._connection.commit()    cursor.close()

清除物品有点复杂。我们支持两种场景 - 清除所有项目和仅清除过期项目。让我们看看如何实现这一目标。

def purge(self, all=false):    """ remove expired items from the cache, or all items if flag is set. """    cursor = self._connection.cursor()    if all:        sql = "delete from 'cache'"        cursor.execute(sql)        self._connection.commit()    else:        sql = "select key, createdon, timetolive from 'cache'"        for row in cursor.execute(sql):            expiry_date = datetime.datetime.fromisoformat(row[1]) + datetime.timedelta(seconds=int(row[2]))            now = datetime.datetime.now()            if expiry_date <= now:                sql = "delete from 'cache' where key = ?"                cursor.execute(sql, (key,))                self._connection.commit()    cursor.close()

删除所有内容非常简单。我们只需运行 sql 来删除缓存表中的所有内容。对于仅过期的项目,我们需要循环遍历每一行,计算过期日期,并确定是否应将其删除。同样,后一段代码已从我们的其他方法之一中重复(在本例中为 get )。另一个重构候选者。

重构

我们有一个有效的缓存实现,满足我们原始的 api 规范。然而,有一些重复的代码,我们可以将其分解到他们自己的方法中。让我们从 get、update、remove 和 purge 方法中存在的删除逻辑开始。这些实例都可以通过调用以下新方法来替换。

def _remove_item(self, key, cursor):    sql = "delete from 'cache' where key = ?"    cursor.execute(sql, (key,))    cursor.connection.commit()

我们可以看到这对我们的代码有很大的影响。其他四种方法现在正在调用一个通用的 _remove_item 方法。接下来我们看一下过期日期检查代码。

def _item_has_expired(self, created, ttl):    expiry_date = datetime.datetime.fromisoformat(created) + datetime.timedelta(seconds=int(ttl))    now = datetime.datetime.now()    return expiry_date <= nowdef get(self, key):    """ get an item from the cache using the specified key. """    sql = "select item, createdon, timetolive from 'cache' where key = ?"    cursor = self._connection.cursor()    result = cursor.execute(sql, (key,))    row = result.fetchone()    if row is none:        return    item = pickle.loads(row[0])    if self._item_has_expired(row[1], row[2]):        self._remove_item(key, cursor)        item = none    cursor.close()    return itemdef purge(self, all=false):    """ remove expired items from the cache, or all items if flag is set. """    cursor = self._connection.cursor()    with self.__class__.lock:        if all:            sql = "delete from 'cache'"            cursor.execute(sql)            self._connection.commit()        else:            sql = "select key, createdon, timetolive from 'cache'"            for row in cursor.execute(sql):                if self._item_has_expired(row[1], row[2]):                    self._remove_item(row[0], cursor)    cursor.close()

太棒了。我们还减少了两个地方的代码重复。

带锁的线程安全

我们快完成了。为了使其成为一个健壮的类,我们需要确保线程安全。缓存通常可以作为应用程序中的单例实例出现,因此线程安全非常重要。我们将通过在破坏性缓存操作周围使用锁来实现这一点。这就是我们整个班级添加锁定后的样子。请注意添加和删除操作周围的 with 块。即使出现问题,这些也能确保锁被释放。

#! /usr/bin/env python3import datetimeimport pickleimport sqlite3import threadingclass cache:    """ a simple caching class that works with an in-memory or file based    cache. """    _lock = threading.lock()    def __init__(self, filename=none):        """ construct a new in-memory or file based cache."""        if filename is none:            self._connection = sqlite3.connect(":memory:")        else:            self._connection = sqlite3.connect(filename)        self._create_schema()    def _create_schema(self):        table_name = "cache"        cursor = self._connection.cursor()        result = cursor.execute(            "select name from sqlite_master where type = 'table' and name = ?",            (table_name,))        cache_exists = result.fetchone() is not none        if cache_exists:            return            sql = """            create table 'cache' (            'key' text not null unique,            'item' blob not null,            'createdon' text not null,            'timetolive' text not null,            primary key('key'))        """        cursor.execute(sql)        cursor.close()    def update(self, key, item, ttl=60):        """ add or update an item in the cache using the supplied key. optional        ttl specifies how many seconds the item will live in the cache for. """        sql = "select key from 'cache' where key = ?"        cursor = self._connection.cursor()        result = cursor.execute(sql, (key,))        row = result.fetchone()        with self.__class__._lock:            if row is not none:                self._remove_item(key, cursor)            sql = "insert into 'cache' values(?, ?, datetime(), ?)"            pickled_item = pickle.dumps(item)            cursor.execute(sql, (key, pickled_item, ttl))            self._connection.commit()        cursor.close()    def _remove_item(self, key, cursor):        sql = "delete from 'cache' where key = ?"        cursor.execute(sql, (key,))        cursor.connection.commit()    def get(self, key):        """ get an item from the cache using the specified key. """        sql = "select item, createdon, timetolive from 'cache' where key = ?"        cursor = self._connection.cursor()        result = cursor.execute(sql, (key,))        row = result.fetchone()        if row is none:            return        item = pickle.loads(row[0])        if self._item_has_expired(row[1], row[2]):            with self.__class__._lock:                self._remove_item(key, cursor)            item = none        cursor.close()        return item    def _item_has_expired(self, created, ttl):        expiry_date = datetime.datetime.fromisoformat(created) + datetime.timedelta(seconds=int(ttl))        now = datetime.datetime.now()        return expiry_date <= now    def remove(self, key):        """ remove an item from the cache using the specified key. """        cursor = self._connection.cursor()        with self.__class__._lock:            self._remove_item(key, cursor)        cursor.close()    def purge(self, all=false):        """ remove expired items from the cache, or all items if flag is set. """        cursor = self._connection.cursor()        with self.__class__._lock:            if all:                sql = "delete from 'cache'"                cursor.execute(sql)                self._connection.commit()            else:                sql = "select key, createdon, timetolive from 'cache'"                for row in cursor.execute(sql):                    if self._item_has_expired(row[1], row[2]):                        with self.__class__._lock:                            self._remove_item(row[0], cursor)        cursor.close()    def close(self):        """ close the underlying connection used by the cache. """        self._connection.close()

测试缓存

是时候测试我们的缓存了。我们可以通过如下方式启动交互式会话来做到这一点。

python -i simplecache.py

现在我们可以新建一个内存缓存并测试我们的方法。

>>> c = Cache()>>> c.update("key", "some value")>>> c.update("key2", [1, 2, 3], 300)>>> c.get("key")'some vlaue'>>> c.remove("key")>>> c.purge()>>> c.get("key2")[1, 2, 3]>>> c.purge(True)>>> c.get("key2")>>> c.close()>>>

供读者练习

  1. 为 cache 类编写一套单元测试。测试有多容易?您需要进行任何更改以适应测试吗?

  2. 让生活的时间成为一个滑动窗口而不是固定的时间。也就是说,每当从缓存中检索到某个项目时,其生存时间值就会重新开始。

  3. 添加将缓存内容写入屏幕的方法。