sparklyr
1.4 já está disponível em CRAN! Para instalar sparklyr
1.4 do CRAN, execute
Nesta postagem do weblog, apresentaremos as seguintes novas funcionalidades muito esperadas do sparklyr
Versão 1.4:
Amostragem Ponderada Paralelizada
Leitores familiarizados com dplyr::sample_n()
e dplyr::sample_frac()
funções podem ter notado que ambas suportam casos de uso de amostragem ponderada em dataframes R, por exemplo,
dplyr::sample_n(mtcars, dimension = 3, weight = mpg, substitute = FALSE)
mpg cyl disp hp drat wt qsec vs am gear carb
Fiat 128 32.4 4 78.7 66 4.08 2.200 19.47 1 1 4 1
Merc 280C 17.8 6 167.6 123 3.92 3.440 18.90 1 0 4 4
Mazda RX4 Wag 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 4
e
dplyr::sample_frac(mtcars, dimension = 0.1, weight = mpg, substitute = FALSE)
mpg cyl disp hp drat wt qsec vs am gear carb
Honda Civic 30.4 4 75.7 52 4.93 1.615 18.52 1 1 4 2
Merc 450SE 16.4 8 275.8 180 3.07 4.070 17.40 0 0 3 3
Fiat X1-9 27.3 4 79.0 66 4.08 1.935 18.90 1 1 4 1
irá selecionar algum subconjunto aleatório de mtcars
usando o mpg
atributo como o peso amostral para cada linha. Se substitute = FALSE
é definido, então uma linha é removida da população de amostragem assim que for selecionada, enquanto ao definir substitute = TRUE
cada linha sempre permanecerá na população amostral e poderá ser selecionada diversas vezes.
Agora, exatamente os mesmos casos de uso são suportados para dataframes do Spark em sparklyr
1.4! Por exemplo:
retornará um subconjunto aleatório de tamanho 5 do dataframe do Spark mtcars_sdf
.
Mais importante ainda, o algoritmo de amostragem implementado em sparklyr
1.4 é algo que se encaixa perfeitamente no paradigma MapReduce: como dividimos nosso mtcars
dados em 4 partições de mtcars_sdf
especificando repartition = 4L
o algoritmo primeiro processará cada partição de forma independente e em paralelo, selecionando um conjunto de amostras de tamanho até 5 de cada e, em seguida, reduzirá todos os 4 conjuntos de amostras em um conjunto de amostras ultimate de tamanho 5, escolhendo registros com as 5 prioridades de amostragem mais altas. entre todos.
Como é possível tal paralelização, principalmente para o cenário de amostragem sem reposição, onde o resultado desejado é definido como o resultado de um processo sequencial? Uma resposta detalhada a esta pergunta está em esta postagem do weblogque inclui uma definição do problema (em specific, o significado exato dos pesos amostrais em termos de probabilidades), uma explicação de alto nível da solução atual e a motivação por trás dela, e também alguns detalhes matemáticos, todos escondidos em um hyperlink para um arquivo PDF, para que os leitores não orientados para a matemática possam entender a essência de todo o resto sem se assustarem, enquanto os leitores orientados para a matemática podem se divertir resolvendo todas as integrais antes de espiar a resposta.
Verbos arrumados
As implementações especializadas dos seguintes tidyr
verbos que funcionam eficientemente com dataframes Spark foram incluídos como parte do sparklyr
1.4:
Podemos demonstrar como esses verbos são úteis para organizar dados através de alguns exemplos.
Digamos que recebemos mtcars_sdf
um dataframe do Spark contendo todas as linhas de mtcars
mais o nome de cada linha:
# Supply: spark> (?? x 12)
mannequin mpg cyl disp hp drat wt qsec vs am gear carb
1 Mazda RX4 21 6 160 110 3.9 2.62 16.5 0 1 4 4
2 Mazda RX4 W… 21 6 160 110 3.9 2.88 17.0 0 1 4 4
3 Datsun 710 22.8 4 108 93 3.85 2.32 18.6 1 1 4 1
4 Hornet 4 Dr… 21.4 6 258 110 3.08 3.22 19.4 1 0 3 1
5 Hornet Spor… 18.7 8 360 175 3.15 3.44 17.0 0 0 3 2
# … with extra rows
e gostaríamos de transformar todos os atributos numéricos em mtcar_sdf
(em outras palavras, todas as colunas, exceto a mannequin
coluna) em pares de valores-chave armazenados em 2 colunas, com o key
coluna que armazena o nome de cada atributo e o worth
coluna que armazena o valor numérico de cada atributo. Uma maneira de conseguir isso com tidyr
é utilizando o tidyr::pivot_longer
funcionalidade:
mtcars_kv_sdf <- mtcars_sdf %>%
tidyr::pivot_longer(cols = -mannequin, names_to = "key", values_to = "worth")
print(mtcars_kv_sdf, n = 5)
# Supply: spark> (?? x 3)
mannequin key worth
1 Mazda RX4 am 1
2 Mazda RX4 carb 4
3 Mazda RX4 cyl 6
4 Mazda RX4 disp 160
5 Mazda RX4 drat 3.9
# … with extra rows
Para desfazer o efeito de tidyr::pivot_longer
podemos aplicar tidyr::pivot_wider
para o nosso mtcars_kv_sdf
Spark dataframe e recupere os dados originais que estavam presentes em mtcars_sdf
:
tbl <- mtcars_kv_sdf %>%
tidyr::pivot_wider(names_from = key, values_from = worth)
print(tbl, n = 5)
# Supply: spark> (?? x 12)
mannequin carb cyl drat hp mpg vs wt am disp gear qsec
1 Mazda RX4 4 6 3.9 110 21 0 2.62 1 160 4 16.5
2 Hornet 4 Dr… 1 6 3.08 110 21.4 1 3.22 0 258 3 19.4
3 Hornet Spor… 2 8 3.15 175 18.7 0 3.44 0 360 3 17.0
4 Merc 280C 4 6 3.92 123 17.8 1 3.44 0 168. 4 18.9
5 Merc 450SLC 3 8 3.07 180 15.2 0 3.78 0 276. 3 18
# … with extra rows
Outra maneira de reduzir muitas colunas em menos é usando tidyr::nest
para mover algumas colunas para tabelas aninhadas. Por exemplo, podemos criar uma tabela aninhada perf
encapsulando todos os atributos relacionados ao desempenho de mtcars
(nomeadamente, hp
, mpg
, disp
e qsec
). No entanto, ao contrário dos dataframes R, os Spark Dataframes não têm o conceito de tabelas aninhadas, e o mais próximo das tabelas aninhadas que podemos obter é um perf
coluna contendo estruturas nomeadas com hp
, mpg
, disp
e qsec
atributos:
mtcars_nested_sdf <- mtcars_sdf %>%
tidyr::nest(perf = c(hp, mpg, disp, qsec))
Podemos então inspecionar o tipo de perf
coluna em mtcars_nested_sdf
:
sdf_schema(mtcars_nested_sdf)$perf$kind
(1) "ArrayType(StructType(StructField(hp,DoubleType,true), StructField(mpg,DoubleType,true), StructField(disp,DoubleType,true), StructField(qsec,DoubleType,true)),true)"
e inspecionar elementos estruturais individuais dentro perf
:
hp mpg disp qsec
110.00 21.00 160.00 16.46
Finalmente, também podemos usar tidyr::unnest
para desfazer os efeitos tidyr::nest
:
mtcars_unnested_sdf <- mtcars_nested_sdf %>%
tidyr::unnest(col = perf)
print(mtcars_unnested_sdf, n = 5)
# Supply: spark> (?? x 12)
mannequin cyl drat wt vs am gear carb hp mpg disp qsec
1 Mazda RX4 6 3.9 2.62 0 1 4 4 110 21 160 16.5
2 Hornet 4 Dr… 6 3.08 3.22 1 0 3 1 110 21.4 258 19.4
3 Duster 360 8 3.21 3.57 0 0 3 4 245 14.3 360 15.8
4 Merc 280 6 3.92 3.44 1 0 4 4 123 19.2 168. 18.3
5 Lincoln Con… 8 3 5.42 0 0 3 4 215 10.4 460 17.8
# … with extra rows
Escalador Robusto
RobustScaler é uma nova funcionalidade introduzida no Spark 3.0 (FAÍSCA-28399). Graças a um solicitação pull por @zero323uma interface R para RobustScaler
a saber, o ft_robust_scaler()
função, agora faz parte sparklyr
.
É frequentemente observado que muitos algoritmos de aprendizado de máquina têm melhor desempenho em entradas numéricas padronizadas. Muitos de nós aprendemos nas estatísticas 101 que, dada uma variável aleatória (X)podemos calcular sua média (mu = E(X))desvio padrão (sigma = sqrt{E(X^2) – (E(X))^2})e, em seguida, obter uma pontuação padrão (z = frac{X – mu}{sigma}) que tem média 0 e desvio padrão 1.
No entanto, observe ambos (EX)) e (E(X^2)) acima são quantidades que podem ser facilmente distorcidas por valores discrepantes extremos em (X)causando distorções (z). Um caso particularmente ruim seria se todos os não-outliers entre (X) estão muito perto de (0)portanto fazendo (EX)) perto de (0)enquanto os valores discrepantes extremos estão todos na direção negativa, arrastando assim para baixo (EX)) enquanto distorce (E(X^2)) para cima.
Uma forma alternativa de padronização (X) com base em seus valores de mediana, 1º quartil e 3º quartil, todos robustos contra valores discrepantes, seria o seguinte:
(displaystyle z = frac{X – textual content{Mediana}(X)}{textual content{P75}(X) – textual content{P25}(X)})
e é exatamente isso que RobustScaler ofertas.
Para ver ft_robust_scaler()
em ação e demonstrar sua utilidade, podemos passar por um exemplo inventado que consiste nas seguintes etapas:
- Extraia 500 amostras aleatórias da distribuição regular padrão
(1) -0.626453811 0.183643324 -0.835628612 1.595280802 0.329507772
(6) -0.820468384 0.487429052 0.738324705 0.575781352 -0.305388387
...
- Inspecione os valores mínimo e máximo entre os (500) amostras aleatórias:
(1) -3.008049
(1) 3.810277
- Agora crie (10) outros valores que são discrepantes extremos em comparação com o (500) amostras aleatórias acima. Dado que sabemos tudo (500) amostras estão dentro da faixa de ((-4, 4))podemos escolher (-501, -502, ldots, -509, -510) como nosso (10) discrepantes:
outliers <- -500L - seq(10)
- Copiar tudo (510) valores em um dataframe Spark chamado
sdf
library(sparklyr)
sc <- spark_connect(grasp = "native", model = "3.0.0")
sdf <- copy_to(sc, information.body(worth = c(sample_values, outliers)))
- Podemos então aplicar
ft_robust_scaler()
para obter o valor padronizado para cada entrada:
- Traçar o resultado mostra os pontos de dados não discrepantes sendo dimensionados para valores que ainda formam mais ou menos uma distribuição em forma de sino centrada em torno (0)como esperado, portanto o dimensionamento é robusto contra a influência dos valores discrepantes:
- Finalmente, podemos comparar a distribuição dos valores escalonados acima com a distribuição das pontuações z de todos os valores de entrada e observar como escalonar a entrada apenas com média e desvio padrão teria causado assimetria perceptível – que o escalonador robusto evitou com sucesso:
all_values <- c(sample_values, outliers)
z_scores <- (all_values - imply(all_values)) / sd(all_values)
ggplot(information.body(scaled = z_scores), aes(x = scaled)) +
xlim(-0.05, 0.2) +
geom_histogram(binwidth = 0.005)
- A partir dos 2 gráficos acima, pode-se observar que ambos os processos de padronização produziram algumas distribuições que ainda tinham formato de sino, a produzida por
ft_robust_scaler()
está centrado em torno (0)indicando corretamente a média entre todos os valores não discrepantes, enquanto a distribuição do escore z claramente não está centrada em (0) como o seu centro foi visivelmente deslocado pela (10) valores discrepantes.
RÁPIDAS
Os leitores que acompanham de perto os lançamentos do Apache Spark provavelmente notaram a recente adição de RÁPIDAS Suporte para aceleração de GPU no Spark 3.0. Acompanhando esse desenvolvimento recente, também foi criada uma opção para habilitar RAPIDS em conexões Spark. sparklyr
e enviado em sparklyr
1.4. Em um host com {hardware} compatível com RAPIDS (por exemplo, uma instância do Amazon EC2 do tipo ‘p3.2xlarge’), pode-se instalar sparklyr
1.4 e observe a aceleração de {hardware} RAPIDS sendo refletida nos planos de consulta física do Spark SQL:
library(sparklyr)
sc <- spark_connect(grasp = "native", model = "3.0.0", packages = "rapids")
dplyr::db_explain(sc, "SELECT 4")
== Bodily Plan ==
*(2) GpuColumnarToRow false
+- GpuProject (4 AS 4#45)
+- GpuRowToColumnar TargetSize(2147483647)
+- *(1) Scan OneRowRelation()
Todas as funções de ordem superior recentemente introduzidas no Spark 3.0, como array_sort()
com comparador personalizado, transform_keys()
, transform_values()
e map_zip_with()
são apoiados por sparklyr
1.4.
Além disso, todas as funções de ordem superior agora podem ser acessadas diretamente através dplyr
em vez de seus hof_*
homólogos em sparklyr
. Isto significa, por exemplo, que podemos executar o seguinte dplyr
consultas para calcular o quadrado de todos os elementos da matriz na coluna x
de sdf
e, em seguida, classifique-os em ordem decrescente:
library(sparklyr)
sc <- spark_connect(grasp = "native", model = "3.0.0")
sdf <- copy_to(sc, tibble::tibble(x = record(c(-3, -2, 1, 5), c(6, -7, 5, 8))))
sq_desc <- sdf %>%
dplyr::mutate(x = rework(x, ~ .x * .x)) %>%
dplyr::mutate(x = array_sort(x, ~ as.integer(signal(.y - .x)))) %>%
dplyr::pull(x)
print(sq_desc)
((1))
(1) 25 9 4 1
((2))
(1) 64 49 36 25
Reconhecimento
Em ordem cronológica, gostaríamos de agradecer às seguintes pessoas por suas contribuições para sparklyr
1.4:
Também apreciamos relatórios de bugs, solicitações de recursos e outros comentários valiosos sobre sparklyr
da nossa incrível comunidade de código aberto (por exemplo, o recurso de amostragem ponderada em sparklyr
1.4 foi em grande parte motivado por este Problema no Github arquivado por @ajinge alguns dplyr
correções de bugs relacionadas nesta versão foram iniciadas em #2648 e completou com isso solicitação pull por @wkdavis).
Por último, mas não menos importante, o autor deste submit está extremamente grato pelas fantásticas sugestões editoriais de @javierluraschi, @batpigandmee @skeydan.
Se você deseja saber mais sobre sparklyr
recomendamos verificar sparklyr.ai, spark.rstudio.come também algumas das postagens de lançamento anteriores, como brilhante 1.3 e brilhante 1.2.
Obrigado por ler!