提问者:小点点

与多个Python脚本共享一个脚本


我想要一个唯一的cript(键/值)数据库,可以从同时运行的多个Python脚本访问。

如果script1.py更新d[2839],那么script2.py应该在几秒钟后查询d[2839]时看到修改后的值。

>

  • 我想过使用SQLite,但是从多个进程并发写/读似乎不是SQLite的强项(假设script1.py刚刚修改了d[2839]script2.py的SQLite连接如何知道它必须重新加载数据库的这个特定部分?)

    当我想刷新修改时,我也想过锁定文件(但这是相当棘手的),并使用json. dump进行序列化,然后尝试检测修改,使用json.load重新加载是否有任何修改等…哦,不,我正在重新发明轮子,并重新发明一个特别低效的键/值数据库!

    redis看起来像一个解决方案,但它不正式支持Windows,这同样适用于eveldb。

    多个脚本可能想要完全同时写入(即使这是一个非常罕见的事件),有没有办法让DB系统处理这个(由于锁定参数?似乎默认情况下SQLite不能这样做,因为“SQLite支持无限数量的同时读取器,但它在任何时刻都只允许一个写入器。”)

    Pythonic的解决方案是什么?

    注意:我在视窗上,而这个命令应该有最大1M项(键和值都是整数)。


  • 共3个答案

    匿名用户

    除了SQLite之外,嵌入式数据存储的摩斯没有针对并发访问的优化,我也对SQLite的并发性能感到好奇,所以我做了一个基准测试:

    import time
    import sqlite3
    import os
    import random
    import sys
    import multiprocessing
    
    
    class Store():
    
        def __init__(self, filename='kv.db'):
            self.conn = sqlite3.connect(filename, timeout=60)
            self.conn.execute('pragma journal_mode=wal')
            self.conn.execute('create table if not exists "kv" (key integer primary key, value integer) without rowid')
            self.conn.commit()
    
        def get(self, key):
            item = self.conn.execute('select value from "kv" where key=?', (key,))
            if item:
                return next(item)[0]
    
        def set(self, key, value):
            self.conn.execute('replace into "kv" (key, value) values (?,?)', (key, value))
            self.conn.commit()
    
    
    def worker(n):
        d = [random.randint(0, 1<<31) for _ in range(n)]
        s = Store()
        for i in d:
            s.set(i, i)
        random.shuffle(d)
        for i in d:
            s.get(i)
    
    
    def test(c):
        n = 5000
        start = time.time()
        ps = []
        for _ in range(c):
            p = multiprocessing.Process(target=worker, args=(n,))
            p.start()
            ps.append(p)
        while any(p.is_alive() for p in ps):
            time.sleep(0.01)
        cost = time.time() - start
        print(f'{c:<10d}\t{cost:<7.2f}\t{n/cost:<20.2f}\t{n*c/cost:<14.2f}')
    
    
    def main():
        print(f'concurrency\ttime(s)\tpre process TPS(r/s)\ttotal TPS(r/s)')
        for c in range(1, 9):
            test(c)
    
    
    if __name__ == '__main__':
        main()
    

    结果在我的4核macOS盒,SSD卷:

    concurrency time(s) pre process TPS(r/s)    total TPS(r/s)
    1           0.65    7638.43                 7638.43
    2           1.30    3854.69                 7709.38
    3           1.83    2729.32                 8187.97
    4           2.43    2055.25                 8221.01
    5           3.07    1629.35                 8146.74
    6           3.87    1290.63                 7743.78
    7           4.80    1041.73                 7292.13
    8           5.37    931.27                  7450.15
    

    结果在8核心Windows服务器2012云服务器,SSD卷:

    concurrency     time(s) pre process TPS(r/s)    total TPS(r/s)
    1               4.12    1212.14                 1212.14
    2               7.87    634.93                  1269.87
    3               14.06   355.56                  1066.69
    4               15.84   315.59                  1262.35
    5               20.19   247.68                  1238.41
    6               24.52   203.96                  1223.73
    7               29.94   167.02                  1169.12
    8               34.98   142.92                  1143.39
    

    事实证明,无论并发如何,整体吞吐量都是一致的,并且SQLite在Windows上比macOS慢,希望这有帮助。

    由于SQLite写锁是基于数据库的,为了获得更多的TPS,您可以将数据分区到多数据库文件中:

    class MultiDBStore():
    
        def __init__(self, buckets=5):
            self.buckets = buckets
            self.conns = []
            for n in range(buckets):
                conn = sqlite3.connect(f'kv_{n}.db', timeout=60)
                conn.execute('pragma journal_mode=wal')
                conn.execute('create table if not exists "kv" (key integer primary key, value integer) without rowid')
                conn.commit()
                self.conns.append(conn)
    
        def _get_conn(self, key):
            assert isinstance(key, int)
            return self.conns[key % self.buckets]
    
        def get(self, key):
            item = self._get_conn(key).execute('select value from "kv" where key=?', (key,))
            if item:
                return next(item)[0]
    
        def set(self, key, value):
            conn = self._get_conn(key)
            conn.execute('replace into "kv" (key, value) values (?,?)', (key, value))
            conn.commit()
    

    我的mac上有20个分区的结果:

    concurrency time(s) pre process TPS(r/s)    total TPS(r/s)
    1           2.07    4837.17                 4837.17
    2           2.51    3980.58                 7961.17
    3           3.28    3047.68                 9143.03
    4           4.02    2486.76                 9947.04
    5           4.44    2249.94                 11249.71
    6           4.76    2101.26                 12607.58
    7           5.25    1903.69                 13325.82
    8           5.71    1752.46                 14019.70
    

    总TPS高于单个数据库文件。

    匿名用户

    在redis之前,有Memcached(适用于windows)。这里有一个教程。https://realpython.com/blog/python/python-memcache-efficient-caching/

    匿名用户

    我会考虑两种选择,都是嵌入式数据库

    正如这里和这里的回答,应该没问题

    链接

    BerkeleyDB(BDB)是一个旨在为键/值数据提供高性能嵌入式数据库的软件库

    它的设计完全符合你的目的

    BDB可以支持数以千计的控制线程或并发进程同时操作高达256 TB的数据库,3适用于各种操作系统,包括大多数类Unix和Windows系统,以及实时操作系统。

    它很健壮,已经存在了几年甚至几十年

    调用redis/memcached/任何其他需要sysops参与的成熟的基于套接字的服务器IMO是任务在位于同一盒子上的两个脚本之间交换数据的开销