melhor interface dplyr, mais funções sdf_* e rotinas de serialização baseadas em RDS



melhor interface dplyr, mais funções sdf_* e rotinas de serialização baseadas em RDS

Estamos entusiasmados em anunciar sparklyr 1.5 já está disponível em CRAN!

Para instalar sparklyr 1.5 do CRAN, execute

Nesta postagem do weblog, destacaremos os seguintes aspectos de sparklyr 1,5:

Melhor interface dplyr

Uma grande fração das solicitações pull que foram para o sparklyr A versão 1.5 estava focada em fazer os dataframes do Spark funcionarem com vários dplyr verbos da mesma forma que os dataframes R fazem. A lista completa de dplyr-bugs relacionados e solicitações de recursos que foram resolvidos em
sparklyr 1,5 pode ser encontrado em aqui.

Nesta seção, apresentaremos três novas funcionalidades do dplyr que foram fornecidas com sparklyr 1.5.

Amostragem estratificada

A amostragem estratificada em um dataframe R pode ser realizada com uma combinação de dplyr::group_by() seguido pela
dplyr::sample_n() ou dplyr::sample_frac()onde as variáveis ​​de agrupamento especificadas no dplyr::group_by()
step são os que definem cada estrato. Por exemplo, a consulta a seguir agrupará mtcars por número de cilindros e retornar uma amostra aleatória ponderada de tamanho dois de cada grupo, sem reposição, e ponderada pelo mpg coluna:

## # A tibble: 6 x 11
## # Teams:   cyl (3)
##     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
##             
## 1  33.9     4  71.1    65  4.22  1.84  19.9     1     1     4     1
## 2  22.8     4 108      93  3.85  2.32  18.6     1     1     4     1
## 3  21.4     6 258     110  3.08  3.22  19.4     1     0     3     1
## 4  21       6 160     110  3.9   2.62  16.5     0     1     4     4
## 5  15.5     8 318     150  2.76  3.52  16.9     0     0     3     2
## 6  19.2     8 400     175  3.08  3.84  17.0     0     0     3     2

A partir de sparklyr 1.5, o mesmo também pode ser feito para dataframes Spark com Spark 3.0 ou superior, por exemplo:

library(sparklyr)

sc <- spark_connect(grasp = "native", model = "3.0.0")
mtcars_sdf <- copy_to(sc, mtcars, exchange = TRUE, repartition = 3)

mtcars_sdf %>%
  dplyr::group_by(cyl) %>%
  dplyr::sample_n(dimension = 2, weight = mpg, exchange = FALSE) %>%
  print()
# Supply: spark> (?? x 11)
# Teams: cyl
    mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
            
1  21       6 160     110  3.9   2.62  16.5     0     1     4     4
2  21.4     6 258     110  3.08  3.22  19.4     1     0     3     1
3  27.3     4  79      66  4.08  1.94  18.9     1     1     4     1
4  32.4     4  78.7    66  4.08  2.2   19.5     1     1     4     1
5  16.4     8 276.    180  3.07  4.07  17.4     0     0     3     3
6  18.7     8 360     175  3.15  3.44  17.0     0     0     3     2

ou

## # Supply: spark> (?? x 11)
## # Teams: cyl
##     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
##             
## 1  21       6 160     110  3.9   2.62  16.5     0     1     4     4
## 2  21.4     6 258     110  3.08  3.22  19.4     1     0     3     1
## 3  22.8     4 141.     95  3.92  3.15  22.9     1     0     4     2
## 4  33.9     4  71.1    65  4.22  1.84  19.9     1     1     4     1
## 5  30.4     4  95.1   113  3.77  1.51  16.9     1     1     5     2
## 6  15.5     8 318     150  2.76  3.52  16.9     0     0     3     2
## 7  18.7     8 360     175  3.15  3.44  17.0     0     0     3     2
## 8  16.4     8 276.    180  3.07  4.07  17.4     0     0     3     3

Somas de linha

O rowSums() funcionalidade oferecida por dplyr é útil quando é necessário somar um grande número de colunas em um dataframe R que são impraticáveis ​​para serem enumeradas individualmente. Por exemplo, aqui temos um dataframe de seis colunas de números reais aleatórios, onde o
partial_sum coluna no resultado contém a soma das colunas b através d dentro de cada linha:

## # A tibble: 5 x 7
##         a     b     c      d     e      f partial_sum
##                   
## 1 0.781   0.801 0.157 0.0293 0.169 0.0978        1.16
## 2 0.696   0.412 0.221 0.941  0.697 0.675         2.27
## 3 0.802   0.410 0.516 0.923  0.190 0.904         2.04
## 4 0.200   0.590 0.755 0.494  0.273 0.807         2.11
## 5 0.00149 0.711 0.286 0.297  0.107 0.425         1.40

Começando com sparklyr 1.5, a mesma operação pode ser realizada com dataframes Spark:

## # Supply: spark> (?? x 7)
##         a     b     c      d     e      f partial_sum
##                   
## 1 0.781   0.801 0.157 0.0293 0.169 0.0978        1.16
## 2 0.696   0.412 0.221 0.941  0.697 0.675         2.27
## 3 0.802   0.410 0.516 0.923  0.190 0.904         2.04
## 4 0.200   0.590 0.755 0.494  0.273 0.807         2.11
## 5 0.00149 0.711 0.286 0.297  0.107 0.425         1.40

Como bônus pela implementação do rowSums recurso para dataframes Spark,
sparklyr 1.5 agora também oferece suporte limitado para o operador de subconjunto de colunas em dataframes Spark. Por exemplo, todos os trechos de código abaixo retornarão algum subconjunto de colunas do dataframe denominado sdf:

# choose columns `b` by `e`
sdf(2:5)
# choose columns `b` and `c`
sdf(c("b", "c"))
# drop the primary and third columns and return the remaining
sdf(c(-1, -3))

Resumidor de média ponderada

Semelhante aos dois dplyr funções mencionadas acima, o weighted.imply() resumidor é outra função útil que se tornou parte do dplyr interface para dataframes do Spark em sparklyr 1.5. Pode-se ver isso em ação, por exemplo, comparando a saída do seguinte

com saída da operação equivalente em mtcars em R:

ambos devem avaliar o seguinte:

##     cyl mpg_wm
##     
## 1     4   25.9
## 2     6   19.6
## 3     8   14.8

Novas adições ao sdf_* família de funções

sparklyr fornece um grande número de funções convenientes para trabalhar com dataframes Spark, e todos eles têm nomes começando com o sdf_ prefixo.

Nesta seção mencionaremos brevemente quatro novas adições e mostraremos alguns exemplos de cenários em que essas funções são úteis.

sdf_expand_grid()

Como o nome sugere, sdf_expand_grid() é simplesmente o equivalente Spark de develop.grid(). Em vez de correr develop.grid() em R e importando o dataframe R resultante para o Spark, agora é possível executar sdf_expand_grid()que aceita vetores R e dataframes Spark e oferece suporte a dicas para junções de hash de transmissão. O exemplo abaixo mostra sdf_expand_grid() criando uma grade de 100 por 100 por 10 por 10 no Spark em mais de 1000 partições do Spark, com dicas de hash de transmissão em variáveis ​​com pequenas cardinalidades:

library(sparklyr)

sc <- spark_connect(grasp = "native")

grid_sdf <- sdf_expand_grid(
  sc,
  var1 = seq(100),
  var2 = seq(100),
  var3 = seq(10),
  var4 = seq(10),
  broadcast_vars = c(var3, var4),
  repartition = 1000
)

grid_sdf %>% sdf_nrow() %>% print()
## (1) 1e+06

sdf_partition_sizes()

Como sparklyr usuário @sbottelli sugerido aquiuma coisa que seria ótimo ter em sparklyr é uma maneira eficiente de consultar tamanhos de partição de um dataframe Spark. Em sparklyr 1,5, sdf_partition_sizes() faz exatamente isso:

library(sparklyr)

sc <- spark_connect(grasp = "native")

sdf_len(sc, 1000, repartition = 5) %>%
  sdf_partition_sizes() %>%
  print(row.names = FALSE)
##  partition_index partition_size
##                0            200
##                1            200
##                2            200
##                3            200
##                4            200

sdf_unnest_longer() e sdf_unnest_wider()

sdf_unnest_longer() e sdf_unnest_wider() são os equivalentes de
tidyr::unnest_longer() e tidyr::unnest_wider() para quadros de dados Spark.
sdf_unnest_longer() expande todos os elementos em uma coluna struct em várias linhas e
sdf_unnest_wider() os expande em várias colunas. Conforme ilustrado com um exemplo de dataframe abaixo,

library(sparklyr)

sc <- spark_connect(grasp = "native")
sdf <- copy_to(
  sc,
  tibble::tibble(
    id = seq(3),
    attribute = checklist(
      checklist(title = "Alice", grade = "A"),
      checklist(title = "Bob", grade = "B"),
      checklist(title = "Carol", grade = "C")
    )
  )
)
sdf %>%
  sdf_unnest_longer(col = report, indices_to = "key", values_to = "worth") %>%
  print()

avalia para

## # Supply: spark> (?? x 3)
##      id worth key
##     
## 1     1 A     grade
## 2     1 Alice title
## 3     2 B     grade
## 4     2 Bob   title
## 5     3 C     grade
## 6     3 Carol title

enquanto

sdf %>%
  sdf_unnest_wider(col = report) %>%
  print()

avalia para

## # Supply: spark> (?? x 3)
##      id grade title
##     
## 1     1 A     Alice
## 2     2 B     Bob
## 3     3 C     Carol

Rotinas de serialização baseadas em RDS

Alguns leitores devem estar se perguntando por que um novo formato de serialização precisaria ser implementado em sparklyr de forma alguma. Resumindo, o motivo é que a serialização RDS é um substituto estritamente melhor para seu antecessor CSV. Ele possui todos os atributos desejáveis ​​do formato CSV, evitando uma série de desvantagens que são comuns entre os formatos de dados baseados em texto.

Nesta seção, descreveremos brevemente por que sparklyr deve suportar pelo menos um formato de serialização diferente de arrowaprofunde-se nos problemas da serialização baseada em CSV e mostre como a nova serialização baseada em RDS está livre desses problemas.

Por que arrow não é para todos?

Para transferir dados entre Spark e R de maneira correta e eficiente, sparklyr deve contar com algum formato de serialização de dados que seja bem suportado por Spark e R. Infelizmente, poucos formatos de serialização satisfazem esse requisito, e entre os que o fazem estão formatos baseados em texto, como CSV e JSON, e formatos binários, como como Apache Arrow, Protobuf e, recentemente, um pequeno subconjunto da versão 2 do RDS. Para complicar ainda mais o assunto está a consideração adicional que
sparklyr deve suportar pelo menos um formato de serialização cuja implementação possa ser totalmente independente dentro do sparklyr base de código, ou seja, tal serialização não deve depender de nenhum pacote R externo ou biblioteca de sistema, para que possa acomodar usuários que queiram usar sparklyr mas que não possuem necessariamente a cadeia de ferramentas do compilador C++ necessária e outras dependências do sistema para configurar pacotes R, como arrow ou
protolite. Antes de sparklyr 1.5, a serialização baseada em CSV period a alternativa padrão para quando os usuários não tinham o arrow pacote instalado ou quando o tipo de dados que está sendo transportado de R para Spark não é compatível com a versão do arrow disponível.

Por que o formato CSV não é excellent?

Há pelo menos três razões para acreditar que o formato CSV não é a melhor escolha quando se trata de exportar dados de R para Spark.

Um dos motivos é a eficiência. Por exemplo, um número de ponto flutuante de precisão dupla, como .Machine$double.eps precisa ser expresso como "2.22044604925031e-16" no formato CSV para não incorrer em perda de precisão, ocupando assim 20 bytes em vez de 8 bytes.

Mas mais importantes do que a eficiência são as preocupações com a correção. Em um dataframe R, pode-se armazenar ambos NA_real_ e
NaN em uma coluna de números de ponto flutuante. NA_real_ idealmente deveria ser traduzido para null dentro de um dataframe Spark, enquanto
NaN deverá continuar a ser NaN quando transportado de R para Spark. Infelizmente, NA_real_ em R torna-se indistinguível de NaN uma vez serializado em formato CSV, como fica evidente em uma rápida demonstração mostrada abaixo:

##     x is_nan
## 1  NA  FALSE
## 2 NaN   TRUE
csv_file <- "/tmp/knowledge.csv"
write.csv(original_df, file = csv_file, row.names = FALSE)
deserialized_df <- learn.csv(csv_file)
deserialized_df %>% dplyr::mutate(is_nan = is.nan(x)) %>% print()
##    x is_nan
## 1 NA  FALSE
## 2 NA  FALSE

Outra questão de correção muito semelhante à acima foi o fato de que
"NA" e NA dentro de uma coluna de string de um dataframe R tornam-se indistinguíveis uma vez serializados no formato CSV, conforme apontado corretamente em
este problema do Github
por @caewok e outros.

RDS para o resgate!

O formato RDS é um dos formatos binários mais amplamente usados ​​para serializar objetos R. É descrito com algum detalhe no capítulo 1, seção 8 do
este documento. Entre as vantagens do formato RDS estão a eficiência e a precisão: possui uma implementação razoavelmente eficiente na base R e suporta todos os tipos de dados R.

Também vale a pena notar o fato de que quando um dataframe R contendo apenas tipos de dados com equivalentes sensíveis no Apache Spark (por exemplo, RAWSXP, LGLSXP, CHARSXP, REALSXPand so on) é salvo usando o RDS versão 2, (por exemplo, serialize(mtcars, connection = NULL, model = 2L, xdr = TRUE)), apenas um pequeno subconjunto do formato RDS estará envolvido no processo de serialização, e implementar rotinas de desserialização em Scala capazes de decodificar um subconjunto tão restrito de construções RDS é na verdade uma tarefa razoavelmente simples e direta (como mostrado em
aqui
).

Por último, mas não menos importante, como o RDS é um formato binário, ele permite NA_character_, "NA",
NA_real_e NaN todos sejam codificados de maneira inequívoca, permitindo assim sparklyr
1.5 para evitar todos os problemas de correção detalhados acima emarrow casos de uso de serialização.

Outros benefícios da serialização RDS

Além das garantias de correção, o formato RDS também oferece algumas outras vantagens.

Uma vantagem, claro, é o desempenho: por exemplo, importar um conjunto de dados de tamanho não trivial, como nycflights13::flights de R para Spark usando o formato RDS no sparklyr 1.5 é aproximadamente 40% -50% mais rápido em comparação com a serialização baseada em CSV no sparklyr 1.4. A atual implementação baseada em RDS ainda não é tão rápida quanto arrowserialização baseada em (arrow é cerca de 3 a 4x mais rápido), portanto, para tarefas sensíveis ao desempenho que envolvem serialização pesada, arrow ainda deve ser a primeira escolha.

Outra vantagem é que com a serialização RDS, sparklyr pode importar dataframes R contendo
uncooked colunas diretamente em colunas binárias no Spark. Assim, casos de uso como o abaixo funcionarão em sparklyr 1,5

Enquanto a maioria sparklyr os usuários provavelmente não acharão esse recurso de importação de colunas binárias para o Spark imediatamente útil em seu típico sparklyr::copy_to() ou sparklyr::gather()
usos, ele desempenha um papel essential na redução das despesas gerais de serialização no Spark
foreach back-end paralelo que foi introduzido pela primeira vez em sparklyr 1.2. Isso ocorre porque os trabalhadores do Spark podem buscar diretamente os fechamentos R serializados para serem calculados a partir de uma coluna binária do Spark, em vez de extrair esses bytes serializados de representações intermediárias, como strings codificadas em base64. Da mesma forma, os resultados R da execução de fechamentos de trabalhadores estarão diretamente disponíveis no formato RDS, que pode ser desserializado eficientemente em R, em vez de serem entregues em outros formatos menos eficientes.

Reconhecimento

Em ordem cronológica, gostaríamos de agradecer aos seguintes colaboradores por tornarem suas solicitações pull parte do sparklyr 1,5:

Gostaríamos também de expressar nossa gratidão aos inúmeros relatórios de bugs e solicitações de recursos para
sparklyr de uma fantástica comunidade de código aberto.

Finalmente, o autor desta postagem do weblog agradece
@javierluraschi,
@batpigandmee @skeydan por suas valiosas contribuições editoriais.

Se você deseja saber mais sobre sparklyrConfira sparklyr.ai,
spark.rstudio.come algumas das postagens de lançamento anteriores, como
brilhante 1.4 e
brilhante 1.3.

Obrigado por ler!

Deixe um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *