Amostragem ponderada, verbos Tidyr, escalonador robusto, RAPIDS e muito mais


sparklyr 1.4 já está disponível em CRAN! Para instalar sparklyr 1.4 do CRAN, execute

Nesta postagem do weblog, apresentaremos as seguintes novas funcionalidades muito esperadas do sparklyr Versão 1.4:

Amostragem Ponderada Paralelizada

Leitores familiarizados com dplyr::sample_n() e dplyr::sample_frac() funções podem ter notado que ambas suportam casos de uso de amostragem ponderada em dataframes R, por exemplo,

dplyr::sample_n(mtcars, dimension = 3, weight = mpg, substitute = FALSE)
               mpg cyl  disp  hp drat    wt  qsec vs am gear carb
Fiat 128      32.4   4  78.7  66 4.08 2.200 19.47  1  1    4    1
Merc 280C     17.8   6 167.6 123 3.92 3.440 18.90  1  0    4    4
Mazda RX4 Wag 21.0   6 160.0 110 3.90 2.875 17.02  0  1    4    4

e

dplyr::sample_frac(mtcars, dimension = 0.1, weight = mpg, substitute = FALSE)
             mpg cyl  disp  hp drat    wt  qsec vs am gear carb
Honda Civic 30.4   4  75.7  52 4.93 1.615 18.52  1  1    4    2
Merc 450SE  16.4   8 275.8 180 3.07 4.070 17.40  0  0    3    3
Fiat X1-9   27.3   4  79.0  66 4.08 1.935 18.90  1  1    4    1

irá selecionar algum subconjunto aleatório de mtcars usando o mpg atributo como o peso amostral para cada linha. Se substitute = FALSE é definido, então uma linha é removida da população de amostragem assim que for selecionada, enquanto ao definir substitute = TRUEcada linha sempre permanecerá na população amostral e poderá ser selecionada diversas vezes.

Agora, exatamente os mesmos casos de uso são suportados para dataframes do Spark em sparklyr 1.4! Por exemplo:

library(sparklyr)

sc <- spark_connect(grasp = "native")
mtcars_sdf <- copy_to(sc, mtcars, repartition = 4L)

dplyr::sample_n(mtcars_sdf, dimension = 5, weight = mpg, substitute = FALSE)

retornará um subconjunto aleatório de tamanho 5 do dataframe do Spark mtcars_sdf.

Mais importante ainda, o algoritmo de amostragem implementado em sparklyr 1.4 é algo que se encaixa perfeitamente no paradigma MapReduce: como dividimos nosso mtcars dados em 4 partições de mtcars_sdf especificando repartition = 4Lo algoritmo primeiro processará cada partição de forma independente e em paralelo, selecionando um conjunto de amostras de tamanho até 5 de cada e, em seguida, reduzirá todos os 4 conjuntos de amostras em um conjunto de amostras ultimate de tamanho 5, escolhendo registros com as 5 prioridades de amostragem mais altas. entre todos.

Como é possível tal paralelização, principalmente para o cenário de amostragem sem reposição, onde o resultado desejado é definido como o resultado de um processo sequencial? Uma resposta detalhada a esta pergunta está em esta postagem do weblogque inclui uma definição do problema (em specific, o significado exato dos pesos amostrais em termos de probabilidades), uma explicação de alto nível da solução atual e a motivação por trás dela, e também alguns detalhes matemáticos, todos escondidos em um hyperlink para um arquivo PDF, para que os leitores não orientados para a matemática possam entender a essência de todo o resto sem se assustarem, enquanto os leitores orientados para a matemática podem se divertir resolvendo todas as integrais antes de espiar a resposta.

Verbos arrumados

As implementações especializadas dos seguintes tidyr verbos que funcionam eficientemente com dataframes Spark foram incluídos como parte do sparklyr 1.4:

Podemos demonstrar como esses verbos são úteis para organizar dados através de alguns exemplos.

Digamos que recebemos mtcars_sdfum dataframe do Spark contendo todas as linhas de mtcars mais o nome de cada linha:

# Supply: spark> (?? x 12)
  mannequin          mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
                    
1 Mazda RX4     21       6   160   110  3.9   2.62  16.5     0     1     4     4
2 Mazda RX4 W…  21       6   160   110  3.9   2.88  17.0     0     1     4     4
3 Datsun 710    22.8     4   108    93  3.85  2.32  18.6     1     1     4     1
4 Hornet 4 Dr…  21.4     6   258   110  3.08  3.22  19.4     1     0     3     1
5 Hornet Spor…  18.7     8   360   175  3.15  3.44  17.0     0     0     3     2
# … with extra rows

e gostaríamos de transformar todos os atributos numéricos em mtcar_sdf (em outras palavras, todas as colunas, exceto a mannequin coluna) em pares de valores-chave armazenados em 2 colunas, com o key coluna que armazena o nome de cada atributo e o worth coluna que armazena o valor numérico de cada atributo. Uma maneira de conseguir isso com tidyr é utilizando o tidyr::pivot_longer funcionalidade:

mtcars_kv_sdf <- mtcars_sdf %>%
  tidyr::pivot_longer(cols = -mannequin, names_to = "key", values_to = "worth")
print(mtcars_kv_sdf, n = 5)
# Supply: spark> (?? x 3)
  mannequin     key   worth
        
1 Mazda RX4 am      1
2 Mazda RX4 carb    4
3 Mazda RX4 cyl     6
4 Mazda RX4 disp  160
5 Mazda RX4 drat    3.9
# … with extra rows

Para desfazer o efeito de tidyr::pivot_longerpodemos aplicar tidyr::pivot_wider para o nosso mtcars_kv_sdf Spark dataframe e recupere os dados originais que estavam presentes em mtcars_sdf:

tbl <- mtcars_kv_sdf %>%
  tidyr::pivot_wider(names_from = key, values_from = worth)
print(tbl, n = 5)
# Supply: spark> (?? x 12)
  mannequin         carb   cyl  drat    hp   mpg    vs    wt    am  disp  gear  qsec
                    
1 Mazda RX4        4     6  3.9    110  21       0  2.62     1  160      4  16.5
2 Hornet 4 Dr…     1     6  3.08   110  21.4     1  3.22     0  258      3  19.4
3 Hornet Spor…     2     8  3.15   175  18.7     0  3.44     0  360      3  17.0
4 Merc 280C        4     6  3.92   123  17.8     1  3.44     0  168.     4  18.9
5 Merc 450SLC      3     8  3.07   180  15.2     0  3.78     0  276.     3  18
# … with extra rows

Outra maneira de reduzir muitas colunas em menos é usando tidyr::nest para mover algumas colunas para tabelas aninhadas. Por exemplo, podemos criar uma tabela aninhada perf encapsulando todos os atributos relacionados ao desempenho de mtcars (nomeadamente, hp, mpg, dispe qsec). No entanto, ao contrário dos dataframes R, os Spark Dataframes não têm o conceito de tabelas aninhadas, e o mais próximo das tabelas aninhadas que podemos obter é um perf coluna contendo estruturas nomeadas com hp, mpg, dispe qsec atributos:

mtcars_nested_sdf <- mtcars_sdf %>%
  tidyr::nest(perf = c(hp, mpg, disp, qsec))

Podemos então inspecionar o tipo de perf coluna em mtcars_nested_sdf:

sdf_schema(mtcars_nested_sdf)$perf$kind
(1) "ArrayType(StructType(StructField(hp,DoubleType,true), StructField(mpg,DoubleType,true), StructField(disp,DoubleType,true), StructField(qsec,DoubleType,true)),true)"

e inspecionar elementos estruturais individuais dentro perf:

perf <- mtcars_nested_sdf %>% dplyr::pull(perf)
unlist(perf((1)))
    hp    mpg   disp   qsec
110.00  21.00 160.00  16.46

Finalmente, também podemos usar tidyr::unnest para desfazer os efeitos tidyr::nest:

mtcars_unnested_sdf <- mtcars_nested_sdf %>%
  tidyr::unnest(col = perf)
print(mtcars_unnested_sdf, n = 5)
# Supply: spark> (?? x 12)
  mannequin          cyl  drat    wt    vs    am  gear  carb    hp   mpg  disp  qsec
                    
1 Mazda RX4        6  3.9   2.62     0     1     4     4   110  21    160   16.5
2 Hornet 4 Dr…     6  3.08  3.22     1     0     3     1   110  21.4  258   19.4
3 Duster 360       8  3.21  3.57     0     0     3     4   245  14.3  360   15.8
4 Merc 280         6  3.92  3.44     1     0     4     4   123  19.2  168.  18.3
5 Lincoln Con…     8  3     5.42     0     0     3     4   215  10.4  460   17.8
# … with extra rows

Escalador Robusto

RobustScaler é uma nova funcionalidade introduzida no Spark 3.0 (FAÍSCA-28399). Graças a um solicitação pull por @zero323uma interface R para RobustScalera saber, o ft_robust_scaler() função, agora faz parte sparklyr.

É frequentemente observado que muitos algoritmos de aprendizado de máquina têm melhor desempenho em entradas numéricas padronizadas. Muitos de nós aprendemos nas estatísticas 101 que, dada uma variável aleatória (X)podemos calcular sua média (mu = E(X))desvio padrão (sigma = sqrt{E(X^2) – (E(X))^2})e, em seguida, obter uma pontuação padrão (z = frac{X – mu}{sigma}) que tem média 0 e desvio padrão 1.

No entanto, observe ambos (EX)) e (E(X^2)) acima são quantidades que podem ser facilmente distorcidas por valores discrepantes extremos em (X)causando distorções (z). Um caso particularmente ruim seria se todos os não-outliers entre (X) estão muito perto de (0)portanto fazendo (EX)) perto de (0)enquanto os valores discrepantes extremos estão todos na direção negativa, arrastando assim para baixo (EX)) enquanto distorce (E(X^2)) para cima.

Uma forma alternativa de padronização (X) com base em seus valores de mediana, 1º quartil e 3º quartil, todos robustos contra valores discrepantes, seria o seguinte:

(displaystyle z = frac{X – textual content{Mediana}(X)}{textual content{P75}(X) – textual content{P25}(X)})

e é exatamente isso que RobustScaler ofertas.

Para ver ft_robust_scaler() em ação e demonstrar sua utilidade, podemos passar por um exemplo inventado que consiste nas seguintes etapas:

  • Extraia 500 amostras aleatórias da distribuição regular padrão
  (1) -0.626453811  0.183643324 -0.835628612  1.595280802  0.329507772
  (6) -0.820468384  0.487429052  0.738324705  0.575781352 -0.305388387
  ...
  • Inspecione os valores mínimo e máximo entre os (500) amostras aleatórias:
  (1) -3.008049
  (1) 3.810277
  • Agora crie (10) outros valores que são discrepantes extremos em comparação com o (500) amostras aleatórias acima. Dado que sabemos tudo (500) amostras estão dentro da faixa de ((-4, 4))podemos escolher (-501, -502, ldots, -509, -510) como nosso (10) discrepantes:
outliers <- -500L - seq(10)
  • Copiar tudo (510) valores em um dataframe Spark chamado sdf
library(sparklyr)

sc <- spark_connect(grasp = "native", model = "3.0.0")
sdf <- copy_to(sc, information.body(worth = c(sample_values, outliers)))
  • Podemos então aplicar ft_robust_scaler() para obter o valor padronizado para cada entrada:
scaled <- sdf %>%
  ft_vector_assembler("worth", "enter") %>%
  ft_robust_scaler("enter", "scaled") %>%
  dplyr::pull(scaled) %>%
  unlist()
  • Traçar o resultado mostra os pontos de dados não discrepantes sendo dimensionados para valores que ainda formam mais ou menos uma distribuição em forma de sino centrada em torno (0)como esperado, portanto o dimensionamento é robusto contra a influência dos valores discrepantes:

Amostragem ponderada, verbos Tidyr, escalonador robusto, RAPIDS e muito mais

  • Finalmente, podemos comparar a distribuição dos valores escalonados acima com a distribuição das pontuações z de todos os valores de entrada e observar como escalonar a entrada apenas com média e desvio padrão teria causado assimetria perceptível – que o escalonador robusto evitou com sucesso:
all_values <- c(sample_values, outliers)
z_scores <- (all_values - imply(all_values)) / sd(all_values)
ggplot(information.body(scaled = z_scores), aes(x = scaled)) +
  xlim(-0.05, 0.2) +
  geom_histogram(binwidth = 0.005)

  • A partir dos 2 gráficos acima, pode-se observar que ambos os processos de padronização produziram algumas distribuições que ainda tinham formato de sino, a produzida por ft_robust_scaler() está centrado em torno (0)indicando corretamente a média entre todos os valores não discrepantes, enquanto a distribuição do escore z claramente não está centrada em (0) como o seu centro foi visivelmente deslocado pela (10) valores discrepantes.

RÁPIDAS

Os leitores que acompanham de perto os lançamentos do Apache Spark provavelmente notaram a recente adição de RÁPIDAS Suporte para aceleração de GPU no Spark 3.0. Acompanhando esse desenvolvimento recente, também foi criada uma opção para habilitar RAPIDS em conexões Spark. sparklyr e enviado em sparklyr 1.4. Em um host com {hardware} compatível com RAPIDS (por exemplo, uma instância do Amazon EC2 do tipo ‘p3.2xlarge’), pode-se instalar sparklyr 1.4 e observe a aceleração de {hardware} RAPIDS sendo refletida nos planos de consulta física do Spark SQL:

library(sparklyr)

sc <- spark_connect(grasp = "native", model = "3.0.0", packages = "rapids")
dplyr::db_explain(sc, "SELECT 4")
== Bodily Plan ==
*(2) GpuColumnarToRow false
+- GpuProject (4 AS 4#45)
   +- GpuRowToColumnar TargetSize(2147483647)
      +- *(1) Scan OneRowRelation()

Todas as funções de ordem superior recentemente introduzidas no Spark 3.0, como array_sort() com comparador personalizado, transform_keys(), transform_values()e map_zip_with()são apoiados por sparklyr 1.4.

Além disso, todas as funções de ordem superior agora podem ser acessadas diretamente através dplyr em vez de seus hof_* homólogos em sparklyr. Isto significa, por exemplo, que podemos executar o seguinte dplyr consultas para calcular o quadrado de todos os elementos da matriz na coluna x de sdfe, em seguida, classifique-os em ordem decrescente:

library(sparklyr)

sc <- spark_connect(grasp = "native", model = "3.0.0")
sdf <- copy_to(sc, tibble::tibble(x = record(c(-3, -2, 1, 5), c(6, -7, 5, 8))))

sq_desc <- sdf %>%
  dplyr::mutate(x = rework(x, ~ .x * .x)) %>%
  dplyr::mutate(x = array_sort(x, ~ as.integer(signal(.y - .x)))) %>%
  dplyr::pull(x)

print(sq_desc)
((1))
(1) 25  9  4  1

((2))
(1) 64 49 36 25

Reconhecimento

Em ordem cronológica, gostaríamos de agradecer às seguintes pessoas por suas contribuições para sparklyr 1.4:

Também apreciamos relatórios de bugs, solicitações de recursos e outros comentários valiosos sobre sparklyr da nossa incrível comunidade de código aberto (por exemplo, o recurso de amostragem ponderada em sparklyr 1.4 foi em grande parte motivado por este Problema no Github arquivado por @ajinge alguns dplyrcorreções de bugs relacionadas nesta versão foram iniciadas em #2648 e completou com isso solicitação pull por @wkdavis).

Por último, mas não menos importante, o autor deste submit está extremamente grato pelas fantásticas sugestões editoriais de @javierluraschi, @batpigandmee @skeydan.

Se você deseja saber mais sobre sparklyrrecomendamos verificar sparklyr.ai, spark.rstudio.come também algumas das postagens de lançamento anteriores, como brilhante 1.3 e brilhante 1.2.

Obrigado por ler!

Deixe um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *