Tuesday, June 29, 2021

Parallel Processing in Python المعالجة المتوازية في بايثون





مقدمة

المعالجة المتوازية هي طريقة لتشغيل الكود على عدة معالجات في نفس الوقت، وهو يهدف إلى تقليل وقت المعالجة الإجمالي. يجب الأخذ بعين الاعتبار أن وقت المعالجة المتوازية يتضمن أيضا الوقت اللازم لنسخ الكود والبيانات إلى ذاكرة كل معالج، وكذلك الوقت اللازم للتواصل وتبادل البيانات بين العمليات الموزعة على المعالجات وهو ما يمكن أن يزيد في الوقت الإجمالي الذي تستغرقه المهام الصغيرة بدلا من تقليله. 

في لغة بايثون تُستخدم مكتبة المعالجة المتعددة (multiprocessing module) لتشغيل عمليات متوازية تستخدم المعالجات المتوفرة على الجهاز بحيث يكون لكل عملية معالج وذاكرة مستقلة.


كم عدد العمليات المتوازية القصوى التي يمكنك تشغيلها؟

الحد الأقصى لعدد العمليات التي يمكن تشغيلها في كل مرة مقيد بعدد المعالجات في جهاز الحاسب. ولمعرفة عدد المعالجات الموجودة في الجهاز، يمكن استخدام الأمر التالي:

import multiprocessing as mp
print("Number of processors: ",mp.cpu_count())


المعالجة المتوازية في بايثون

هناك مكونين في مكتبة (multiprocessing) من بايثون لتنفيذ دالة على التوازي: 


يوفر (Pool Class) إمكانية بناء تنفيذ متزامن وغير متزامن من خلال الدوال التالية:



التنفيذ المتزامن

في التنفيذ المتزامن يتم قفل العملية الأساسية بعد استدعاء العمليات المتوازية حتى تنتهي هذه العمليات وتكون نتائجها جاهزة وبالتالي تكون النتائج مرتبة حسب ترتيب استدعاء العمليات المتوازية.

التنفيذ غير المتزامن

في التنفيذ غير المتزامن لا يتم قفل العملية الأساسية بعد استدعاء العمليات المتوازية مما يسمح للعمليات بإنهاء مهامها دون مراعاة ترتيب استدعائها، ولكن قد يؤدي ذلك إلى أن تكون النتائج غير مرتبة، فيجب مراعاة ذلك عند تصميم الكود.








تطبيق على دالة لحساب عدد الأرقام الموجودة في نطاق معين

في هذه المقالة سوف نركز على شرح (Pool Class) من خلال مثال بسيط لحساب عدد الأرقام الموجودة بين نطاق معين في كل صف من مصفوفة ثنائية الأبعاد. في المثال التالي لدينا مصفوفة (3x5) ومطلوب حساب عدد القيم بين ٥ و ٨ في كل صف.




الحل بدون استخدام المعالجة المتوازية:

يتم تنفيذ الكود المتسلسل (sequential code) على نواة واحدة (core) في وحدة المعالجة المركزية كما هو موضح في الرسم التالي:


في الكود التالي يتم حساب عدد القيم الموجودة بين ٥ و ٨ في كل صف من مصفوف مكونة من ٢٠٠ ألف صف و ٨ أعمدة. ويتم تنفيذ الكود بشكل متسلسل على نواة واحدة من خلال استدعاء الدالة (count_in_range) لكل صف في المصفوفة:


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import random
from time import time

# generate 200000x8 random numbers between 0 and 10
data = [random.sample(range(0, 10), 8) for _ in range(200000)]
# print the first 5 rows
for i in range(5):
  print(f"{data[i]}")

# returns how many numbers lie within maximum & minimum in a given row
def count_in_range(row, minimum, maximum):
  count = 0
  for n in row:
    if minimum <= n <= maximum:
      count = count + 1
  return count
start = time()

# do the calculation for each row in data for the range (5,8)
results = []
for row in data:
  results.append(count_in_range(row, minimum=5, maximum=8))
end = time()

# print time
print(f"Time: {end-start}")
# print the first 5 results
print(f"Results: {results[:5]}")
[3, 1, 9, 6, 4, 2, 7, 0]
[2, 3, 7, 4, 9, 6, 8, 1]
[3, 9, 6, 7, 4, 2, 0, 5]
[6, 5, 8, 9, 0, 1, 2, 7]
[8, 1, 4, 0, 6, 5, 2, 9]
Time: 0.16396498680114746
Results: [2, 3, 3, 4, 3]

الحل باستخدام المعالجة المتوازية المتزامنة:

الطريقة العامة لتنفيذ أي عملية بالتوازي تكون بأخذ الدالة التي من المفترض أن يتم تنفيذها عدة مرات وجعلها تشتغل بالتوازي على عدة معالجات ثم يتم تجميع النتائج بنفس ترتيب استدعاء العمليات كما هو موضح في الرسم التالي:




ولعمل ذلك نقوم بتهيئة (Pool) مكون من (n) من المعالجات ونمرر الدالة المطلوب تشغيلها بالتوازي إلى أحد الدوال الموجودة في (Pool) وهي:
  • apply
  • map
  • starmap
والاختلاف بين هذه الدوال يكون في طريقة استدعاء الدالة وتمرير (parameters) إليها كما سيتم توضيحه عند شرح الأكواد. ونلاحظ استخدام الشرط التالي لعزل العملية الرئيسية عن الدالة التي سيتم تنفيذها في العمليات الفرعية، حيث أن الكود في العملية الرئيسية يتم تنفيذه مرة واحدة لإنشاء العمليات الفرعية وتجميع النتائج.

if __name__ == "__main__":

استخدام دالة (apply)

استخدام الدالة (apply) يتطلب تطبيقها داخل حلقة بنفس عدد العمليات المطلوبة (عدد الصفوف في هذا المثال) بحيث تقوم باستدعاء الدالة (count_in_range) وتمرير البيانات المطلوبة، ثم تجميع النتائج بنفس الترتيب.


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import random
from time import time
from multiprocessing import Pool

# returns how many numbers lie within maximum & minimum in a given row
def count_in_range(row, minimum, maximum):
  count = 0
  for n in row:
    if minimum <= n <= maximum:
      count = count + 1
  return count
if __name__ == "__main__":
    # generate 200000x8 random numbers between 0 and 10
    data = [random.sample(range(0, 10), 8) for _ in range(200000)]
    
    # print the first 5 rows
    for i in range(5):
      print(f"{data[i]}")
  
    start = time()
    
    # do the calculation for each row in data for the range (5,8)
    # using Pool.apply()
    results = []
    # Step 1: initialize Pool()
    pool = Pool()
    # Step 2: use Pool.apply() on each row
    for row in data:
      result = pool.apply(count_in_range, args=(row, 5, 8))
      results.append(result)
    # Step 3: close Pool()
    pool.close()
    
    end = time()
    # print time
    print(f"Time: {end-start}")
    # print the first 5 results
    print(f"Results: {results[:5]}")
[6, 0, 4, 9, 8, 7, 3, 2]
[2, 6, 8, 9, 0, 1, 3, 5]
[7, 9, 0, 1, 4, 2, 8, 6]
[9, 8, 4, 3, 2, 7, 0, 5]
[6, 0, 8, 3, 7, 4, 2, 5]
Time: 43.65902090072632
Results: [3, 3, 3, 3, 4]


استخدام دالة (map)

استخدام الدالة (apply) لا يتطلب تطبيقها داخل حلقة كما هو الحال في الدالة السابقة، حيث تقوم هذه الدالة بتوزيع صفوف المصفوفة آليا على العمليات.ولكن هذه الدالة تقبل فقط مدخلات من النوع (iterable) وهي هياكل البيانات التي يمكن المرور على عناصرها بواسطة حلقة مثل (list, tuple, dictionary, set). وحيث أن الدالة (count_in_range) تحتوي مدخلات غير قابلة للتكرار (minimum, maximum) فنحتاج إلى خداع هذه الدالة لتمرير هذه المدخلات باستخدام الدوال الجزئية (partial functions)، ولمعرفة المزيد عن الدوال الجزئية يمكن زيارة الرابط التالي:


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import random
from time import time
from multiprocessing import Pool
from functools import partial

# returns how many numbers lie within maximum & minimum in a given row
def count_in_range(row, minimum, maximum):
  count = 0
  for n in row:
    if minimum <= n <= maximum:
      count = count + 1
  return count
if __name__ == "__main__":
    # generate 200000x8 random numbers between 0 and 10
    data = [random.sample(range(0, 10), 8) for _ in range(200000)]
    
    # print the first 5 rows
    for i in range(5):
      print(f"{data[i]}")
  
    start = time()
    
    # do the calculation for each row in data for the range (5,8)
    # using Pool.map()
   
    # Step 1: initialize Pool()
    pool = Pool()
    # Step 2: use Pool.map() on the matrix
    partial_count_in_range = partial(count_in_range, minimum=5, maximum=8)
    results = pool.map(partial_count_in_range, data)
    # Step 3: close Pool()
    pool.close()
    
    end = time()
    # print time
    print(f"Time: {end-start}")
    # print the first 5 results
    print(f"Results: {results[:5]}")
[5, 4, 7, 8, 0, 1, 6, 3]
[7, 0, 2, 3, 5, 6, 8, 4]
[4, 8, 6, 9, 1, 7, 3, 0]
[3, 0, 6, 5, 2, 1, 4, 9]
[4, 1, 2, 5, 3, 8, 6, 9]
Time: 0.35679197311401367
Results: [4, 4, 3, 2, 3]

استخدام دالة (starmap)

كما هو الحال في الدالة (map) تقبل الدالة (starmap) أيضا تقبل (iterable) واحد فقط، ولكن كل عنصر فيه هو (iterable)، وبالتالي يمكن تقديم مدخلات الدالة على شكل (tuple) يتم تفكيكه أثناء التنفيذ وإسناد كل قيمة إلى المدخل المناسب في الدالة.


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import random
from time import time
from multiprocessing import Pool

# returns how many numbers lie within maximum & minimum in a given row
def count_in_range(row, minimum, maximum):
  count = 0
  for n in row:
    if minimum <= n <= maximum:
      count = count + 1
  return count
if __name__ == "__main__":
    # generate 200000x8 random numbers between 0 and 10
    data = [random.sample(range(0, 10), 8) for _ in range(200000)]
    
    # print the first 5 rows
    for i in range(5):
      print(f"{data[i]}")
    
    # prepare arguments for starmap
    args = [(row, 5, 8) for row in data]
    
    start = time()
    
    # do the calcuation for each row in data for the range (5,8)
    # using Pool.starmap()
   
    # Step 1: initialize Pool()
    pool = Pool()
    # Step 2: use Pool.map() on the matrix
    results = pool.starmap(count_in_range, args)
    pool.close()
    
    end = time()
    # print time
    print(f"Time: {end-start}")
    # print the first 5 results
    print(f"Results: {results[:5]}")
[4, 9, 3, 1, 7, 2, 6, 0]
[0, 7, 3, 9, 6, 4, 8, 5]
[1, 0, 3, 4, 7, 8, 6, 9]
[0, 9, 5, 6, 7, 4, 2, 8]
[4, 7, 5, 3, 2, 9, 6, 8]
Time: 0.34380197525024414
Results: [2, 4, 3, 4, 4]

الحل باستخدام المعالجة المتوازية غير المتزامنة:

تتيح الدوال (apply_async, map_async, starmap_async) نفس خواص الدوال السابقة بشكل غير متزامن، ونتيجة لذلك، ليس هناك ما يضمن أن النتيجة ستكون بنفس ترتيب الإدخال. والحل لهذه الإشكالية أن يتم الدالة (count_in_range) بحيث تتضمن مدخلاتها رقم الصف الذي ستجرى عليها العملية، ويتم إعادته مع عدد العناصر ضمن النطاق المطلوب على شكل (tuple). النقطة الأخرى التي يجب مراعاتها أن هذه الدوال تعيد النتائج على شكل كائن من نوع (ApplyResult/MapResult)، ويجب استخدام الدالة (get) للحصول على النتائج بالصيغة المطلوبة.

استخدام دالة (apply_async)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import random
from time import time
from multiprocessing import Pool

# returns how many numbers lie within maximum & minimum in a given row
def count_in_range(row_index, row, minimum, maximum):
  count = 0
  for n in row:
    if minimum <= n <= maximum:
      count = count + 1
  return (row_index,count)

if __name__ == "__main__":
    # generate 200000x8 random numbers between 0 and 10
    data = [random.sample(range(0, 10), 8) for _ in range(200000)]
    
    # print the first 5 rows
    for i in range(5):
      print(f"{data[i]}")
  
    start = time()
    
    # do the calcuation for each row in data for the range (5,8)
    # using Pool.apply()
    results = []
    # Step 1: initialize Pool()
    pool = Pool()
    # Step 2: use Pool.apply_async() on each row
    for i in range(200000):
      result = pool.apply_async(count_in_range, args=(i, data[i], 5, 8)).get()
      results.append(result)
    # Step 3: close Pool()
    pool.close()
    
    end = time()
    # print time
    print(f"Time: {end-start}")
    # print the first 5 results
    print(f"Results: {results[:5]}")
[4, 8, 5, 3, 2, 9, 0, 6]
[8, 9, 5, 2, 7, 3, 1, 4]
[4, 6, 1, 9, 2, 5, 7, 3]
[9, 5, 3, 0, 4, 8, 1, 7]
[2, 5, 8, 6, 0, 7, 4, 3]
Time: 41.59072303771973
Results: [(0, 3), (1, 3), (2, 3), (3, 3), (4, 4)]

استخدام دالة (starmap_async) 

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import random
from time import time
from multiprocessing import Pool

# returns how many numbers lie within maximum & minimum in a given row
def count_in_range(row_index, row, minimum, maximum):
  count = 0
  for n in row:
    if minimum <= n <= maximum:
      count = count + 1
  return (row_index, count)
if __name__ == "__main__":
    # generate 200000x8 random numbers between 0 and 10
    data = [random.sample(range(0, 10), 8) for _ in range(200000)]
    
    # print the first 5 rows
    for i in range(5):
        print(f"{data[i]}")
    
    # prepare arguments for Pool.starmap_async
    args = [(i, data[i], 5, 8) for i in range(20000)]
    start = time()
    
    # do the calcuation for each row in data for the range (5,8)
    # using Pool.starmap_async()
   
    # Step 1: initialize Pool()
    pool = Pool()
    # Step 2: use Pool.starmap_async() on the matrix
    results = pool.starmap_async(count_in_range, args).get()
    pool.close()
    
    end = time()
    # print time
    print(f"Time: {end-start}")
    # print the first 5 results
    print(f"{results[:5]}")
[0, 4, 3, 2, 8, 5, 6, 1]
[2, 6, 7, 0, 9, 4, 8, 3]
[2, 9, 1, 8, 5, 6, 3, 0]
[2, 5, 4, 3, 0, 7, 1, 6]
[0, 2, 3, 9, 5, 7, 1, 8]
Time: 0.17753982543945312
[(0, 3), (1, 3), (2, 3), (3, 3), (4, 3)]


مقارنة النتائج

الجدول التالي يقارن وقت التنفيذ بين أوقات التنفيذ للأكواد التي تم شرحها. نلاحظ أن أداء الدوال غير المتزامنة (startmap_async) كان أفضل من الدوال المتزامنة (starmap)، وأن أداء أكواد المعالجة المتوازية على ٨ (cores) التي تم استخدامها في التجربة لم تتفوق على الكود الذي تم تشغيله على نواة واحدة. وهنا نتساءل عن السبب - هل هو:
  • تصميم الخوارزمية
  • هياكل البيانات المستخدمة (lists)
  • طريقة توزيع البيانات على العمليات
  • حجم البيانات المستخدمة
في تدوينة قادمة بإذن الله سأحاول أن أجيب على هذا السؤال بشكل عملي من خلال المزيد من التجارب.




Parallel Processing in Python المعالجة المتوازية في بايثون

مقدمة المعالجة المتوازية هي طريقة لتشغيل الكود على عدة معالجات في نفس الوقت، وهو يهدف إلى تقليل وقت المعالجة الإجمالي.  يجب الأخذ بعين الاعت...