并发编程

| 分类 python  | 标签 thread  python  process  concurrent  浏览次数: -

进程

进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位。

线程

线程是进程的一个实体,是CPU调度和分配的基本单位。线程自己基本上不拥有系统资源。只要拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈)。但是它可以与属于同一个进程的其他线程共享进程所拥有的全部资源。一个线程可以创建和撤销另一个线程。同一个进程中的多个线程之间可以并发执行。

进程与应用程序的区别

  • 应用程序作为一个静态文件存在在计算机系统的硬盘存储空间内
  • 进程处于动态条件下的由操作系统维护的系统资源管理实体 一个程序至少由一个进程,一个进程至少有一个线程

多任务实现方式

  • 多进程模式
  • 多线程模式
  • 多进程 + 多线程模式

fork函数:普通函数调用,调用一次,返回一次,fork()调用一次返回两次。因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

import os

print '进程: (%s)' % os.getpid()
pid = os.fork()  # 返回两次
if pid == 0:
    print '子进程: (%s) 的父进程为:(%s)' % (os.getpid(), os.getppid())
else:
    print '父进程: (%s) 创建子进程: (%s)' % (os.getpid(), pid)

守护进程(damon)没有任何存在的父进程(即PPID=1),且在UNIX系统进程层级中直接位于init之下。守护进程程序通常通过如下方法使自己成为守护进程:对一个子进程运行fork,然后使其父进程立即终止,使得这个子进程能在init下运行。这种方法通常被称为“脱壳”。

TODO 守护进程,qdatamgr ping

multiprocessing模块

multiprocessing模块提供了一个Process类来代表一个进程对象

启动一个子进程并等待其结束

from multiprocessing import Process
import os


def run_child(name):
    print '子进程 (%s) pid: (%s), 父进程pid: (%s)...' % (name, os.getpid(), os.getppid())


if __name__ == '__main__':
    print '当前进程号: (%s)' % os.getpid()
    # 子进程要执行的代码
    p = Process(target=run_child, args=('test',))
    print '开启子进程'
    p.start()  # 启动子进程
    p.join()  # 待子进程结束后再继续往下运行,通常用于进程间的同步。
    print '子进程运行结束'

Pool

Pool进程池批量创建子进程

import os
import time
import random
from multiprocessing import Pool


def long_time_task(name):
    print('任务名: %s 进程号: (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('任务 %s 运行了 %0.2f 秒' % (name, (end - start)))


if __name__ == '__main__':
    print('当前进程 (%s)' % os.getpid())
    p = Pool(3)  # 开启几个进程
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('等待子进程完成...')
    p.close()
    p.join()  # 等待子进程结束
    print('所有子进程结束')

进程池一般与map一起使用,如 P.map(xxx,xx)

subprocess模块可以非常方便地启动一个子进程,然后控制其输入和输出。

输出:

import subprocess

code = subprocess.call('ls')  # 执行外部命令的输出
print 'Exit code:', code  # 返回值,0:成功

输入:

import subprocess

p = subprocess.Popen(['nslookup'],
                     stdin=subprocess.PIPE,
                     stdout=subprocess.PIPE,
                     stderr=subprocess.PIPE)
std_out, std_err = p.communicate(
    b'set q=mx\npython.org\nexit\n')  # 等价于在终端输入nslookup后继续输入 set q=mx # Enter回车 python.org # Enter回车 exit # Enter回车
print std_out.decode('utf-8')
print "Error:", std_err
print 'Exit code:', p.returncode
print 'error code:', p.wait()

进程间通信

通过QueuePipes等实现。

from multiprocessing import Process, Queue
import os
import time
import random


def write(data):
    print "写 进程: (%s)" % os.getpid()
    for value in ['A', 'B', 'C']:
        print '%s 写入 Queue' % value
        data.put(value)
        time.sleep(random.random())


def read(data):
    print "读 进程: (%s)" % os.getpid()
    while True:
        value = data.get(True)
        print '读取的值: %s' % value


if __name__ == '__main__':
    # 父进程创建Queue,并传给子进程
    q = Queue()
    data_write = Process(target=write, args=(q,))
    data_read = Process(target=read, args=(q,))
    # 启动 写 子进程
    data_write.start()
    # 启动 读 子进程
    data_read.start()
    # 等待 写 进程结束
    data_write.join()
    # 终止 读 进程
    data_read.terminate()

多线程

Python的标准库提供了两个模块:_threadthreading_thread是低级模块,threading是高级模块,对_thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。

current_thread()函数,它永远返回当前线程的实例。主线程实例的名字叫MainThread,子线程的名字在创建时指定,我们用LoopThread命名子线程。

import time
import threading


def loop():
    print '子线程 %s 开始运行' % threading.current_thread().name
    n = 0
    while n < 5:
        n += 1
        print '线程名 %s >>> %s' % (threading.current_thread().name, n)
        time.sleep(1)
    print '子线程 %s 运行结束' % threading.current_thread().name


if __name__ == '__main__':
    print '主线程: %s 开始运行' % threading.current_thread().name
    t = threading.Thread(target=loop, name='LoopThread')
    t.start()
    t.join()
    print '主线程 %s 运行结束' % threading.current_thread().name

多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了.

Lock:线程锁,锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁。注意避免死锁,如何避免????

  • 加锁之后,只有一个线程能对run_thread操作
import threading

balance = 0
lock = threading.Lock()  # 创建锁


def change_it(n):
    # 先存后取,结果应该为0:
    global balance
    balance = balance + n
    balance = balance - n


def run_thread(n):
    for i in range(100000):
        # ========= 第一种方式 ===========
        # 先要获取锁:
        lock.acquire()
        try:
            # 锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁
            change_it(n)
        finally:
            # 改完了一定要释放锁:
            lock.release()
        # ========= 第二种方式 ===========
        with threading.Lock():
            change_it(n)
        # with 语句会在这个代码块执行前自动获取锁,在执行结束后自动释放锁



if __name__ == '__main__':
    t1 = threading.Thread(target=run_thread, args=(5,))
    t2 = threading.Thread(target=run_thread, args=(8,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print balance

TODO: 锁一般包成上下文管理器,结合with使用,如: with get_lock(),还可以在管理器中加入时间等

多线程中全局变量使用必须加锁,局部变量传递不方便。利用threading.local()传递局部变量

import threading

# 创建全局ThreadLocal对象, 全局变量
local_school = threading.local()


def process_student():
    # 获取当前线程关联的student:
    std = local_school.name
    print 'Hello, %s (in %s)' % (std, threading.current_thread().name)


def process_thread(name):
    # 绑定ThreadLocal的student:
    local_school.name = name
    process_student()

if __name__ == '__main__':
    t1 = threading.Thread(target=process_thread, args=('ABC',), name='Thread-A')
    t2 = threading.Thread(target=process_thread, args=('DEF',), name='Thread-D')
    t1.start()
    t2.start()
    t1.join()
    t2.join()

多线程死锁的避免,可重入锁:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁

import time
import threading

number = 0
lock = threading.Lock()
r_lock = threading.RLock()


class MyThreadLock(threading.Thread):
    def run(self):
        global number
        time.sleep(1)

        if lock.acquire(1):
            number += 1
            msg = self.name + ' set number to ' + str(number)
            print msg
            lock.acquire()
            lock.release()
            lock.release()


class MyThreadRLock(threading.Thread):
    def run(self):
        global number
        time.sleep(1)
        if r_lock.acquire(1):
            number += 1
            msg = self.name + " number to " + str(number)
            print msg
            r_lock.acquire()
            r_lock.release()
            r_lock.release()


if __name__ == "__main__":
    # 一个线程"迭代"请求同一个资源,直接就会造成死锁
    for i in range(5):
        t = MyThreadRLock()
        t.start()


上一篇 django 部署apache     下一篇 require操作
目录导航