今天看啥  ›  专栏  ›  FTDdata

python--并行计算

FTDdata  · 简书  ·  · 2021-04-29 09:17

应用python进行数据挖掘或计算时,往往需要遍历多种参数进行数据建模,而单次的建模或计算有时候非常耗时,这时候可以利用python的并行计算功能,加快计算速度。

python能够应用并行计算的模块有多个multiprocessing、pathos等。其中multiprocessing模块应用的较多,但对于数据挖掘场景来说,pathos模块更实用,尤其允许输入多个可变参数非常简单实用。

环境&软件

  • win 10 64bit
  • Python 3.7

功能实现

本文总结整理了常见的并行计算场景,编写parallel.py模块,主要利用pathos模块实现,可以实现单变量并行、多变量并行、并行嵌套等功能。通过tdqm模块增加了进度条,可以显示计算进度等信息,通过functools模块中的partial函数将静态参数冻结,以适应并行框架。
parallel.py

import time
from functools import partial
from pathos.pools import ProcessPool, ThreadPool
from tqdm import tqdm


def parallel(func, *args, show=False, thread=False, **kwargs):
    """
    并行计算
    :param func: 函数,必选参数
    :param args: list/tuple/iterable,1个或多个函数的动态参数,必选参数
    :param show:bool,默认False,是否显示计算进度
    :param thread:bool,默认False,是否为多线程
    :param kwargs:1个或多个函数的静态参数,key-word形式
    :return:list,与函数动态参数等长
    """

    # 冻结静态参数
    p_func = partial(func, **kwargs)
    # 打开进程/线程池
    pool = ThreadPool() if thread else ProcessPool()
    try:
        if show:
            start = time.time()
            # imap方法
            with tqdm(total=len(args[0]), desc="计算进度") as t:  # 进度条设置
                r = []
                for i in pool.imap(p_func, *args):
                    r.append(i)
                    t.set_postfix({'并行函数': func.__name__, "计算花销": "%ds" % (time.time() - start)})
                    t.update()
        else:
            # map方法
            r = pool.map(p_func, *args)
        return r
    except Exception as e:
        print(e)
    finally:
        # 关闭池
        pool.close()  # close the pool to any new jobs
        pool.join()  # cleanup the closed worker processes
        pool.clear()  # Remove server with matching state

函数parallel的参数定义顺序需要注意: 必选参数--任意位置参数--默认参数--任意关键字参数

结果展示

定义另一个parallel_main.py模块,用来展示各个场景下并行计算结果。
parallel_main.py

from parallel import parallel


class A:
    @staticmethod
    def f1(x):
        return x + 1

    @staticmethod
    def f2(x, y):
        return x + y

    @staticmethod
    def f3(x, y, p=100):
        return x + y + p

    @staticmethod
    def f4(x):
        import time
        time.sleep(1)
        return x + 1

    def f5(self, x):
        from parallel import parallel
        r = parallel(self.f1, [1, 2, 3], thread=True)  # [2,3,4]
        return x + sum(r)

    def f6(self, x):
        from parallel import parallel
        r1 = parallel(self.f1, [1, 2, 3], thread=True)  # [2,3,4]
        r2 = parallel(self.f2, [1, 2, 3], r1, thread=True)  # [3,5,7]
        return x + sum(r2)


if __name__ == '__main__':
    f = A()
    print("f1计算结果(单参数并行模式):", parallel(f.f1, [1, 2, 3]), "\n", "#" * 50)
    print("f2计算结果(多参数并行模式):", parallel(f.f2, [1, 2, 3], [4, 5, 6]), "\n", "#" * 50)
    print("f3计算结果(多参数并行+函数参数模式):", parallel(f.f3, [1, 2, 3], [4, 5, 6], p=200), "\n", "#" * 50)
    print("f4计算结果(进度显示):", parallel(f.f4, range(100), show=True), "\n", "#" * 50)
    print("f5计算结果(2层嵌套并行模式):", parallel(f.f5, range(10)), "\n", "#" * 50)
    print("f6计算结果(多层嵌套并行模式):", parallel(f.f6, range(10)), "\n", "#" * 50)
1.png

总结

parallel函数使用注意点:

  • 函数至少输入一个被并行函数,和可迭代序列参数
  • 要显示计算过程,设置show=True
  • 被并行函数的依赖模块需要导入,否则报NameError
  • 嵌套并行需要导入parallel模块,且子并行需要设置为多线程模式(thread=True)



原文地址:访问原文地址
快照地址: 访问文章快照