resumos de quantis ponderados, cluster de iteração de energia, spark_write_rds() e muito mais


Sparklyr 1.6 já está disponível em CRAN!

Para instalar sparklyr 1.6 do CRAN, execute

Nesta postagem do weblog, destacaremos os seguintes recursos e melhorias do sparklyr 1.6:

Resumos de quantis ponderados

Apache Faísca é bem conhecido por oferecer suporte a algoritmos aproximados que trocam quantidades marginais de precisão por maior velocidade e paralelismo. Tais algoritmos são particularmente benéficos para realizar explorações preliminares de dados em escala, pois permitem aos usuários consultar rapidamente certas estatísticas estimadas dentro de uma margem de erro predefinida, evitando ao mesmo tempo o alto custo de cálculos exatos. Um exemplo é o algoritmo de Greenwald-Khanna para cálculo on-line de resumos de quantis, conforme descrito em Greenwald e Khanna (2001). Este algoritmo foi originalmente projetado para eficiência (épsilon)– aproximação de quantis dentro de um grande conjunto de dados sem a noção de pontos de dados com pesos diferentes, e a versão não ponderada dela foi implementada como
approxQuantile()

desde o Spark 2.0. No entanto, o mesmo algoritmo pode ser generalizado para lidar com entradas ponderadas e, como sparklyr usuário @Zhuk66 mencionado em este problemaum
versão ponderada
deste algoritmo é útil sparklyr recurso.

Para explicar adequadamente o que significa quantil ponderado, devemos esclarecer o que significa o peso de cada ponto de dados. Por exemplo, se tivermos uma sequência de observações ((1, 1, 1, 1, 0, 2, -1, -1))e gostaríamos de aproximar a mediana de todos os pontos de dados, então temos as duas opções a seguir:

  • Execute a versão não ponderada de approxQuantile() no Spark para verificar todos os 8 pontos de dados

  • Ou alternativamente, “compacte” os dados em 4 tuplas de (valor, peso):
    ((1, 0,5), (0, 0,125), (2, 0,125), (-1, 0,25))onde o segundo componente de cada tupla representa a frequência com que um valor ocorre em relação ao restante dos valores observados e, em seguida, encontre a mediana examinando as 4 tuplas usando a versão ponderada do algoritmo de Greenwald-Khanna

Também podemos analisar um exemplo inventado envolvendo a distribuição regular padrão para ilustrar o poder da estimativa de quantil ponderado em
sparklyr 1.6. Suponha que não possamos simplesmente executar qnorm() em R para avaliar o
função quantil
da distribuição regular padrão em (p ​​= 0,25) e (p ​​= 0,75)como podemos ter uma ideia vaga sobre o primeiro e o terceiro quantis desta distribuição? Uma maneira é amostrar um grande número de pontos de dados desta distribuição e, em seguida, aplicar o algoritmo de Greenwald-Khanna às nossas amostras não ponderadas, conforme mostrado abaixo:

library(sparklyr)

sc <- spark_connect(grasp = "native")

num_samples <- 1e6
samples <- information.body(x = rnorm(num_samples))

samples_sdf <- copy_to(sc, samples, title = random_string())

samples_sdf %>%
  sdf_quantile(
    column = "x",
    chances = c(0.25, 0.75),
    relative.error = 0.01
  ) %>%
  print()
##        25%        75%
## -0.6629242  0.6874939

Observe que porque estamos trabalhando com um algoritmo aproximado e especificamos
relative.error = 0.01o valor estimado de (-0,6629242) acima pode estar em qualquer lugar entre o 24º e o 26º percentil de todas as amostras. Na verdade, cai no (25.36896)-ésimo percentil:

## (1) 0.2536896

Agora, como podemos fazer uso da estimativa de quantil ponderado de sparklyr 1.6 para obter resultados semelhantes? Simples! Podemos amostrar um grande número de (x) valores uniformemente aleatórios de ((-infty, infty)) (ou alternativamente, basta selecionar um grande número de valores espaçados uniformemente entre ((-M, M)) onde (M) é aproximadamente (infante)) e atribua cada (x) avaliar um peso de
(displaystyle frac{1}{sqrt{2 pi}}e^{-frac{x^2}{2}})a densidade de probabilidade da distribuição regular padrão em (x). Finalmente, executamos a versão ponderada de sdf_quantile() de sparklyr 1.6, conforme mostrado abaixo:

library(sparklyr)

sc <- spark_connect(grasp = "native")

num_samples <- 1e6
M <- 1000
samples <- tibble::tibble(
  x = M * seq(-num_samples / 2 + 1, num_samples / 2) / num_samples,
  weight = dnorm(x)
)

samples_sdf <- copy_to(sc, samples, title = random_string())

samples_sdf %>%
  sdf_quantile(
    column = "x",
    weight.column = "weight",
    chances = c(0.25, 0.75),
    relative.error = 0.01
  ) %>%
  print()
##    25%    75%
## -0.696  0.662

Voilá! As estimativas não estão muito distantes dos percentis 25 e 75 (em relação ao nosso erro máximo admissível de (0,01)):

## (1) 0.2432144
## (1) 0.7460144

Clustering de iteração avançada

Energy iteration clustering (PIC), um método de cluster de gráfico simples e escalável apresentado em Lin e Cohen (2010)primeiro encontra uma incorporação de baixa dimensão de um conjunto de dados, usando iteração de potência truncada em uma matriz de similaridade de pares normalizada de todos os pontos de dados e, em seguida, usa essa incorporação como o “indicador de cluster”, uma representação intermediária do conjunto de dados que leva a uma rápida convergência quando usada como entrada para agrupamento k-means. Este processo está muito bem ilustrado na figura 1 do Lin e Cohen (2010) (reproduzido abaixo)

resumos de quantis ponderados, cluster de iteração de energia, spark_write_rds() e muito mais

em que a imagem mais à esquerda é a visualização de um conjunto de dados composto por 3 círculos, com pontos coloridos em vermelho, verde e azul indicando resultados de agrupamento, e as imagens subsequentes mostram o processo de iteração de energia transformando gradualmente o conjunto authentic de pontos no que parece ser ser três segmentos de linha disjuntos, uma representação intermediária que pode ser rapidamente separada em 3 clusters usando agrupamento k-means com (ok = 3).

Em sparklyr 1.6, ml_power_iteration() foi implementado para tornar o
Funcionalidade PIC
no Spark acessível a partir de R. Ele espera como entrada um dataframe Spark de 3 colunas que representa uma matriz de similaridade de pares de todos os pontos de dados. Duas das colunas neste dataframe devem conter índices de linhas e colunas baseados em 0, e a terceira coluna deve conter a medida de similaridade correspondente. No exemplo abaixo, veremos um conjunto de dados composto por dois círculos sendo facilmente separados em dois clusters por ml_power_iteration()com o kernel gaussiano sendo usado como medida de similaridade entre quaisquer 2 pontos:

gen_similarity_matrix <- perform() {
  # Guassian similarity measure
  guassian_similarity <- perform(pt1, pt2) {
    exp(-sum((pt2 - pt1) ^ 2) / 2)
  }
  # generate evenly distributed factors on a circle centered on the origin
  gen_circle <- perform(radius, num_pts) {
    seq(0, num_pts - 1) %>%
      purrr::map_dfr(
        perform(idx) {
          theta <- 2 * pi * idx / num_pts
          radius * c(x = cos(theta), y = sin(theta))
        })
  }
  # generate factors on each circles
  pts <- rbind(
    gen_circle(radius = 1, num_pts = 80),
    gen_circle(radius = 4, num_pts = 80)
  )
  # populate the pairwise similarity matrix (saved as a 3-column dataframe)
  similarity_matrix <- information.body()
  for (i in seq(2, nrow(pts)))
    similarity_matrix <- similarity_matrix %>%
      rbind(seq(i - 1L) %>%
        purrr::map_dfr(~ listing(
          src = i - 1L, dst = .x - 1L,
          similarity = guassian_similarity(pts(i,), pts(.x,))
        ))
      )

  similarity_matrix
}

library(sparklyr)

sc <- spark_connect(grasp = "native")
sdf <- copy_to(sc, gen_similarity_matrix())
clusters <- ml_power_iteration(
  sdf, ok = 2, max_iter = 10, init_mode = "diploma",
  src_col = "src", dst_col = "dst", weight_col = "similarity"
)

clusters %>% print(n = 160)
## # A tibble: 160 x 2
##        id cluster
##        
##   1     0       1
##   2     1       1
##   3     2       1
##   4     3       1
##   5     4       1
##   ...
##   157   156       0
##   158   157       0
##   159   158       0
##   160   159       0

A saída mostra pontos dos dois círculos sendo atribuídos a clusters separados, conforme esperado, após apenas um pequeno número de iterações do PIC.

spark_write_rds() + collect_from_rds()

spark_write_rds() e collect_from_rds() são implementados como uma alternativa que consome menos memória para gather(). Diferente gather()que recupera todos os elementos de um dataframe Spark por meio do nó do driver Spark, podendo causar lentidão ou falhas de falta de memória ao coletar grandes quantidades de dados,
spark_write_rds()quando usado em conjunto com collect_from_rds()pode recuperar todas as partições de um dataframe do Spark diretamente dos trabalhadores do Spark, em vez de por meio do nó do driver do Spark. Primeiro, spark_write_rds() distribuirá as tarefas de serialização de partições de dataframe do Spark no formato RDS versão 2 entre os trabalhadores do Spark. Os trabalhadores do Spark podem então processar várias partições em paralelo, cada uma manipulando uma partição por vez e persistindo a saída RDS diretamente no disco, em vez de enviar partições de dataframe para o nó do driver do Spark. Finalmente, as saídas RDS podem ser remontadas em dataframes R usando
collect_from_rds().

Abaixo é mostrado um exemplo de spark_write_rds() + collect_from_rds() uso, onde as saídas RDS são primeiro salvas no HDFS e depois baixadas para o sistema de arquivos native com hadoop fs -gete, finalmente, pós-processado com
collect_from_rds():

library(sparklyr)
library(nycflights13)

num_partitions <- 10L
sc <- spark_connect(grasp = "yarn", spark_home = "/usr/lib/spark")
flights_sdf <- copy_to(sc, flights, repartition = num_partitions)

# Spark employees serialize all partition in RDS format in parallel and write RDS
# outputs to HDFS
spark_write_rds(
  flights_sdf,
  dest_uri = "hdfs://:8020/flights-part-{partitionId}.rds"
)

# Run `hadoop fs -get` to obtain RDS information from HDFS to native file system
for (partition in seq(num_partitions) - 1)
  system2(
    "hadoop",
    c("fs", "-get", sprintf("hdfs://:8020/flights-part-%d.rds", partition))
  )

# Publish-process RDS outputs
partitions <- seq(num_partitions) - 1 %>%
  lapply(perform(partition) collect_from_rds(sprintf("flights-part-%d.rds", partition)))

# Optionally, name `rbind()` to mix information from all partitions right into a single R dataframe
flights_df <- do.name(rbind, partitions)

Semelhante a outros recentes sparklyr lançamentos, sparklyr 1.6 vem com uma série de melhorias relacionadas ao dplyr, como

  • Suporte para the place() predicado dentro choose() e summarize(throughout(...))
    operações em dataframes Spark
  • Adição de if_all() e if_any() funções
  • Compatibilidade complete com dbplyr API de back-end 2.0

choose(the place(...)) e summarize(throughout(the place(...)))

O dplyr the place(...) assemble é útil para aplicar uma função de seleção ou agregação a múltiplas colunas que satisfazem algum predicado booleano. Por exemplo,

retorna todas as colunas numéricas do iris conjunto de dados e

calcula a média de cada coluna numérica.

No sparklyr 1.6, ambos os tipos de operações podem ser aplicados aos dataframes do Spark, por exemplo,

if_all() e if_any()

if_all() e if_any() são duas funções de conveniência de dplyr 1.0.4 (ver
aqui para mais detalhes) que combinam efetivamente os resultados da aplicação de um predicado booleano a uma seleção organizada de colunas usando o método lógico and/or operadores.

A partir do sparklyr 1.6, if_all() e if_any() também pode ser aplicado a dataframes Spark, .eg,

Compatibilidade com dbplyr API de back-end 2.0

Sparklyr 1.6 é totalmente compatível com o mais recente dbplyr API de back-end 2.0 (implementando todas as alterações de interface recomendadas em
aqui), mantendo a compatibilidade retroativa com a edição anterior do dbplyr API, para que sparklyr os usuários não serão forçados a mudar para nenhuma versão específica do
dbplyr.

Esta deve ser uma mudança quase invisível ao usuário a partir de agora. Na verdade, a única mudança de comportamento discernível será o seguinte código

saída

(1) 2

se sparklyr está trabalhando com dbplyr 2.0+ e

(1) 1

caso contrário.

Agradecimentos

Em ordem cronológica, gostaríamos de agradecer aos seguintes colaboradores por terem feito sparklyr 1.6 incrível:

Gostaríamos também de agradecer à maravilhosa comunidade de código aberto por trás sparklyrsem os quais não teríamos beneficiado de numerosos
sparklyrrelatórios de bugs relacionados e sugestões de recursos.

Finalmente, o autor desta postagem do weblog também aprecia muito as sugestões editoriais altamente valiosas de @skeydan.

Se você deseja saber mais sobre sparklyrrecomendamos verificar
sparklyr.ai, spark.rstudio.come também alguns anteriores sparklyr lançar postagens como
brilhante 1.5
e brilhante 1.4.

Isso é tudo. Obrigado por ler!

Greenwald, Michael e Sanjeev Khanna. 2001. “Computação on-line de resumos de quantis com eficiência de espaço.” SIGMOD Rec. 30 (2): 58–66. https://doi.org/10.1145/376284.375670.

Lin, Frank e William Cohen. 2010. “Clustering de iteração de energia.” Em, 655-62.

Deixe um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *