У python ужасная репутация в параллельных вычислениях. Если забыть про обычные аргументы о потоках и GIL(которые довольно справедливы), настоящую проблему с параллельными вычислениями в python я вижу не в техническом аспекте, а скорее в педагогическом. Основные руководства про мультипоточность и мультипроцессинговость в Python слишком тяжеловесны. Они начинаются с каких-то глубоких вещей и заканчиваются до того, как объясняют что-то действительно полезное, применимое в ежедневных задачах.
Традиционный пример.
Беглый просмотр первых результатов DDG по запросу Python threading tutorial, демонстрирует, что практически каждый первый из них говорит об одном и том же и базируется на похожей Class + Queue модели.
import time
import threading
import Queue
class Consumer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self._queue = queue
def run(self):
while True:
# queue.get() блокирует текущий поток до получения данных
msg = self._queue.get()
# Проверяем, является ли текущее сообщение "Poison Pill"
if isinstance(msg, str) and msg == 'quit':
# Если является, выходим из цикла
break
# Обработка(в данном случае печать) элемента очереди
print "I'm a thread, and I received %s!!" % msg
# Всегда будь дружелюбен!
print 'Bye byes!'
def Producer():
# Queue используется для доступа к данным из разных потоков
queue = Queue.Queue()
# Создаём воркер
worker = Consumer(queue)
# Вызываем внутренний run() метод, чтобы запустить поток
worker.start()
# переменная, чтобы следить когда мы начали
start_time = time.time()
# Пока меньше 5 секунд
while time.time() - start_time < 5:
# Вычислить часть данных и положить их в очередь
queue.put('something at %s' % time.time())
# Вздремнуть немного, чтобы избежать огромного количества сообщений
time.sleep(1)
# Используем "poison pill" метод, чтобы убить поток
queue.put('quit')
# Ждем, пока поток закроется
worker.join()
if __name__ == '__main__':
Producer()
Хм, да тут пахнет Java.
Но я не хочу создать впечатление, как будто я думаю, что Producer/Consumer способ взаимодействия с мультипоточностью/мультипроцессностью неправильный - потому что это определённо не так. Это отличный подход для целого ряда задач. Тем не менее, я практически уверен, что это не самый лучший вариант для ежедневных скриптовых задач.
Проблемы(как я их вижу).
Во-первых, вам нужен класс, который по сути не делает ничего полезного. Во-вторых, вы должны обслуживать очередь через которую вы должны передавать объекты, и в конце концов, вам нужны методы на обоих концах цепочки, чтобы делать собственно то, что вам нужно.(например использовать другую очередь, если вы хотите сохранить результаты)
Больше обработчиков - больше проблем
Теперь вы захотите сделать набор таких обработчиков, чтобы попытаться выжать какое-то ускорение из Python. Ниже пример кода из отличного учебника от IBM по многопоточности. Это очень распространённый сценарий, где вы решаете задачу загрузить веб страницы в нескольких потоках.
import time
import threading
import Queue
import urllib2
class Consumer(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self._queue = queue
def run(self):
while True:
content = self._queue.get()
if isinstance(content, str) and content == 'quit':
break
response = urllib2.urlopen(content)
print 'Bye byes!'
def Producer():
urls = [
'http://www.python.org', 'http://www.yahoo.com'
'http://www.scala.org', 'http://www.google.com'
# и т.д.
]
queue = Queue.Queue()
worker_threads = build_worker_pool(queue, 4)
start_time = time.time()
# Добавляем ссылки на обработку
for url in urls:
queue.put(url)
# Добавляем poison pill
for worker in worker_threads:
queue.put('quit')
for worker in worker_threads:
worker.join()
print 'Done! Time taken: {}'.format(time.time() - start_time)
def build_worker_pool(queue, size):
workers = []
for _ in range(size):
worker = Consumer(queue)
worker.start()
workers.append(worker)
return workers
if __name__ == '__main__':
Producer()
Работает прекрасно, но посмотрите на весь этот код! Вы должны написать методы, списки потоков, чтобы следить за ними, и что хуже всего, если вы склонны к dead-lock также как и я, куча конструкций join дадут вам возможность ошибиться. И это становится всё сложнее и сложнее.
Чего мы добились, написав столько кода? Практически ничего. Весь этот код это чистая обвязка, он не несёт полезной нагрузки и содержит много мест, чтобы в них ошибиться(Чёрт! Да я забыл вызвать task_done() у объекта очереди, пока писал это(Я слишком ленив, чтобы исправить это и сделать другой скриншот)) и это слишком много кода, который делает слишком мало. К счастью, есть способ лучше.
Вступление: Map
Map это небольшая классная функция, и ключ к простому добавлению распараллеливания в ваш код на Python. Для тех, кто с ней не знаком, map это что-то пришедшее из функциональных языков, таких как Lisp. Это функция, которая сопоставляет другую функцию последовательности, например так:
urls = ['http://www.yahoo.com', 'http://www.reddit.com']
results = map(urllib2.urlopen, urls)
Это применяет метод urlopen к каждому элементу последовательности и сохраняет все результаты в список. Это более-менее эквивалентно такому:
results = []
for url in urls:
results.append(urllib2.urlopen(url))
Map совершает итерацию по последовательности, применяет фукнцию и сохраняет результы в список.
Что это даёт нам? Дело в том, что с правильными библиотеками, map может выполнять это параллельно поразительно просто.
Параллельная версия map предоставляется двумя библиотеками: multiprocessing, и её малоизвестный, но в равной степени фантастический потомок multiprocessing.dummy.
Небольшое отступление: Как так? Никогда не слышали о потоковом клоне multiproccessing названном dummy? Я тоже не слышал до самого недавнего времени. В доментации на multiprocessing есть всего одно предложение о нём. И оно сводится к "О да, и такое есть". Это слишком скромно для него, поверьте мне.
Dummy это точный клон модуля multiprocessing. Единственное отличие в том, что multiprocessing работает с процессами, а dummy использует потоки(что приносит все их ограничения. Таким образом всё, что применимо к одному, можно применить и к другому. Что особенно хорошо, для исследовательских задач, когда вы не до конца уверены, вызов какого-то фреймворка будет потреблять CPU или IO.
Начнём.
Чтобы получить доступ к параллельным версиям map, первым делом нужно импортировать модули, которые её содержат.
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool
И создать набор объектов
Эта короткая инструкция делает всё, что мы делали за 7 строк в функции build_worker_pool из example2.py. А именно, она создаёт набор обработчиков, запускает их и сохраняет в переменной, для простого доступа.
Набор объектов принимает несколько параметров, но сейчас нас интересует только первый: processes. Он устанавливает количество обработчиков в наборе. По умолчанию он равен количеству ядер в вашем процессоре.
В общем случае, при использовании мультипроцессинга для решения требовательных к процессорному времени задач, больше ядер означает быстрее.(Но есть много но). В других случаях, при использовании мультипоточности и задач связанных с сетью, всё может быть по разному, и можно поэкспериментировать с размером набора обработчиков.
pool = ThreadPool(4) # Устанавливает размер набора в 4
Если вы запустите слишком много потоков, больше времени будет тратиться на переключение между ними, чем на полезную работу, поэтому имеет смысл поиграть со значением, чтобы найти оптимальное для этой задачи.
Теперь обработчики созданы и простая параллелизация буквально у нас на кончиках пальцев, давайте перепишем задачу из example2.py
import urllib2
from multiprocessing.dummy import Pool as ThreadPool
urls = [
'http://www.python.org',
'http://www.python.org/about/',
'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
'http://www.python.org/doc/',
'http://www.python.org/download/',
'http://www.python.org/getit/',
'http://www.python.org/community/',
'https://wiki.python.org/moin/',
'http://planet.python.org/',
'https://wiki.python.org/moin/LocalUserGroups',
'http://www.python.org/psf/',
'http://docs.python.org/devguide/',
'http://www.python.org/community/awards/'
# etc..
]
# Создаём набор обработчиков
pool = ThreadPool(4)
# Открываем ссылки в собственных потоках и возвращаем результаты
results = pool.map(urllib2.urlopen, urls)
#закрываем набор и ждём, пока работа будет закончена
pool.close()
pool.join()
И что у нас получилось? Код, который решает задачу состоит из 4 строк, 3 из которых только подготовка к реальной работе. Вызов map с лёгкостью делает всё то же, что и наш предыдущий 40 строчный пример. Ради смеха, я замерил время выполнения обоих подходов с разным размером набора обработчиков.
Sigle thread: 14.4 seconds
4 Pool: 3.1 seconds
8 Pool: 1.4 seconds
13 Pool: 1.3 seconds
Потрясающе! И также показывает, что правильно поиграть со значением размера набора обработчиков. На моей машине любое значение больше 9 минимизует время выполнения.
Второй пример из реального мира
Изготовление тысяч миниатюр
Давайте сделаем что-то, что потребляет много процессора. Подходящая задача для меня это манипуляция большими папками с изображениями. Одна из трансформаций это создание миниатюр.
Для одного процесса, без распараллеливания.
import os
import PIL
from multiprocessing import Pool
from PIL import Image
SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'
def get_image_paths(folder):
return (os.path.join(folder, f)
for f in os.listdir(folder)
if 'jpeg' in f)
def create_thumbnail(filename):
im = Image.open(filename)
im.thumbnail(SIZE, Image.ANTIALIAS)
base, fname = os.path.split(filename)
save_path = os.path.join(base, SAVE_DIRECTORY, fname)
im.save(save_path)
if __name__ == '__main__':
folder = os.path.abspath(
'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
images = get_image_paths(folder)
for image in images:
create_thumbnail(Image)
Немного подправлено для примера, в реальности, папка передаётся в программу, собираются все изображения в ней, создаются миниатюры и кладутся в свою отдельную папку.
На моём компьютере, это занимает 27.9 секунд для обработки 6000 изображений.
Если мы заменим цикл for на вызов параллельного map:
import os
import PIL
from multiprocessing import Pool
from PIL import Image
SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'
def get_image_paths(folder):
return (os.path.join(folder, f)
for f in os.listdir(folder)
if 'jpeg' in f)
def create_thumbnail(filename):
im = Image.open(filename)
im.thumbnail(SIZE, Image.ANTIALIAS)
base, fname = os.path.split(filename)
save_path = os.path.join(base, SAVE_DIRECTORY, fname)
im.save(save_path)
if __name__ == '__main__':
folder = os.path.abspath(
'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
images = get_image_paths(folder)
pool = Pool()
pool.map(create_thumbnail, images)
pool.close()
pool.join()
5.6 секунд!
Это очень заметное ускорение, полученное заменой лишь нескольких строк кода. Продакшен версия этого даже еще быстрее, потому что задачи потребляющие процессор и ввод-вывод, разделены там в соотвествующие отдельные процессы и потоки. Однако, благодаря прозрачной природе map и отсутствию необходимости ручного управления потоками, на самом деле просто смешивать и сочетать эти подходы, и результат получается простой, надёжный и легко поддающийся отладке.
Собственно это оно. Параллельные вычисления одной(практически) строкой.