Distributed Computing อย่างง่าย ๆ ด้วย Python และ MicroPython


เป็นที่ทราบกันแล้วว่า Internet of Things หรือ IoT คือกลุ่มของ อุปกรณ์ (devices) และ บริการ (service) ทำงานและเชื่อมโยงกันบนข่ายงานเพื่อเป้าหมายบางอย่าง อุปกรณ์ในระบบ  IoT จะมีอุปกรณ์ที่มีหน่วยประมวลผลกลาง (Central Processing Unit, CPU) รวมอยู่ด้วย ดังนั้นระบบ IoT ก็คือระบบที่ CPU หลายหน่วยช่วยกันทำงานนั่นเอง  เมื่อมองมุมนี้ การปล่อยให้ computing units ทั้งหลายทำงานแต่เพียงรับข้อมูลจาก Sensors แล้วก็ส่งต่อไปยังเครื่องแม่ข่ายเพื่อทำ data visualization  ก็คงจะเรื่องน่าเสียดาย การหาหนทางใช้งาน CPU เหล่านั้นให้คุ้มค่าโดยการนำเอาแนวคิดเรื่อง Distributed Computing มาร่วมในการออกแบบระบบ IoT ก็น่าจะเป็นการดีไม่น้อย

 Distributed Computing ป็นเรื่องเกี่ยวกับการแบ่งงานที่ต้องทำออกเป็นงานย่อยหลายงาน แล้วส่งออกไปยัง Computing units ที่อยู่บนข่ายงานเดียวกันเพื่อให้ช่วยประมวลผลแล้วนำผลลัพธ์ของแต่ละส่วนมาประสานกัน เพื่อให้ได้ผลลัพธ์สุดท้ายที่ต้องการ

Distributed Computing


เพื่อเป็นการยกตัวอย่าง จะทำการจำลองระบบงานที่มี Computing Units อยู่ 3 หน่วย คือ

  • ESP8266 (CPU 32 bit ความเร็ว 80 MHz) จำนวน 2 บอร์ด
  • Raspberry Pi ( 64 bit ความเร็ว 1.2 GHz) จำนวน 1 บอร์ด

ทั้งหมดเชื่อมถึงกันผ่าน Ethernet  WiFi ให้มาช่วยกันทำงานเรียงลำดับตัวเลขจำนวนเต็ม 1000 จำนวนที่สร้างขึ้นมาด้วยการสุ่ม



ESP8266
Raspberry Pi 3 Model B


 การเรียงลำดับเลือกใช้ algorithm ที่เรียกว่า Merge Sort   ซึ่งใช้แนวคิดแบบ Divide and Conquer  โดยทำการแบ่งชุดของตัวเลขทั้งหมดออกเป็นชุด ๆ  ทำการเรียงลำดับของแต่ละชุด จากนั้นก็นำมาเรียงกันระหว่างชุด ทำเป็นทอด ๆ ไปจนได้การเรียงลำดับทั้งหมดในที่สุด


By Swfung8 - Own work, CC BY-SA 3.0, https://commons.wikimedia.org/w/index.php?curid=14961648



ขั้นตอนการทำงาน  

1. สร้างชุดตัวเลขจำนวน 1000 จำนวน ด้วยการสุ่ม
2. แบ่งตัวเลขออกเป็นชุด ส่งให้ Computing Units ทำการเรียงลำดับ
3. Computing unit ส่งชุดตัวเลขที่เรียงลำดับแล้วกลับมามายังเครื่องแม่ข่าย
4. เครื่องแม่ข่ายทำการเรียงลำดับขั้นสุดท้าย






การสร้าง Code

1 สร้าง merge sort algorithm (Python)



def merge(left,right): #merges 2 sorted lists together 
 result = [] 
 i, j = 0, 0 

 #Goes through both lists 
 while i < len(left) and j < len(right): 
  #Adds smaller element of the lists to the final list 
  if left[i] <= right[j]:
   result.append(left[i]) 
   i += 1
  else:
   result.append(right[j]) 
   j += 1
 result += left[i:] 
 result += right[j:] 

 return result


def mergesort(lst):
 # if list has only one element, no need to sort
 if len(lst) < 2:
  return lst

 #breaks down list 
        #1) find out the middle
 middle = int(len(lst) / 2)

 #recursively splits and sorts each half 
 left =  mergesort(lst[:middle])
 right = mergesort(lst[middle:])
 
 #merges both sorted lists together 
 m = merge(left, right)
 return m


2. สร้าง utilities


def list2str(lst):
 a = ','.join(str(e) for e in lst)
 return a
 

def str2list(_str):
 return eval(_str)
 

def break_list(src_list, num_sec):
 sec_len = len(src_list)// num_sec #length of each section 
 res = [] 
 for i in range(num_sec):
  if i < num_sec - 1:
   res.append(src_list[ i * sec_len : (i+1) * sec_len ] )
  else:
   res.append(src_list[ i * sec_len : ] )
 return res



ฟังก์ชั่นเหล่านี้มีไว้เพื่ออำนวยความสะดวกดังนี้

  • list2str(lst)  ใช้เปลี่ยน Abstract Data Type แบบ  List ให้กลายเป็น String เพื่อใช้ในการส่งข้อมูลผ่านข่ายงาน Ethernet
  • str2list(str) ใช้เปลี่ยน String ที่รับมาจาก Ethernet ให้อยู่ในรูปแบบของ List เพื่อให้สามารถนำไปทำการเรียงลำดับได้
  • break_list(src_list,num_sec) ทำการแยก List ที่กำหนดให้ออกเป็นชุด กำหนดจำนวนชุดด้วย  num_sec

3. แบ่งงาน


import asyncio

@asyncio.coroutine
def sort_on_esp_1(host,port,data,result):
 data = list2str(data)+EOL
 reader, writer = yield from asyncio.open_connection(host, port)
 writer.write(data.encode())
 yield from writer.drain()
 answer = yield from reader.readline()
 writer.close() 
 result.extend(eval(answer.decode('utf8')))
 
@asyncio.coroutine
def sort_on_esp_2(host,port,data,result):
 data = list2str(data)+EOL
 reader, writer = yield from asyncio.open_connection(host, port)
 writer.write(data.encode())
 yield from writer.drain()
 answer = yield from reader.readline()
 writer.close() 
 result.extend(eval(answer.decode('utf8')))

@asyncio.coroutine
def sort_on_raspi(data,result): 
 m1 = mergesort(data)
 result.extend(m1)



เพื่อให้ระบบสามารถทำงานแบบ Multitasking จึงต้องอาศัย asyncio.coroutine (https://en.wikipedia.org/wiki/Coroutine,https://docs.python.org/3/library/asyncio.html) มาช่วย (จะขอข้ามรายละเอียดไปเพราะอยู่นอกเหนือจากกรอบของเรื่อง) และขออธิบายการทำงานดังนี้

  • sort_on_esp_1(host,port,data,result) ทำหน้าที่ส่งข้อมูล data ไปยัง ESP8266 ตามที่อยู่ที่กำหนดโดย host และ port แล้วนำผลลัพธ์มาใส่ไว้ใน result
  • sort_on_esp_2(host,port,data,result) ทำงานเหมือนกับ sort_on_esp_1(host,port,data,result) 
  • sort_on_raspi(data,result) ทำหน้าที่เรียงลำดับข้อมุลบน Raspberry Pi 


เหตุผลที่ต้องแยก sort_on_esp_1 และ sort_on_esp_2 ออกจากกันทั้งที่การทำงานเหมือนกัน เพราะเราต้องการให้ทั้งสองทำงานพร้อมกันนั่นเอง

4. สร้างชุดตัวเลข

arr_length = 1000
src_list = list(range(arr_length))
random.shuffle(src_list)


5. เตรียมตัวแปร


esp1 = ('192.168.0.2',8000)
esp2 = ('192.168.0.3',8000)
EOL="\r\n"
num_sect = 4
res1 = []
res2 = []
res3 = []
parted_lists = break_list(src_list,num_sect)

  • ตัวแปร num_sect คือจำนวนชุดของตัวเลขที่จะถูกแบ่งออก ในตัวอย่าง เริ่มต้นทดลองที่ 4 ชุดก่อน
  • ตัวแปร res1,res2,res3 เป็น List ที่ใช้เก็บชุดตัวเลขที่ได้รับจาก Computing Units แต่ละ units ในกรณีของตัวอย่าง เรามี 3 units 
  • ตัวแปร parted_lists เป็น List ที่จัดเก็บชุดตัวเลขที่ถูกแยกเป็นส่วน ๆ แล้ว เตรียมทำการจัดส่งออกไปยัง Computing Units ต่าง ๆ ในกรณีตัวอย่างจะได้รูปแบบของตัวแปรคือ  [[...],[...],[...],[...]]

6.  Main program


loop = asyncio.get_event_loop() 
loop.run_until_complete(asyncio.gather(
    sort_on_raspi(parted_lists[:num_sect - 2],res1),
    sort_on_esp_1(esp1[0],esp1[1],parted_lists[-1],res2),
    sort_on_esp_2(esp2[0],esp2[1],parted_lists[-2],res3)
))
loop.close()
res = mergesort(res1+res2+res3)

ทำการรวบงานทั้งหมดให้เป็นแบบ Parallel ด้วยคำสัั่ง asyncio.gather()  หลังจาได้ผลลัพธ์กลับมาแล้ว ก็ทำการเรียงลำดับทั้งหมดอีกครั้ง

7. ดู Performance ซะหน่อย

โดยการปรับปรุงชุดคำสั่งในข้อ 6 สักหน่อย

import time

loop = asyncio.get_event_loop() 
start = time.time()
loop.run_until_complete(asyncio.gather(
    sort_on_raspi(parted_lists[:num_sect - 2],res1),
    sort_on_esp_1(esp1[0],esp1[1],parted_lists[-1],res2),
    sort_on_esp_2(esp2[0],esp2[1],parted_lists[-2],res3)
))
loop.close()
res = mergesort(res1+res2+res3)
proc_time = time.time() - start
print("Time of procesing = {}".format(proc_time))


Complete Code ทำงานบน Raspberry Pi:

import time 
import random
import asyncio


def merge(left,right): #merges 2 sorted lists together 
 result = [] 
 i, j = 0, 0 

 #Goes through both lists 
 while i < len(left) and j < len(right): 
  #Adds smaller element of the lists to the final list 
  if left[i] <= right[j]:
   result.append(left[i]) 
   i += 1
  else:
   result.append(right[j]) 
   j += 1
 result += left[i:] 
 result += right[j:] 

 return result


def mergesort(lst):
 # if list has only one element, no need to sort
 if len(lst) < 2:
  return lst

 #breaks down list 
        #1) find out the middle
 middle = int(len(lst) / 2)

 #recursively splits and sorts each half 
 left =  mergesort(lst[:middle])
 right = mergesort(lst[middle:])
 
 #merges both sorted lists together 
 m = merge(left, right)
 return m

def list2str(lst):
 a = ','.join(str(e) for e in lst)
 return a
 

def str2list(_str):
 return eval(_str)
 

def break_list(src_list, num_sec):
 sec_len = len(src_list)// num_sec #length of each section 
 res = [] 
 for i in range(num_sec):
  if i < num_sec - 1:
   res.append(src_list[ i * sec_len : (i+1) * sec_len ] )
  else:
   res.append(src_list[ i * sec_len : ] )
 return res



@asyncio.coroutine
def sort_on_esp_1(host,port,data,result):
 data = list2str(data)+EOL
 reader, writer = yield from asyncio.open_connection(host, port)
 writer.write(data.encode())
 yield from writer.drain()
 answer = yield from reader.readline()
 writer.close() 
 result.extend(eval(answer.decode('utf8')))
 
@asyncio.coroutine
def sort_on_esp_2(host,port,data,result):
 data = list2str(data)+EOL
 reader, writer = yield from asyncio.open_connection(host, port)
 writer.write(data.encode())
 yield from writer.drain()
 answer = yield from reader.readline()
 writer.close() 
 result.extend(eval(answer.decode('utf8')))

@asyncio.coroutine
def sort_on_raspi(data,result): 
        t =[]
        # to join all sub-lists to be one big list 
        for e in data:
             t.extedn(e)
 m1 = mergesort(t)
 result.extend(m1)
 
 
esp1 = ('192.168.0.2',8000)
esp2 = ('192.168.0.3',8000)
EOL="\r\n"
num_sect = 4
res1 = []
res2 = []
res3 = []
parted_lists = break_list(src_list,num_sect)


loop = asyncio.get_event_loop() 
start = time.time()
loop.run_until_complete(asyncio.gather(
 sort_on_raspi(parted_lists[:num_sect-2],res1),
 sort_on_esp_1(esp1[0],esp1[1],parted_lists[-1],res2),
 sort_on_esp_2(esp2[0],esp2[1],parted_lists[-2],res3)
))
loop.close()
res = mergesort(res1+res2+res3)
proc_time = time.time() - start
print("Time of procesing = {}".format(proc_time))

Python Script นี้จะถูกติดตั้งและทำงานบนเครื่องแม่ข่าย ในกรณีตัวอย่างนี้คือ Raspberry Pi ครับ

8.  Esp8266 

ทำหน้าที่เป็นตัวช่วยก็จะต้องติดตั้ง Algorithm Merge Sort  เช่นเดียวกัน  แต่ในส่วนของการสื่อสารแล้ว จะมีการเปลี่ยนแปลงเล็กน้อยดังแสดงไว้ข้างล่างนี้
Complete Code (Micropython) ทำงานบน ESP8266

import utime 
import uasyncio as asyncio

async def merge(left,right): #merges 2 sorted lists together 
 result = [] 
 i, j = 0, 0 
 #Goes through both lists 
 while i < len(left) and j < len(right): 
  #Adds smaller element of the lists to the final list 
  if left[i] <= right[j]:
   result.append(left[i]) 
   i += 1
  else:
   result.append(right[j]) 
   j += 1
 result += left[i:] 
 result += right[j:] 
 return result

async def mergesort(lst):
 #if there's only 1 element, no need to sort 
 if len(lst) < 2:
  return lst
 #breaks down list into 2 halves 
 middle = int(len(lst) / 2)
 #recursively splits and sorts each half 
 left = await mergesort(lst[:middle])
 right = await mergesort(lst[middle:])
 
 #merges both sorted lists together 
 m = await merge(left, right)
 #print(m)
 return m 

async def list2str(lst):
 a = ','.join(str(e) for e in lst)
 return a
 
async def MServer(reader,writer):
 _d = await reader.readline()
 _d = _d.decode('utf8').strip()
 if _d =='stop':
  loop.close()
 else :
  _l = eval(_d)  
  _m = await mergesort(_l) 
  _a  = await list2str(_m)
  await writer.awrite(_a.encode('utf8'))
  await writer.aclose()   
 await asyncio.sleep(0)

_ip = '192.168.0.2'
_port = 8000
loop = asyncio.get_event_loop()
coro=loop.create_task(asyncio.start_server(MServer,_ip,_port))
loop.run_forever() 
loop.close()


มาดูส่วนที่ทำงานหลักก่อนคือ async def MServer(reader,writer) มีหน้าที่คอยรับข้อความที่ส่งเข้ามาจาก Raspberry Pi ซึ่งจะอยู่ในรูปแบบของ String ทำการแปลงให้เป็น List แล้วทำการ Sort จากนั้นก็ส่งผลลัพธ์กลับไป


มาดูผลการทำงานกัน


ภาพแสดงเวลาการทำงานเมื่อแบ่งงานออกเป็น 4 งานย่อย 

จากภาพเราจะเห็นว่าผลลัพธ์ที่ได้ก็เป็นไปตามที่ควรจะเป็น กลุ่มตัวเลข 20 (ยกมาบางส่วน) ตัวข้างบนแสดงให้เห็นตัวเลขที่ยังไม่ได้เรียงลำดับ  ถัดมาจะเห็นตัวเลขของเวลาในการทำงานรวมทั้งหมดคือประมาณ 2.5วินาที ส่วนที่สุดท้ายคือกลุ่มตัวเลขที่ได้จัดเรียงเรียบร้อยแล้ว

เวลาในการทำงานค่อนข้างมากไปสักหน่อย ซึ่งอาจเนื่องมาจาก over head ในการสื่อสารผ่าน Ethernet ซึ่งเราควรจะได้ performance ที่ดีขึ้นหากเชื่อมต่อกันผ่าน Hub




มาดูกราฟกันสักหน่อย แกนตั้งคือเวลาในการทำงานหน่วยเป็นวินาทีและแกนนอนคือจำนวนงานย่อยที่ถูกแบ่งออก จากการทดลองจะเห็นว่าหากเราแบ่งงานออกเป็นจำนวนงานย่อยมากขึ้นจะส่งผลต่อเวลาการทำงานที่ลดลง แต่พอไปถึงจุดหนึ่งแล้วเวลาจะไม่ลดลงแล้ว นั่นอาจอธิบายได้ว่า ยิ่งงานมีขนาดเล็กลงเวลาในการทำงานของอุปกรณ์ก็จะใช้น้อยลงด้วย (อุปกรณ์ใน IoT มักมีทรัพยากรน้อย) แต่หากแบ่งย่อยลงไปเรื่อย ๆ เวลาที่ใช้ในการประมวลผลอาจลดลงในแต่ละอุปกรณ์ก็จริงแต่เวลาเวลาในการสื่อสารอาจไม่ลดลงตามจนทำให้เกิดผลที่ชัดเจน

สรุป

ในบทความนี้ได้พยายามยกตัวอย่างและกระบวนการทำงานเพื่อให้ผู้อ่านได้เห็นภาพพื้นฐานของการระบบ Distribute Computing ซึ่งเป็นกระบวนการสำคัญอีกกระบวนการหนึ่งสำหรับการทำงานและออกแบบ IoT ครับ



Previous
Next Post »