เป็นที่ทราบกันแล้วว่า Internet of Things หรือ IoT คือกลุ่มของ อุปกรณ์ (devices) และ บริการ (service) ทำงานและเชื่อมโยงกันบนข่ายงานเพื่อเป้าหมายบางอย่าง อุปกรณ์ในระบบ IoT จะมีอุปกรณ์ที่มีหน่วยประมวลผลกลาง (Central Processing Unit, CPU) รวมอยู่ด้วย ดังนั้นระบบ IoT ก็คือระบบที่ CPU หลายหน่วยช่วยกันทำงานนั่นเอง เมื่อมองมุมนี้ การปล่อยให้ computing units ทั้งหลายทำงานแต่เพียงรับข้อมูลจาก Sensors แล้วก็ส่งต่อไปยังเครื่องแม่ข่ายเพื่อทำ data visualization ก็คงจะเรื่องน่าเสียดาย การหาหนทางใช้งาน CPU เหล่านั้นให้คุ้มค่าโดยการนำเอาแนวคิดเรื่อง Distributed Computing มาร่วมในการออกแบบระบบ IoT ก็น่าจะเป็นการดีไม่น้อย
Distributed Computing ป็นเรื่องเกี่ยวกับการแบ่งงานที่ต้องทำออกเป็นงานย่อยหลายงาน แล้วส่งออกไปยัง Computing units ที่อยู่บนข่ายงานเดียวกันเพื่อให้ช่วยประมวลผลแล้วนำผลลัพธ์ของแต่ละส่วนมาประสานกัน เพื่อให้ได้ผลลัพธ์สุดท้ายที่ต้องการ
Distributed Computing |
เพื่อเป็นการยกตัวอย่าง จะทำการจำลองระบบงานที่มี Computing Units อยู่ 3 หน่วย คือ
- ESP8266 (CPU 32 bit ความเร็ว 80 MHz) จำนวน 2 บอร์ด
- Raspberry Pi ( 64 bit ความเร็ว 1.2 GHz) จำนวน 1 บอร์ด
ทั้งหมดเชื่อมถึงกันผ่าน Ethernet WiFi ให้มาช่วยกันทำงานเรียงลำดับตัวเลขจำนวนเต็ม 1000 จำนวนที่สร้างขึ้นมาด้วยการสุ่ม
|
|
การเรียงลำดับเลือกใช้ algorithm ที่เรียกว่า Merge Sort ซึ่งใช้แนวคิดแบบ Divide and Conquer โดยทำการแบ่งชุดของตัวเลขทั้งหมดออกเป็นชุด ๆ ทำการเรียงลำดับของแต่ละชุด จากนั้นก็นำมาเรียงกันระหว่างชุด ทำเป็นทอด ๆ ไปจนได้การเรียงลำดับทั้งหมดในที่สุด
By Swfung8 - Own work, CC BY-SA 3.0, https://commons.wikimedia.org/w/index.php?curid=14961648 |
ขั้นตอนการทำงาน
1. สร้างชุดตัวเลขจำนวน 1000 จำนวน ด้วยการสุ่ม2. แบ่งตัวเลขออกเป็นชุด ส่งให้ Computing Units ทำการเรียงลำดับ
3. Computing unit ส่งชุดตัวเลขที่เรียงลำดับแล้วกลับมามายังเครื่องแม่ข่าย
4. เครื่องแม่ข่ายทำการเรียงลำดับขั้นสุดท้าย
การสร้าง Code
1 สร้าง merge sort algorithm (Python)
def merge(left,right): #merges 2 sorted lists together
result = []
i, j = 0, 0
#Goes through both lists
while i < len(left) and j < len(right):
#Adds smaller element of the lists to the final list
if left[i] <= right[j]:
result.append(left[i])
i += 1
else:
result.append(right[j])
j += 1
result += left[i:]
result += right[j:]
return result
def mergesort(lst):
# if list has only one element, no need to sort
if len(lst) < 2:
return lst
#breaks down list
#1) find out the middle
middle = int(len(lst) / 2)
#recursively splits and sorts each half
left = mergesort(lst[:middle])
right = mergesort(lst[middle:])
#merges both sorted lists together
m = merge(left, right)
return m
2. สร้าง utilities
def list2str(lst):
a = ','.join(str(e) for e in lst)
return a
def str2list(_str):
return eval(_str)
def break_list(src_list, num_sec):
sec_len = len(src_list)// num_sec #length of each section
res = []
for i in range(num_sec):
if i < num_sec - 1:
res.append(src_list[ i * sec_len : (i+1) * sec_len ] )
else:
res.append(src_list[ i * sec_len : ] )
return res
ฟังก์ชั่นเหล่านี้มีไว้เพื่ออำนวยความสะดวกดังนี้
- list2str(lst) ใช้เปลี่ยน Abstract Data Type แบบ List ให้กลายเป็น String เพื่อใช้ในการส่งข้อมูลผ่านข่ายงาน Ethernet
- str2list(str) ใช้เปลี่ยน String ที่รับมาจาก Ethernet ให้อยู่ในรูปแบบของ List เพื่อให้สามารถนำไปทำการเรียงลำดับได้
- break_list(src_list,num_sec) ทำการแยก List ที่กำหนดให้ออกเป็นชุด กำหนดจำนวนชุดด้วย num_sec
3. แบ่งงาน
import asyncio
@asyncio.coroutine
def sort_on_esp_1(host,port,data,result):
data = list2str(data)+EOL
reader, writer = yield from asyncio.open_connection(host, port)
writer.write(data.encode())
yield from writer.drain()
answer = yield from reader.readline()
writer.close()
result.extend(eval(answer.decode('utf8')))
@asyncio.coroutine
def sort_on_esp_2(host,port,data,result):
data = list2str(data)+EOL
reader, writer = yield from asyncio.open_connection(host, port)
writer.write(data.encode())
yield from writer.drain()
answer = yield from reader.readline()
writer.close()
result.extend(eval(answer.decode('utf8')))
@asyncio.coroutine
def sort_on_raspi(data,result):
m1 = mergesort(data)
result.extend(m1)
เพื่อให้ระบบสามารถทำงานแบบ Multitasking จึงต้องอาศัย asyncio.coroutine (https://en.wikipedia.org/wiki/Coroutine,https://docs.python.org/3/library/asyncio.html) มาช่วย (จะขอข้ามรายละเอียดไปเพราะอยู่นอกเหนือจากกรอบของเรื่อง) และขออธิบายการทำงานดังนี้
- sort_on_esp_1(host,port,data,result) ทำหน้าที่ส่งข้อมูล data ไปยัง ESP8266 ตามที่อยู่ที่กำหนดโดย host และ port แล้วนำผลลัพธ์มาใส่ไว้ใน result
- sort_on_esp_2(host,port,data,result) ทำงานเหมือนกับ sort_on_esp_1(host,port,data,result)
- sort_on_raspi(data,result) ทำหน้าที่เรียงลำดับข้อมุลบน Raspberry Pi
เหตุผลที่ต้องแยก sort_on_esp_1 และ sort_on_esp_2 ออกจากกันทั้งที่การทำงานเหมือนกัน เพราะเราต้องการให้ทั้งสองทำงานพร้อมกันนั่นเอง
4. สร้างชุดตัวเลข
arr_length = 1000
src_list = list(range(arr_length))
random.shuffle(src_list)
5. เตรียมตัวแปร
esp1 = ('192.168.0.2',8000)
esp2 = ('192.168.0.3',8000)
EOL="\r\n"
num_sect = 4
res1 = []
res2 = []
res3 = []
parted_lists = break_list(src_list,num_sect)
- ตัวแปร num_sect คือจำนวนชุดของตัวเลขที่จะถูกแบ่งออก ในตัวอย่าง เริ่มต้นทดลองที่ 4 ชุดก่อน
- ตัวแปร res1,res2,res3 เป็น List ที่ใช้เก็บชุดตัวเลขที่ได้รับจาก Computing Units แต่ละ units ในกรณีของตัวอย่าง เรามี 3 units
- ตัวแปร parted_lists เป็น List ที่จัดเก็บชุดตัวเลขที่ถูกแยกเป็นส่วน ๆ แล้ว เตรียมทำการจัดส่งออกไปยัง Computing Units ต่าง ๆ ในกรณีตัวอย่างจะได้รูปแบบของตัวแปรคือ [[...],[...],[...],[...]]
6. Main program
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
sort_on_raspi(parted_lists[:num_sect - 2],res1),
sort_on_esp_1(esp1[0],esp1[1],parted_lists[-1],res2),
sort_on_esp_2(esp2[0],esp2[1],parted_lists[-2],res3)
))
loop.close()
res = mergesort(res1+res2+res3)
ทำการรวบงานทั้งหมดให้เป็นแบบ Parallel ด้วยคำสัั่ง asyncio.gather() หลังจาได้ผลลัพธ์กลับมาแล้ว ก็ทำการเรียงลำดับทั้งหมดอีกครั้ง7. ดู Performance ซะหน่อย
โดยการปรับปรุงชุดคำสั่งในข้อ 6 สักหน่อย
import time
loop = asyncio.get_event_loop()
start = time.time()
loop.run_until_complete(asyncio.gather(
sort_on_raspi(parted_lists[:num_sect - 2],res1),
sort_on_esp_1(esp1[0],esp1[1],parted_lists[-1],res2),
sort_on_esp_2(esp2[0],esp2[1],parted_lists[-2],res3)
))
loop.close()
res = mergesort(res1+res2+res3)
proc_time = time.time() - start
print("Time of procesing = {}".format(proc_time))
Complete Code ทำงานบน Raspberry Pi:
import time
import random
import asyncio
def merge(left,right): #merges 2 sorted lists together
result = []
i, j = 0, 0
#Goes through both lists
while i < len(left) and j < len(right):
#Adds smaller element of the lists to the final list
if left[i] <= right[j]:
result.append(left[i])
i += 1
else:
result.append(right[j])
j += 1
result += left[i:]
result += right[j:]
return result
def mergesort(lst):
# if list has only one element, no need to sort
if len(lst) < 2:
return lst
#breaks down list
#1) find out the middle
middle = int(len(lst) / 2)
#recursively splits and sorts each half
left = mergesort(lst[:middle])
right = mergesort(lst[middle:])
#merges both sorted lists together
m = merge(left, right)
return m
def list2str(lst):
a = ','.join(str(e) for e in lst)
return a
def str2list(_str):
return eval(_str)
def break_list(src_list, num_sec):
sec_len = len(src_list)// num_sec #length of each section
res = []
for i in range(num_sec):
if i < num_sec - 1:
res.append(src_list[ i * sec_len : (i+1) * sec_len ] )
else:
res.append(src_list[ i * sec_len : ] )
return res
@asyncio.coroutine
def sort_on_esp_1(host,port,data,result):
data = list2str(data)+EOL
reader, writer = yield from asyncio.open_connection(host, port)
writer.write(data.encode())
yield from writer.drain()
answer = yield from reader.readline()
writer.close()
result.extend(eval(answer.decode('utf8')))
@asyncio.coroutine
def sort_on_esp_2(host,port,data,result):
data = list2str(data)+EOL
reader, writer = yield from asyncio.open_connection(host, port)
writer.write(data.encode())
yield from writer.drain()
answer = yield from reader.readline()
writer.close()
result.extend(eval(answer.decode('utf8')))
@asyncio.coroutine
def sort_on_raspi(data,result):
t =[]
# to join all sub-lists to be one big list
for e in data:
t.extedn(e)
m1 = mergesort(t)
result.extend(m1)
esp1 = ('192.168.0.2',8000)
esp2 = ('192.168.0.3',8000)
EOL="\r\n"
num_sect = 4
res1 = []
res2 = []
res3 = []
parted_lists = break_list(src_list,num_sect)
loop = asyncio.get_event_loop()
start = time.time()
loop.run_until_complete(asyncio.gather(
sort_on_raspi(parted_lists[:num_sect-2],res1),
sort_on_esp_1(esp1[0],esp1[1],parted_lists[-1],res2),
sort_on_esp_2(esp2[0],esp2[1],parted_lists[-2],res3)
))
loop.close()
res = mergesort(res1+res2+res3)
proc_time = time.time() - start
print("Time of procesing = {}".format(proc_time))
Python Script นี้จะถูกติดตั้งและทำงานบนเครื่องแม่ข่าย ในกรณีตัวอย่างนี้คือ Raspberry Pi ครับ8. Esp8266
ทำหน้าที่เป็นตัวช่วยก็จะต้องติดตั้ง Algorithm Merge Sort เช่นเดียวกัน แต่ในส่วนของการสื่อสารแล้ว จะมีการเปลี่ยนแปลงเล็กน้อยดังแสดงไว้ข้างล่างนี้Complete Code (Micropython) ทำงานบน ESP8266
import utime
import uasyncio as asyncio
async def merge(left,right): #merges 2 sorted lists together
result = []
i, j = 0, 0
#Goes through both lists
while i < len(left) and j < len(right):
#Adds smaller element of the lists to the final list
if left[i] <= right[j]:
result.append(left[i])
i += 1
else:
result.append(right[j])
j += 1
result += left[i:]
result += right[j:]
return result
async def mergesort(lst):
#if there's only 1 element, no need to sort
if len(lst) < 2:
return lst
#breaks down list into 2 halves
middle = int(len(lst) / 2)
#recursively splits and sorts each half
left = await mergesort(lst[:middle])
right = await mergesort(lst[middle:])
#merges both sorted lists together
m = await merge(left, right)
#print(m)
return m
async def list2str(lst):
a = ','.join(str(e) for e in lst)
return a
async def MServer(reader,writer):
_d = await reader.readline()
_d = _d.decode('utf8').strip()
if _d =='stop':
loop.close()
else :
_l = eval(_d)
_m = await mergesort(_l)
_a = await list2str(_m)
await writer.awrite(_a.encode('utf8'))
await writer.aclose()
await asyncio.sleep(0)
_ip = '192.168.0.2'
_port = 8000
loop = asyncio.get_event_loop()
coro=loop.create_task(asyncio.start_server(MServer,_ip,_port))
loop.run_forever()
loop.close()
มาดูส่วนที่ทำงานหลักก่อนคือ async def MServer(reader,writer) มีหน้าที่คอยรับข้อความที่ส่งเข้ามาจาก Raspberry Pi ซึ่งจะอยู่ในรูปแบบของ String ทำการแปลงให้เป็น List แล้วทำการ Sort จากนั้นก็ส่งผลลัพธ์กลับไป
มาดูผลการทำงานกัน
ภาพแสดงเวลาการทำงานเมื่อแบ่งงานออกเป็น 4 งานย่อย |
จากภาพเราจะเห็นว่าผลลัพธ์ที่ได้ก็เป็นไปตามที่ควรจะเป็น กลุ่มตัวเลข 20 (ยกมาบางส่วน) ตัวข้างบนแสดงให้เห็นตัวเลขที่ยังไม่ได้เรียงลำดับ ถัดมาจะเห็นตัวเลขของเวลาในการทำงานรวมทั้งหมดคือประมาณ 2.5วินาที ส่วนที่สุดท้ายคือกลุ่มตัวเลขที่ได้จัดเรียงเรียบร้อยแล้ว
เวลาในการทำงานค่อนข้างมากไปสักหน่อย ซึ่งอาจเนื่องมาจาก over head ในการสื่อสารผ่าน Ethernet ซึ่งเราควรจะได้ performance ที่ดีขึ้นหากเชื่อมต่อกันผ่าน Hub
สรุป
ในบทความนี้ได้พยายามยกตัวอย่างและกระบวนการทำงานเพื่อให้ผู้อ่านได้เห็นภาพพื้นฐานของการระบบ Distribute Computing ซึ่งเป็นกระบวนการสำคัญอีกกระบวนการหนึ่งสำหรับการทำงานและออกแบบ IoT ครับSign up here with your email
ConversionConversion EmoticonEmoticon