python百万级并发,python做接口绝对并发

让计算机程序并发的运行是一个经常被讨论的话题,今天我想讨论一下Python下的各种并发方式。

并发方式

线程(Thread)

多线程几乎是每一个程序猿在使用每一种语言时都会首先想到用于解决并发的工具(JS程序员请回避),使用多线程可以有效的利用CPU资源(Python例外)。然而多线程所带来的程序的复杂度也不可避免,尤其是对竞争资源的同步问题。

然而在python中由于使用了全局解释锁(GIL)的原因,代码并不能同时在多核上并发的运行,也就是说,Python的多线程不能并发,很多人会发现使用多线程来改进自己的Python代码后,程序的运行效率却下降了,这是多么蛋疼的一件事呀!如果想了解更多细节,推荐阅读这篇文章。实际上使用多线程的编程模型是很困难的,程序员很容易犯错,这并不是程序员的错误,因为并行思维是反人类的,我们大多数人的思维是串行(精神分裂不讨论),而且冯诺依曼设计的计算机架构也是以顺序执行为基础的。所以如果你总是不能把你的多线程程序搞定,恭喜你,你是个思维正常的程序猿:)

Python提供两组线程的接口,一组是thread模块,提供基础的,低**(Low Level)接口,使用Function作为线程的运行体。还有一组是threading模块,提供更容易使用的基于对象的接口(类似于Java),可以继承Thread对象来实现线程,还提供了其它一些线程相关的对象,例如Timer,Lock

使用thread模块的例子

import thread

def worker():

 """thread worker function"""

 print 'Worker'

thread.start_new_thread(worker)

使用threading模块的例子

import threading

def worker():

 """thread worker function"""

 print 'Worker'

t = threading.Thread(target=worker)

t.start()

或者Java Style

import threading

class worker(threading.Thread):

 def __init__(self):

 pass

 def run():

 """thread worker function"""

 print 'Worker'

 

t = worker()

t.start()

进程 (Process)

由于前文提到的全局解释锁的问题,Python下比较好的并行方式是使用多进程,这样可以非常有效的使用CPU资源,并实现真正意义上的并发。当然,进程的开销比线程要大,也就是说如果你要创建数量惊人的并发进程的话,需要考虑一下你的机器是不是有一颗强大的心。

Python的mutliprocess模块和threading具有类似的接口。

from multiprocessing import Process



def worker():

 """thread worker function"""

 print 'Worker'

p = Process(target=worker)

p.start()

p.join()

由于线程共享相同的地址空间和内存,所以线程之间的通信是非常容易的,然而进程之间的通信就要复杂一些了。常见的进程间通信有,管道,消息队列,Socket接口(TCP/IP)等等。

Python的mutliprocess模块提供了封装好的管道和队列,可以方便的在进程间传递消息。

Python进程间的同步使用锁,这一点喝线程是一样的。

另外,Python还提供了进程池Pool对象,可以方便的管理和控制线程。

远程分布式主机 (Distributed Node)

随着大数据时代的到临,摩尔定理在单机上似乎已经失去了效果,数据的计算和处理需要分布式的计算机网络来运行,程序并行的运行在多个主机节点上,已经是现在的软件架构所必需考虑的问题。

远程主机间的进程间通信有几种常见的方式

  • TCP/IPTCP/IP是所有远程通信的基础,然而API比较低级别,使用起来比较繁琐,所以一般不会考虑
  • 远程方法调用 Remote Function CallRPC是早期的远程进程间通信的手段。Python下有一个开源的实现RPyC
  • 远程对象 Remote Object远程对象是更高级别的封装,程序可以想操作本地对象一样去操作一个远程对象在本地的代理。远程对象最广为使用的规范CORBA,CORBA最大的好处是可以在不同语言和平台中进行通信。当让不用的语言和平台还有一些各自的远程对象实现,例如Java的RMI,MS的DCOMPython的开源实现,有许多对远程对象的支持
    • Dopy
    • Fnorb (CORBA)
    • ICE
    • omniORB (CORBA)
    • Pyro
    • YAMI
  • 消息队列 Message Queue比起RPC或者远程对象,消息是一种更为灵活的通信手段,常见的支持Python接口的消息机制有
    • RabbitMQ
    • ZeroMQ
    • Kafka
    • AWS SQS + BOTO

在远程主机上执行并发和本地的多进程并没有非常大的差异,都需要解决进程间通信的问题。当然对远程进程的管理和协调比起本地要复杂。

Python下有许多开源的框架来支持分布式的并发,提供有效的管理手段包括:

  • Celery Celery是一个非常成熟的Python分布式框架,可以在分布式的系统中,异步的执行任务,并提供有效的管理和调度功能。参考这里
  • SCOOPSCOOP (Scalable COncurrent Operations in Python)提供简单易用的分布式调用接口,使用Future接口来进行并发。
  • Dispy相比起Celery和SCOOP,Dispy提供更为轻量级的分布式并行服务
  • PP PP (Parallel Python)是另外一个轻量级的Python并行服务, 参考这里
  • AsyncoroAsyncoro是另一个利用Generator实现分布式并发的Python框架,

当然还有许多其它的系统,我没有一一列出

另外,许多的分布式系统多提供了对Python接口的支持,例如Spark

伪线程 (Pseudo-Thread)

还有一种并发手段并不常见,我们可以称之为伪线程,就是看上去像是线程,使用的接口类似线程接口,但是实际使用非线程的方式,对应的线程开销也不存的。

  • greenlet greenlet提供轻量级的coroutines来支持进程内的并发。greenlet是Stackless的一个副产品,使用tasklet来支持一中被称之为微线程(mirco-thread)的技术,这里是一个使用greenlet的伪线程的例子
from greenlet import greenlet



def test1():

 print 12

 gr2.switch()

 print 34

 

def test2():

 print 56

 gr1.switch()

 print 78

 

gr1 = greenlet(test1)

gr2 = greenlet(test2)

gr1.switch()

运行以上程序得到如下结果:

12

56

34

伪线程gr1 switch会打印12,然后调用gr2 switch得到56,然后switch回到gr1,打印34,然后伪线程gr1结束,程序退出,所以78永远不会被打印。通过这个例子我们可以看出,使用伪线程,我们可以有效的控制程序的执行流程,但是伪线程并不存在真正意义上的并发。

eventlet,gevent和concurence都是基于greenlet提供并发的。

  • eventlet

eventlet是一个提供网络调用并发的Python库,使用者可以以非阻塞的方式来调用阻塞的IO操作。

import eventlet

from eventlet.green import urllib2



urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']



def fetch(url):

 return urllib2.urlopen(url).read()



pool = eventlet.GreenPool()



for body in pool.imap(fetch, urls):

 print("got body", len(body))

执行结果如下

('got body', 17629)

('got body', 1270)

('got body', 46949)

eventlet为了支持generator的操作对urllib2做了修改,接口和urllib2是一致的。这里的GreenPool和Python的Pool接口一致。

  • gevent

gevent和eventlet类似,关于它们的差异大家可以参考这篇文章

import gevent

from gevent import socket

urls = ['www.google.com', 'www.example.com', 'www.python.org']

jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]

gevent.joinall(jobs, timeout=2)



print [job.value for job in jobs]

执行结果如下:

['206.169.145.226', '93.184.216.34', '23.235.39.223']
  • concurence

concurence是另外一个利用greenlet提供网络并发的开源库,我没有用过,大家可以自己尝试一下。

实战运用

通常需要用到并发的场合有两种,一种是计算密集型,也就是说你的程序需要大量的CPU资源;另一种是IO密集型,程序可能有大量的读写操作,包括读写文件,收发网络请求等等。

计算密集型

对应计算密集型的应用,我们选用著名的蒙特卡洛算法来计算PI值。基本原理如下

 

蒙特卡洛算法利用统计学原理来模拟计算圆周率,在一个正方形中,一个随机的点落在1/4圆的区域(红色点)的概率与其面积成正比。也就该概率 p = Pi * R*R /4 : R* R , 其中R是正方形的边长,圆的半径。也就是说该概率是圆周率的1/4, 利用这个结论,只要我们模拟出点落在四分之一圆上的概率就可以知道圆周率了,为了得到这个概率,我们可以通过大量的实验,也就是生成大量的点,看看这个点在哪个区域,然后统计出结果。

基本算法如下:

from math import hypot

from random import random



def test(tries):

 return sum(hypot(random(), random()) < 1 for _ in range(tries))

这里test方法做了n(tries)次试验,返回落在四分之一圆中的点的个数。判断方法是检查该点到圆心的距离,如果小于R则是在圆上。

通过大量的并发,我们可以快速的运行多次试验,试验的次数越多,结果越接近真实的圆周率。

这里给出不同并发方法的程序代码

  • 非并发我们先在单线程,但进程运行,看看性能如何
from math import hypot

from random import random

import eventlet

import time



def test(tries):

 return sum(hypot(random(), random()) < 1 for _ in range(tries))



def calcPi(nbFutures, tries):

 ts = time.time()

 result = map(test, [tries] * nbFutures)

 

 ret = 4. * sum(result) / float(nbFutures * tries)

 span = time.time() - ts

 print "time spend ", span

 return ret



print calcPi(3000,4000)
  • 多线程 thread为了使用线程池,我们用multiprocessing的dummy包,它是对多线程的一个封装。注意这里代码虽然一个字的没有提到线程,但它千真万确是多线程。通过测试我们开(jing)心(ya)的发现,果然不出所料,当线程池为1是,它的运行结果和没有并发时一样,当我们把线程池数字设置为5时,耗时几乎是没有并发的2倍,我的测试数据从5秒到9秒。所以对于计算密集型的任务,还是放弃多线程吧。
from multiprocessing.dummy import Pool



from math import hypot

from random import random

import time



def test(tries):

 return sum(hypot(random(), random()) < 1 for _ in range(tries))



def calcPi(nbFutures, tries):

 ts = time.time()

 p = Pool(1)

 result = p.map(test, [tries] * nbFutures)

 ret = 4. * sum(result) / float(nbFutures * tries)

 span = time.time() - ts

 print "time spend ", span

 return ret



if __name__ == '__main__':

 p = Pool()

 print("pi = {}".format(calcPi(3000, 4000)))
  • 多进程 multiprocess理论上对于计算密集型的任务,使用多进程并发比较合适,在以下的例子中,进程池的规模设置为5,修改进程池的大小可以看到对结果的影响,当进程池设置为1时,和多线程的结果所需的时间类似,因为这时候并不存在并发;当设置为2时,响应时间有了明显的改进,是之前没有并发的一半;然而继续扩大进程池对性能影响并不大,甚至有所下降,也许我的Apple Air的CPU只有两个核?当心,如果你设置一个非常大的进程池,你会遇到 Resource temporarily unavailable的错误,系统并不能支持创建太多的进程,毕竟资源是有限的。
from multiprocessing import Pool



from math import hypot

from random import random

import time



def test(tries):

 return sum(hypot(random(), random()) < 1 for _ in range(tries))



def calcPi(nbFutures, tries):

 ts = time.time()

 p = Pool(5)

 result = p.map(test, [tries] * nbFutures)

 ret = 4. * sum(result) / float(nbFutures * tries)

 span = time.time() - ts

 print "time spend ", span

 return ret



if __name__ == '__main__':

 print("pi = {}".format(calcPi(3000, 4000)))
  • gevent (伪线程)不论是gevent还是eventlet,因为不存在实际的并发,响应时间和没有并发区别不大,这个和测试结果一致。
import gevent

from math import hypot

from random import random

import time



def test(tries):

 return sum(hypot(random(), random()) < 1 for _ in range(tries))



def calcPi(nbFutures, tries):

 ts = time.time()

 jobs = [gevent.spawn(test, t) for t in [tries] * nbFutures]

 gevent.joinall(jobs, timeout=2)

 ret = 4. * sum([job.value for job in jobs]) / float(nbFutures * tries)

 span = time.time() - ts

 print "time spend ", span

 return ret



print calcPi(3000,4000)
  • eventlet (伪线程)
from math import hypot

from random import random

import eventlet

import time



def test(tries):

 return sum(hypot(random(), random()) < 1 for _ in range(tries))



def calcPi(nbFutures, tries):

 ts = time.time()

 pool = eventlet.GreenPool()

 result = pool.imap(test, [tries] * nbFutures)

 

 ret = 4. * sum(result) / float(nbFutures * tries)

 span = time.time() - ts

 print "time spend ", span

 return ret



print calcPi(3000,4000)
  • SCOOP

SCOOP中的Future接口符合PEP-3148的定义,也就是在Python3中提供的Future接口。

在缺省的SCOOP配置环境下(单机,4个Worker),并发的性能有提高,但是不如两个进程池配置的多进程。

from math import hypot

from random import random

from scoop import futures



import time



def test(tries):

 return sum(hypot(random(), random()) < 1 for _ in range(tries))



def calcPi(nbFutures, tries):

 ts = time.time()

 expr = futures.map(test, [tries] * nbFutures)

 ret = 4. * sum(expr) / float(nbFutures * tries)

 span = time.time() - ts

 print "time spend ", span

 return ret



if __name__ == "__main__":

 print("pi = {}".format(calcPi(3000, 4000)))
  • Celery

任务代码

from celery import Celery



from math import hypot

from random import random

 

app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')

app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'

 

@app.task

def test(tries):

 return sum(hypot(random(), random()) < 1 for _ in range(tries))

客户端代码

from celery import group

from tasks import test



import time



def calcPi(nbFutures, tries):

 ts = time.time()

 result = group(test.s(tries) for i in xrange(nbFutures))().get()

 

 ret = 4. * sum(result) / float(nbFutures * tries)

 span = time.time() - ts

 print "time spend ", span

 return ret



print calcPi(3000, 4000)

使用Celery做并发的测试结果出乎意料(环境是单机,4frefork的并发,消息broker是rabbitMQ),是所有测试用例里最糟糕的,响应时间是没有并发的5~6倍。这也许是因为控制协调的开销太大。对于这样的计算任务,Celery也许不是一个好的选择。

  • asyncoroAsyncoro的测试结果和非并发保持一致。
import asyncoro



from math import hypot

from random import random

import time



def test(tries):

 yield sum(hypot(random(), random()) < 1 for _ in range(tries))





def calcPi(nbFutures, tries):

 ts = time.time()

 coros = [ asyncoro.Coro(test,t) for t in [tries] * nbFutures]

 ret = 4. * sum([job.value() for job in coros]) / float(nbFutures * tries)

 span = time.time() - ts

 print "time spend ", span

 return ret



print calcPi(3000,4000)

IO密集型

IO密集型的任务是另一种常见的用例,例如网络WEB服务器就是一个例子,每秒钟能处理多少个请求时WEB服务器的重要指标。

我们就以网页读取作为最简单的例子

from math import hypot

import time

import urllib2



urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']



def test(url):

 return urllib2.urlopen(url).read()



def testIO(nbFutures):

 ts = time.time()

 map(test, urls * nbFutures)



 span = time.time() - ts

 print "time spend ", span



testIO(10)

在不同并发库下的代码,由于比较类似,我就不一一列出。大家可以参考计算密集型中代码做参考。

通过测试我们可以发现,对于IO密集型的任务,使用多线程,或者是多进程都可以有效的提高程序的效率,而使用伪线程性能提升非常显著,eventlet比没有并发的情况下,响应时间从9秒提高到0.03秒。同时eventlet/gevent提供了非阻塞的异步调用模式,非常方便。这里推荐使用线程或者伪线程,因为在响应时间类似的情况下,线程和伪线程消耗的资源更少。

本站部分内容由互联网用户自发贡献,该文观点仅代表作者本人,本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。

如发现本站有涉嫌抄袭侵权/违法违规等内容,请联系我们举报!一经查实,本站将立刻删除。

(0)

相关推荐

  • 五香牛肉最正宗的做法,五香牛肉的做法及配料

    (1)配料标准 ①主料牛肉5千克。 ②辅料食盐300克,白糖150克,花椒10克,八角菌香10克,丁香2.5克,草果5克,陈皮5克,鲜姜50克,硝酸钠5克。 (2)加工方法 ①原料整理选用卫生合格的鲜牛肉,剔去骨头、筋腿,切成200克左右的肉块。 ②腌制切好的牛肉块加入食盐、硝酸钠,拌和均匀,放入缸内在低温下腌制12天,期间翻倒几次。腌好的肉块在清水中浸泡2…

    2023-07-02
  • 选择显卡,显卡参数怎么看性能好

    之前笔者给大家介绍了一些关于显卡参数的知识(贵的肯定好?显卡参数该怎么看),很多玩家表示看过后虽然明白了许多以前不知道的细节,但还是有一种“云里雾里”的感觉,真正到了自己挑选显卡时,还是有一些头疼。有一种“看了这么多文章 为什么我还是不会选显卡?”的感觉,所以今天的这篇选购指南就是为了让大家全面了解到底该如何选择显卡。 在选购显卡时,最重要的就需求和价格,首…

    2023-07-12 用户投稿
  • 圆锥母线(圆锥母线和底面半径的关系)

    知识链接: 1.圆锥的底面半径r,高h及母线l可以组成直角三角形. 2.在直角三角形中,如果一个锐角所对的直角边等于斜边的一半,那么,这个锐角等于30o. 3.圆锥的底面周长2πr=圆锥侧面展开图的弧长l. 4.扇形面积S=1/2lR. 题目: 求:(1)圆锥的母线长与底面半径之比; (2)∠BAC 的度数; (3)圆锥的侧面积(结果保留π). 解析: (1…

    2022-05-05
  • 小学防溺水观后感(防溺水观后感200字)

    红网时刻常德4月25日讯(通讯员 李国君)“生命安全高于天,父母给我们的生命只有一次,每个人没有理由不珍惜生命、注意安全。”4月25日上午,汉寿县三和中学综治办主任刘彦华,在防溺水安全教育工作部署会上讲到。同时,汉寿县三和中学开展了以“珍爱生命预防溺水”为主题的专题教育系列活动。 在学校精心策划、教师们的认真组织下,开展了“八个一”防溺水系列活动:开一次会(…

    2022-05-05
  • 河南联通超级流量王(联通流量王臻享版套餐)

    对于中国联通这家运营商,大多数人都不陌生了,这是我们最为熟悉的三家运营商之一,并且也是我国目前来说仅有的四家基础运营商之一,而我们下面要讲的主要内容,就是联通的一种流量卡,叫做“联通流量王臻享版”。 流量多、资费低、不限速,相信这是很多人对于流量卡的一个要求,这种流量卡确实存在,不过都是限时办理的优惠套餐,例如“联通流量王臻享版”就是其中之一,下面我们就来了…

    2023-05-30
  • 国产冰箱什么牌子好,国产冰箱哪个牌子最受欢迎?

    #头条创作挑战赛# 家庭装修,一般都会去选购各种各样的冰箱。 市面上冰箱的种类,样式,款式非常的多。那么到底怎么选呢? 冰箱选对了,确实很好用;选不对,带来的可能就是烦恼。 为此,家居杂坛给大家总结了换过3台冰箱的经验,最终给大家推荐以下6点选择建议,都是经验教训。 但是,对于不同的冰箱品牌,大家一定要区分清楚: ①、首先我们要说的就是进口的冰箱品牌。例如德…

    用户投稿 2023-05-13
  • 辞职书怎么写最简单(最简单的个人辞职原因)

    一、以家庭原因辞职的理由: 1、父母生病了,需要回家照顾 2、家里孩子太小,离不开。 3、家里准备盖房子。 4、家里庄稼要开始收割。 5、家里发生了邻里纠纷,要回去处理。 6、兄弟或姐妹要结婚,辞职回家。 二、以个人原因辞职理由 1、身体吃不消。 2、自己要回去结婚。 3、回去读书,再也不打工了。 4、我水土不服。 5、我非常想家。 6、自己太笨,学不好技术…

    用户投稿 2022-05-06
  • 光子嫩肤的好处,光子嫩肤的后遗症危害

    日常生活中,很多人都会对自己的皮肤进行保养,通过各种方式,方法,让肌肤变得光滑,紧致。同样,明星也不例外,他们会更加注重自己的皮肤状态,使皮肤永远保持胶原蛋白。   光子嫩肤是什么? 光子嫩肤是一种先进的高科技的美容项目,是采用特定的宽光谱的彩光直接照射于皮肤表面,可以穿透皮肤的深层,选择性的作用于皮下的色素或血管分解色斑,闭合异常的毛细血管,同时…

    2023-07-13 用户投稿
  • 梦幻西游5级称谓,狮驼岭五级称谓攻略

    在梦幻西游这款游戏中,每个玩家对游戏的需求不一样,可能大多数追求的是人物角色的战斗力,或者追求花枝招展的锦衣以及祥瑞。然而有少部分土豪玩家所追求的是角色成就点或者人物称谓。例如曾是梦幻西游一代枭雄的陈无敌,花几百万人民币就是为了把成就点刷到全服第一。最近又出现了一位任性玩家,为了一个称谓竟花7万元购买了一个69级小号,有钱人真的是任性。下面我们一起来欣赏一下…

    2023-06-25 用户投稿
  • 骆驼祥子读书笔记摘抄(骆驼祥子读书笔记摘抄第十章)

    个别的解决,祥子没那么聪明。全盘的清算,他没那个魄力。于是,一点儿办法没有,整天际圈着满肚子委屈。正和一切的生命同样,受了损害之后,无可如何的只想由自己去收拾残局。那斗落了大腿的蟋蟀,还想用那些小腿儿爬。祥子没有一定的主意,只想慢慢的一天天,一件件的挨过去,爬到哪儿算哪儿,根本不想往起跳了。 离二十七还有十多天,他完全注意到这一天上去,心里想的,口中念道的,…

    2022-05-05
  • 苹果笔记本论坛,科技股破万亿

    每经记者:吴永久 每经编辑:何剑岭 股民懵了,700多亿市值的科技巨头蓝思科技跌停! 今日,蓝思科技以跌停价15.88元开盘,随后打开跌停,但由于抛盘汹涌,很快就被砸至跌停,直至收盘。收盘时,跌停价依然有27.6万手卖单。 从公开消息来看,4月10日晚间,公司公布了150亿元的定增预案。此后,这个方案引发资本市场热议,“布局5G说”和“黑心圈钱说”两大阵营激…

    2023-06-29
  • 小学生创新作文网(实施四质工程,培育时代新人心得体会)

    为进一步丰富校园文化生活,激发学生的写作兴趣,提高学生的习作水平和语文素养,营造良好的读书、习作氛围,4月6日,巴中市南江县沙河镇乐坝小学根据上级文件要求组织二至六年级178名学生参与第十九届全国创新作文征文初赛活动。各班根据活动组织方设置的不同的写作要求,积极、认真筹备此次征文活动。 在作文竞赛过程中涌现出了许多写作小能手。一篇篇文章书写工整,内容真实,情…

    2023-05-29