[python] for / while / generator / yield / coroutine / futures / asyncio

markyang92·2021년 4월 23일
0

python

목록 보기
15/43
post-thumbnail

for

  • Python for문의 주의점
    • Python한번의 Cycle이후, 기준이 변경된지 모름 (-_-);!!
  • C

  • Python: for의 진행, 비진행 standard의 dynamic 변경을 눈치 채지 못한다.

  • Solution!
    • 그래서 필자는 그냥 아래처럼 쓴다.

range

i=0; i<10; i++

for i in range(10): # for(int i=0 ; i<10 ; i++)
    print(i)	# i=0, 1, 2, 3, 4, 5, 6, 7, 8, 9
    
for i in range(0,10):
    print(i)

i=5; i<10; i++

for i in range(5, 10):	# for(i=5 ; i< 10; i++)
    print(i)	# i=5, 6, 7, 8, 9 

i=10, i>0; i--

for i in range(10, 0, -1):
    print(i)
    
--- 출력 ---
10
9
8
7
6
5
4
3
2
1

for idx,val in enumerate( )

arr=[1,2,3,4,5]

for idx,val in enumerate(arr):
    print(idx, val)

generator, yield

  • generator는 iterator와 같은 loop를 컨트롤하기 위해 쓰여지는 특별한 함수이다.
  • yield는 한 번 호출될 때, 한번의 return
  • 결국 generator, yield를 잘 사용하면 memory 절약
    • 특히 for문에서 반복문 때, 위처럼 in range를 사용한다. 하지만, range(0,100)과 같은 경우 100 size의 list를 만든다. 이는 분명 의미 없는 메모리 낭비 이다.

  • 수 많은 자료를 돌아다니면서, 반복문을 돌려야하면 Generator를 사용하자.


yield

  • yield함수 실행 중간에 빠져 나올 수 있는 generator를 만들 때, 사용한다.
    • yieldreturn 이라고 생각하자!!
    • 하지만 return은 아예, StopIteration Exception을 만들어 빠져나오지만,
      yield는 잠시 대기한다. yield도 iterator를 다돌면, StopIteration Exception


gen을 사용한 for문

# 1.
def gen_func(nums):
	for i in nums:
    	yield i * i


for i in gen_func([1,2,3,4,5]):
	print(i)
    
    
-----
# 2. 
gen_list = (x*x for x in [1,2,3,4,5]) # not [x*x for x in [1,2,3,4,5]]
for i in gen_list:
    print(i)

# 1.
def gen_func(nums):
	for i in nums:
        yield i * i
        
arr = [1,2,3,4,5]

for i in gen_func(arr):
    print(i)
    
-------
# 2. 

arr = [1,2,3,4,5]

gen_list = (x*x for x in arr)

for i in gen_list:
    print(i)


-------
# 3.
arr = (x for x in range(100))
print(arr) ==> generator<0x100000>

next(arr) ====> 0



while

a = []
print(a)	# false


i =0
while True:
    if not a:
        break
    print(a[i])
    i+=1

이터레이터 제어


concurrent: 코루틴

  • subroutine
    • main에서 호출 -> subroutine 에서 수행(흐름제어)
  • 코루틴(Coroutine)
    • yieldmainsubroutine이 왔다갔다.
    • 루틴 실행 중 중지 -> concurrency
    • 장점: 쓰레드에 비해 오버헤드 감소
      • 멀티 쓰레드가 너무 복잡하고, 교착 상태, 컨텍스트 스위칭 비용 발생 하니까 코루틴으로 변경되는 추세



  • yield 에서 멈춰있는데, 또 next() 를 호출하면 yieldNone 반환

send

  • send()를 사용해 main 루틴에서 yield 로 전달 가능


coroutine 상태

  1. GEN_CREATED: 처음 대기 상태
  2. GEN_RUNNING: 실행 상태
  3. GEN_SUSPENDED: yield 대기 상태
  4. GEN_CLOSED: 실행 완료 상태

async, await

  1. Python 3 >=3.5
  • def => async
  • yield => await

Async I/O

  • 동기, 비동기는 Sync/Async/Block/Non-Block 글 참고
    • Non Block상태: caller가 어떤 함수를 call 했는데 callee 도 thread 등으로 자체 제어권을 가지고 있어서,
      caller가 자기 제어권으로 계속 일을 수행함
  • callee결과 값을 주던 안주던~~ 제어권각자가지고 있음
  • 하지만, callee값이 Sync 되지 않았다면 Async!
    • callee는 일을 다하면, caller에게 이를 알리면 sync

Async I/O의 종류

  1. 연산 중심의 비동기 함수
  2. I/O 중심의 비동기 함수
    2.1 File I/O: 디스크로 부터 파일을 읽어 RAM에 올리는 걸 CPU명령이 기다리면, 비효율 끝판왕

pip asyncio 설치

$ python3 -m pip install asyncio
$ python3 -m pip install beautifulsoup4
  • asyncio: async, await의 syntax를 사용한 비동기 IO를 위한 library
  • coroutine 에서, network나 파일 I/O를 위해 따로 High-level로 만든 것이, asyncio

asyncio 사용

1. future 객체리스트 내 블록킹 함수를 사용해도, Non-block IO

await (= yield) 'Non-block 하고픈 함수 사용'


import asyncio
from urllib.request import urlopen
from concurrent.futures import ThreadPoolExecutor
import threading

urls = ['https://naver.com', 'http://daum.net']

async def fetch(url, executor):
	# 스레드명 출력
    print('Thread Name : ', threading.current_thread().getName(), 'Start', url)
    # 실행
    res = await loop.run_in_executor(executor, urlopen, url) # loop는 main에서 선언했기때문에 참조할 수 있다.
    
    print('Thread Name : ', threading.current_thread().getName(), 'Done', url)
    # 결과 반환
    return res.read()

async def main():
    # async: 나는 비 동기 함수야!
    # 스레드 풀 생성
    executor = ThreadPoolExecutor(max_workers=10)
    
    # future 객체 모아서 gather에서 실행
    futures = [
        asyncio.ensure_future(fetch(url,executor)) for url in urls # 실행할 함수를 쌍으로 넣으면된다. # 각 사이트에 각 스레드가 실행된다.
    ]
    
    # 결과 취함
    res = await asyncio.gather(*futures) # futures가 끝날때까지 기다린다. (*는 그냥 unpacking)
    
    print('Result : ', res)

if __name__ == '__main__':
    loop = asyncio.get_event_loop() # 루프 초기화
    loop.run_until_complete(main()) # 작업완료까지 대기. 끝날 때 까지 이 루프는 계속된다.
    

1-1. beautifulsoup4 사용해서, html에 원하는 Tag 찾기

$ pip3 install beautifulsoup4
  • import
from bs4 import Beautifulsoup4


1. sync 및 blocking 작업 예

  • getpage(x) 마다 동기 및 blocking 함수 이기 때문에, 5초 소요

2. async 및 non-blocking 작업 예

  • 최종적으로 1.0x초 소요
#!/usr/bin/env python3
import time
import asyncio

async def getpage(url):
    s=time.time()
    await asyncio.sleep(1)    # Bg 대기
    print('%d' % (url))
    e=time.time()
    print(e-s)

async def main():
    await asyncio.gather( 
        getpage(1), 
        getpage(2), 
        getpage(3), 
        getpage(4), 
        getpage(5)
    )


loop=asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

3. sync 및 blocking get URL

  • requests 모듈: url로 부터 text, html src를 받을 수 있음
    • 모듈 설치: $ pip3 install requests
    • requests.get(url)은 Blocking 함수


      0.94초 소요

4. async 및 non-blocking get URL

  • 0.21초 소요
    • loop.run_in_executor(): sync함수 -> async
#!/usr/bin/env python3

import requests
import asyncio
import time

async def getpage(url):
    req=await loop.run_in_executor(None, requests.get, url)
    html=req.text
    print(len(html))

async def main():
    await asyncio.gather(
        getpage("https://www.naver.com"),
        getpage("https://www.naver.com"),
        getpage("https://www.naver.com"),
        getpage("https://www.naver.com"),
        getpage("https://www.naver.com")
    )

s=time.time()

loop=asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()


e=time.time()

print(e-s)

async subprocess

예제 1.

import asyncio

async def run(cmd):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

    stdout, stderr = await proc.communicate()

    print(f'[{cmd!r} exited with {proc.returncode}]')
    if stdout:
        print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')

if __name__ == '__main__':
    asyncio.run(run('echo helloworld'))

# === 출력 === 
['echo helloworld' exited with 0]
[stdout]
helloworld

예제 2. 2개의 async subprocess

  • 모든 asyncio 서브프로세스 함수는 비동기 이고, asyncio가 이러한 함수로 작업 할 수 있는 많은 도구를 제공한다.
  • 여러 명령 동시 실행은 아래처럼 한다.
import asyncio

async def run(cmd):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

    stdout, stderr = await proc.communicate()

    print(f'[{cmd!r} exited with {proc.returncode}]')
    if stdout:
        print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')


async def async_main():
    await asyncio.gather(
        run('ls'),
        run('sleep 1; echo "hello"')
    )

if __name__ == '__main__':
    asyncio.run(async_main())
  • 결과

서브프로세스 만들기

coroutine asyncio.create_subprocess_exec(program, *args, stdin=None, stdout=None, stderr=None, limit=None, **kwds)

주의 Deprecated since version 3.8, removed in version 3.10: The loop parameter.


coroutine asyncio.create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, limit=None, **kwds)

argumentdescription
cmd셸 명령을 실행한다.
limitProcess.stdoutProcess.stderr에 대한 StreamReader 래퍼의 버퍼 한계 설정
returndescription
Process 인스턴스성공

concurrent.futures

  • Python 3 >= 3.2
  • 비동기 실행을 위한 API를 고수준으로 작성 -> 사용하기 쉽도록 개선

GIL

  • Python만의 독특한, 글로벌 인터프린터 lock
    • 두 개 이상의 스레드가 동시에 실행될 때, 하나의 자원을 액세스하는 경우 문제점 방지하기 위해, GIL이 실행된다.
    • GIL 실행, 리소스 전체에 Lock -> Context Switch 비용.. 때문에 오히려 싱글스레드보다 느려질 수 있다.
  • GIL 우회법은 주로, multiprocess, CPython

사용

from concurrent import futures


#!/usr/bin/env python3
from concurrent import futures

WORK_LIST = [10**3, 10**4, 10**5]

def sum_gen(n):
    return sum(n for n in range(1, n+1))

if __name__ == '__main__':
    with futures.ThreadPoolExecutor() as executor:
        result = executor.map(sum_gen, WORK_LIST)
    print(*result)

Scheduling

#!/usr/bin/env python3
from concurrent import futures

WORK_LIST = [10**3, 10**4, 10**5]

def sum_gen(n):
    return sum(n for n in range(1, n+1))

if __name__ == '__main__':

    futures_list = []
    with futures.ThreadPoolExecutor() as executor:
        for work in WORK_LIST:
            future = executor.submit(sum_gen, work)

            futures_list.append(future)

            print('Scheduling for {} : {}'.format(work,future))

wait

#!/usr/bin/env python3
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed

WORK_LIST = [10**7, 10**4, 10**5]

def sum_gen(n):
    return sum(n for n in range(1, n+1))

if __name__ == '__main__':

    futures_list = []
    with ThreadPoolExecutor() as executor:
        for work in WORK_LIST:
            future = executor.submit(sum_gen, work)

            futures_list.append(future)

            print('Scheduling for {} : {}'.format(work,future))

        result = wait(futures_list, timeout=0.00000001)

    print('Completed Tasks: ' + str(result.done))
    print('Completed Tasks: ' + str(result.not_done))

결과 출력


as_completed

profile
pllpokko@alumni.kaist.ac.kr

0개의 댓글