db.from_sequence()
: list to bags.take()
, .count()
, .replace()
# import dask.bag as db
str_list = ['I got new laptop', 'you have more things than I do', 'lord have mercy']
str_bag = db.from_sequence(str_list, npartitions = 3) # npartitions 지정안해도 알아서 split됨
print(str_bag) #dask.bag<from_sequence, npartitions=3> -> lazy 값
print(str_bag.take(1)) # ('I got new laptop',)
print(str_bag.compute()) # ['I got new laptop', 'you have more things than I do', 'lord have mercy']
# .compute()는 모든 결과 나옴
print(str_bag.count()) # <dask.bag.core.Item object at 0x7fc004f039a0>
print(str_bag.count().compute()) # 3
print(str_bag.str.upper().take(1))
print(str_bag.str.replace('got', 'took a').take(1))
print(str_bag.str.count('I').take(3)) # take 사용하지 않으면 dask값
- 왜인지 모르지만 상황에 따라서 3 elements requested, only 1 elements available. Try passing larger
npartitions
totake
. 이라는 오류 메세지가 뜨는데 원인을 모르겠음. 3개의 elements가 존재하고, take(3)의 element를 넣었기 때문에 문제가 될 것이 없다고 생각됨- 다른 데이터 이용한 것에서는 잘 되는데 왜 위의 데이터에서는 안되는지 확인 필요
db.read_text(files)
: file 한개씩 bag에 seperately하게 넣음files = glob.glob('/Volumes/Google Drive/My Drive/Colab_Notebook/kaggle/ecom_behavior/bag_dir/[0-9].*')
print(files) # list
txt_db = db.read_text(files)
print(txt_db)
Dask Bags에서는 Json file을 많이 사용함
1) map : .map(func).compute()
function 적용
import dask.bag as db
import glob
import json
files = glob.glob('/Volumes/Google Drive/My Drive/Colab_Notebook/kaggle/ecom_behavior/bag_dir/politicians/*.json')
txt_db = db.read_text(files)
print(txt_db.take(1))
print(type(txt_db.take(1))) # tuple이지만 그 안은 str
dict_bag = txt_db.map(json.loads)
print(dict_bag.take(1))
print(type(dict_bag.take(1))) # tuble이지만 그 안은 dict
2) filter : .filter()
필터링 적용
def gender_filter(txt_db):
return txt_db['gender'] == "male"
male_db = dict_bag.filter(gender_filter)
print(male_db.count().compute()) #843
3) pluke : .pluck()
# pluck
pluck_ = dict_bag.pluck('gender')
pluck_.take(3)
pluck_name_len = dict_bag.pluck('name').map(len)
pluck_min = pluck_name_len.min()
pluck_max = pluck_name_len.max()
pluck_mean = pluck_name_len.mean()
print(dask.compute(pluck_min, pluck_max, pluck_mean))
Process와 Thread를 동시에 사용하기 위해서는 Cluster와 Client가 필요함
Client를 한번 설정하면 (그 이후의) dask는 모든 computations에 Client를 사용함(그것이 bags이던, df이던 무엇이던간에)
LocalCluster()
: 사용 중인 컴퓨터에 생성되는 cluster. 이외로 생성된 cluster는 다른 컴퓨터와 작업을 분리하여 진행함.
# 두가지 방법 다 동일한 결과값 생성
from dask.distributed import Client, LocalCluster
# Method1. Making Client
client = Client(processes = True,
n_workers = 4,
threads_per_worker = 1)
# Method2. Making Cluster then make client.
cluster = LocalCluster(processes = True,
n_workers = 4,
threads_per_worker = 1
)
client = Client(cluster)