Dask: uma ferramenta Pythonic para processamento de dados

Dask: uma ferramenta Pythonic para processamento de dados

A tecnologia permitiu que o mundo evoluísse constantemente. Passamos do uso de cartões perfurados à análise de dados em questão de segundos. Isso gerou um novo conceito de Big Data e um novo ramo da ciência da computação focado em transformar dados em informações funcionais para a tomada de decisões.

O Python é uma das ferramentas preferidas pelos cientistas de dados, mas existem outras que também permitem a análise de dados, entre as mais conhecidas Pandas e Numpy. No entanto, eles têm enormes limitações com conjuntos de dados. O Dask, parte do ecossistema Python nativo para Data Science, suporta o trabalho com diferentes tamanhos de conjuntos de dados ou datasets.

Tipo de Dataset

Tamaño de Dataset

Procesamiento

Herramienta

Dataset Pequeño

Max. 4GB

RAM y Disco

Pandas y Numpy

Dataset Mediano

Max. 2TB

Disco

Dask o Spark

Dataset Grande

> 2TB

Cluster

Dask o Spark

Tabela 1. Tipos de conjunto de dados e ferramentas recomendadas (espanhol)

Embora existam conjuntos de dados que não podemos processar diretamente com Pandas ou Numpy, isso é possível com Dask, que graças ao seu link com Python nos permite continuar usando essas ferramentas em nossos projetos de análise de dados. Independentemente do seu tamanho, podemos analisar conjuntos de dados de tamanho médio localmente, mas um cluster torna-se necessário para analisar grandes coleções de dados.

Em seus primórdios (2014-2015), o Dask nasceu como uma ferramenta para objetos Numpy distribuídos e, assim, aproveitar estações de trabalho em diferentes empresas. Desde então, passou por uma evolução significativa no processo de integração de ferramentas como a ciência geográfica e a análise de imagens. Os esforços para manter a constante evolução do Dask são enormes, apoiados por instituições como NASA, NVIDIA ou Anaconda.

As características do Dask são:

- Permite que você trabalhe com uma ampla gama de tarefas.
- Agendamento dinâmico de tarefas.
- Facilita o uso de certificados TLS/SSL para autenticação e criptografia.
- Agilidade para implantá-lo em um computador ou cluster.
- Ele funciona de forma confiável em um cluster de até 10 mil núcleos.
- Machine Learning.

Imagem 1. Fluxo de trabalho do Dask. Fonte: Documentação do Dask

Por outro lado, o Spark é uma ferramenta muito semelhante ao Dask desenvolvido em Scala (uma variante do Java). Atualmente, recebe aprimoramentos de código da comunidade e faz parte da Apache Software Foundation. Esta ferramenta pode ser utilizada em diferentes linguagens de programação, sendo as mais utilizadas Scala e Java —com o interesse de aproveitar todo o desempenho e novidades que nos oferece—, mas podemos nos comunicar com o cluster Spark desde outras linguagens como SQL, R, F#, C# e, claro, Python.

Para se comunicar com o Apache Spark do Python, devemos usar a API chamada PySpark. Como acontece com quase todas as ferramentas, se seu ambiente nativo não for usado, podemos obter resultados interessantes no início do projeto, mas com o tempo usar uma API para se comunicar custaria caro e acabaríamos presos em um shell, não podendo para usar todo o potencial da ferramenta ou simplesmente aguardar a atualização da API para aproveitar os novos recursos.

Spark e Dask têm funcionalidades semelhantes, como:

- Parallel Execution: permite trabalhar com clusters
- Execução "preguiçosa": que trabalha diretamente com gráficos de tarefas, programadores ou escalonadores.
- Dask nos oferece mais ferramentas que Spark, já que permite realizar testes de desenvolvimento no computador sem a necessidade de um cluster, além de integração nativa com Scikit-learn para Machine Learning e a ferramenta Joblib para trabalhar funções através de pipelines.

Dask não é o mesmo que Pandas. Embora possam trabalhar de mãos dadas, existem certas diferenças que podem pesar no desenvolvimento. Qual? Bem, os objetos no Dask são imutáveis, então não podemos usar operações diretamente. Para corrigir isso, você pode migrar o objeto Dask para o Pandas e trabalhar silenciosamente ou usar o método map_partitions, que permite operações do Pandas em cada uma das partições.

A forma de trabalho “pythonic” do Dask torna possível trabalhar de forma fácil e rápida em ambientes locais e distribuídos, sem a necessidade de instalar ferramentas não-pythonic como Java, Scala e cluster Spark para executar seu código. Essa é uma grande vantagem, pois não é necessário ter conhecimentos fora do mundo Python para realizar processos de Ciência de Dados.

Aqui está uma pequena demonstração da simplicidade do Dask. Neste caso, usaremos um arquivo no Amazon Web Service S3, bem como um arquivo Raw JSON. Isso significa que uma estrutura JSON como tal não é esperada. Este arquivo tem 5 milhões de registros e 20 colunas, então por enquanto vamos apenas consumir a informação sem transformá-la ou limpá-la.

O que você precisa?

  • Instalação completa do Dask (Pip install “dask[complete]”).
⚠️
Nota: Se quisermos apenas instalar o Dask dag, podemos fazê-lo alterando a palavra complete para dag.

  • Instalação da biblioteca S3 (Pip install s3fs).
⚠️
Nota: aplica-se apenas ao teste com dados em S3. Se for feito localmente, esta instalação pode ser ignorada.

Para o consumo do arquivo JSON do S3 utilizarei a ferramenta Google Colab, portanto desta vez não utilizarei um cluster.

import json

from dask import bag as db

from dask import dataframe as dd

S3_URI_FILE = "s3://your_S3_URI/your_file.json"

S3_KEY = “Your_Key”

S3_SECRET_KEY = “Your Secrete KEY”



%%time

ddf = db.read_text(S3_URI_FILE, storage_options = {'key': S3_KEY, 'secret': S3_SECRET_KEY}).map(json.loads)

Resultado:

CPU times: user 954 µs, sys: 0 ns, total: 954 µs

Wall time: 947 µs

%%time

data_frame = ddf.to_dataframe()

data_frame_pd = ddf.compute()

Resultado:

CPU times: user 2.91 s, sys: 679 ms, total: 3.59 s

Wall time: 6.18 s

Código para testá-lo localmente:

import json

import time

from dask import bag as db

from dask import dataframe as dd

start_time = time.time()

ddf = db.read_text(“Your_json_File.json").map(json.loads)

data_frame = ddf.to_dataframe()

print(f'Time to get data and make a dask dataframe is: {time.time()-start_time}')

start_time_2 = time.time()

data_frame_pd = data_frame.compute()

print(f'time to convert dask  dataframe to pandas dataframe: {time.time()-start_time_2}')

start_time_3 = time.time()

data_frame.visualize()

print(f'time to visualize graphs is: {time.time()-start_time_3}')

print(f'dask task to execute was: {time.time()-start_time}')

Resultado:

Time to get data and make a dask dataframe is: 0.7774131298065186

time to convert dask  dataframe to pandas dataframe: 39.88171863555908

time to visualize graphs is: 0.8343448638916016

dask task to execute was: 41.49371099472046

Se você quiser testá-lo ou implantá-lo localmente com um cluster, poderá usar o Docker. Aqui você encontrará a documentação oficial.

Até logo.

⚠️
As opiniões e comentários expressos neste artigo são de propriedade exclusiva de seu autor e não representam necessariamente o ponto de vista da Revelo.

A Revelo Content Network acolhe todas as raças, etnias, nacionalidades, credos, gêneros, orientações, pontos de vista e ideologias, desde que promovam diversidade, equidade, inclusão e crescimento na carreira dos profissionais de tecnologia.