Como Skroutz lida com a evolução do esquema em tempo actual na Amazon Redshift com Debezium


Este publish foi co-autor de Kostas Diamantis de Skroutz.

No Skroutzsomos apaixonados pelo nosso produto e é sempre nossa principal prioridade. Estamos constantemente trabalhando para melhorar e evoluí -lo, apoiado por uma grande e talentosa equipe de engenheiros de software program. A inovação e a evolução contínuas do nosso produto levam a atualizações frequentes, geralmente necessitando de mudanças e acréscimos aos esquemas de nossos bancos de dados operacionais.

Quando decidimos construir nossa própria plataforma de dados para atender às nossas necessidades de dados, como apoiar relatórios, inteligência de negócios (BI) e tomada de decisão, o principal desafio-e também um requisito rigoroso-era para garantir que não bloqueasse ou atrasasse o desenvolvimento de nosso produto.

Nós escolhemos Amazon Redshift Para promover a democratização de dados, capacitando equipes em toda a organização com acesso contínuo a dados, permitindo insights mais rápidos e tomada de decisão mais informada. Essa opção suporta uma cultura de transparência e colaboração, à medida que os dados se tornam prontamente disponíveis para análise e inovação em todos os departamentos.

No entanto, acompanhar as mudanças de esquema de nossos bancos de dados operacionais, enquanto atualiza o knowledge warehouse sem coordenar constantemente com as equipes de desenvolvimento, adiar os lançamentos ou o risco de perda de dados, tornou -se um novo desafio para nós.

Neste publish, compartilhamos como lidamos com a evolução do esquema em tempo actual no Amazon Redshift com Debezium.

Visão geral da solução

A maioria dos nossos dados reside em nossos bancos de dados operacionais, como MariaDB e MongoDB. Nossa abordagem envolve o uso da técnica de captura de dados de alteração (CDC), que lida automaticamente na evolução do esquema das lojas de dados que estão sendo capturadas. Para isso, usamos o Debezium junto com um cluster Kafka. Esta solução permite que as alterações no esquema sejam propagadas sem interromper os consumidores da Kafka.

No entanto, a evolução do esquema de manuseio no Amazon Redshift se tornou um gargalo, levando -nos a desenvolver uma estratégia para enfrentar esse desafio. É importante observar que, no nosso caso, as alterações em nossos bancos de dados operacionais envolvem principalmente a adição de novas colunas, em vez de interromper alterações como alterar os tipos de dados. Portanto, implementamos um processo semi-manual para resolver esse problema, juntamente com um mecanismo de alerta obrigatório para nos notificar sobre quaisquer alterações de esquema. Esse processo de duas etapas consiste em lidar com a evolução do esquema em tempo actual e no tratamento de atualizações de dados em uma etapa handbook assíncrona. O diagrama arquitetônico a seguir ilustra um modelo de implantação híbrida, integrando os componentes locais e baseados em nuvem.

Como Skroutz lida com a evolução do esquema em tempo actual na Amazon Redshift com Debezium

O fluxo de dados começa com dados do MariaDB e MongoDB, capturados usando o Debezium para CDC no modo próximo em tempo actual. Os dados capturados são transmitidos para um cluster de Kafka, onde os consumidores de Kafka (construídos na estrutura Ruby Karafka) leem e escrevam na área de estadiamento, seja no Amazon Redshift ou Amazon Easy Storage Service (Amazon S3). Na área de estadiamento, os Dataloaders promovem os dados para tabelas de produção no Amazon Redshift. Nesse estágio, aplicamos o conceito de dimensão lentamente em mudança (SCD) a essas tabelas, usando o Tipo 7 para a maioria delas.

No knowledge warehousing, um SCD é uma dimensão que armazena dados e, embora geralmente seja estável, pode mudar com o tempo. Várias metodologias abordam as complexidades do gerenciamento de SCD. SCD Tipo 7 coloca a chave substituta e a chave pure na tabela de fatos. Isso permite ao usuário selecionar os registros de dimensão apropriados com base em:

  • An information efetiva primária no registro de fatos
  • As informações mais recentes ou atuais
  • Outras datas associadas ao registro de fatos

Posteriormente, os trabalhos analíticos são executados para criar tabelas de relatório, permitindo os processos de BI e relatórios. O diagrama a seguir fornece um exemplo do processo de modelagem de dados de uma tabela de estadiamento para uma tabela de produção.

Esquema de banco de dados evolução: staging.shops para produção.shops com colunas de tempora e versão adicionais

A arquitetura retratada no diagrama mostra apenas nosso pipeline do CDC, que busca dados de nossos bancos de dados operacionais e não inclui outros pipelines, como os para buscar dados por meio de APIs, processos de lote programados e muito mais. Observe também que nossa convenção é que dw_* As colunas são usadas para capturar informações de metadados de SCD e outros metadados em geral. Nas seções a seguir, discutimos os principais componentes da solução com mais detalhes.

Fluxo de trabalho em tempo actual

Para a parte da evolução do esquema, focamos na coluna dw_md_missing_dataque captura as mudanças de evolução do esquema em tempo actual que ocorrem nos bancos de dados de origem. Quando uma nova alteração é produzida para o cluster Kafka, o consumidor da Kafka é responsável por escrever essa alteração na tabela de estadiamento no Amazon Redshift. Por exemplo, uma mensagem produzida por Debezium ao cluster Kafka terá a seguinte estrutura quando uma nova entidade for criada:

{
  "earlier than": null,
  "after": {
    "id": 1,
    "identify": "shop1",
    "state": "hidden"
  },
  "supply": {
    ...
    "ts_ms": "1704114000000",
    ...
  },
  "op": "c",
  ...
}

O consumidor Kafka é responsável por preparar e executar a instrução SQL Insert:

INSERT INTO staging.outlets (
  id,
  "identify",
  state,
  dw_md_changed_at,
  dw_md_operation,
  dw_md_missing_data
)
VALUES
  (
    1,
    'shop1',
    'hidden',
    '2024-01-01 13:00:00',
    'create',
    NULL
  )
;

Depois disso, digamos que uma nova coluna seja adicionada à tabela de origem chamada new_columncom o valor new_value.
A nova mensagem produzida para o cluster Kafka terá o seguinte formato:

{
  "earlier than": { ... },
  "after": {
    "id": 1,
    "identify": "shop1",
    "state": "hidden",
    "new_column": "new_value"
  },
  "supply": {
    ...
    "ts_ms": "1704121200000"
    ...
  },
  "op": "u"
  ...
}

Agora, a declaração SQL Insert executada pelo consumidor Kafka será a seguinte:

INSERT INTO staging.outlets (
  id,
  "identify",
  state,
  dw_md_changed_at,
  dw_md_operation,
  dw_md_missing_data
)
VALUES
  (
    1,
    'shop1',
    'hidden',
    '2024-01-01 15:00:00',
    'replace',
    JSON_PARSE('{"new_column": "new_value"}') /* <-- examine this */
  )
;

O consumidor realiza uma inserção como faria para o esquema conhecido, e qualquer coisa nova é adicionada ao dw_md_missing_data coluna como JSON de valor-chave. Depois que os dados forem promovidos da tabela de preparação para a tabela de produção, eles terão a seguinte estrutura.

Tabela de produção.

Nesse ponto, o fluxo de dados continua em execução sem qualquer perda de dados ou a necessidade de comunicação com as equipes responsáveis ​​por manter o esquema nos bancos de dados operacionais. No entanto, esses dados podem não ser facilmente acessíveis para os consumidores de dados, analistas ou outras personas. Vale a pena notar que dw_md_missing_data é definido como uma coluna do Tipo Tremendous Knowledge, que foi introduzido no Amazon Redshift para armazenar dados ou documentos semiestruturados como valores.

Mecanismo de monitoramento

Para rastrear novas colunas adicionadas a uma tabela, temos um processo programado que é executado semanalmente. Este processo verifica as tabelas no Amazon Redshift com valores no dw_md_missing_data coluna e gera uma lista de tabelas que exigem ação handbook para disponibilizar esses dados por meio de um esquema estruturado. Uma notificação é então enviada para a equipe.

Etapas de remediação handbook

No exemplo acima mencionado, as etapas manuais para disponibilizar esta coluna seriam:

  1. Adicione as novas colunas às tabelas de preparação e produção:
ALTER TABLE staging.outlets ADD COLUMN new_column varchar(255);
ALTER TABLE manufacturing.outlets ADD COLUMN new_column varchar(255);

  1. Atualize o esquema conhecido do Kafka Client. Nesta etapa, precisamos apenas adicionar o novo nome da coluna a uma lista de matrizes simples. Por exemplo:
class ShopsConsumer < ApplicationConsumer
  SOURCE_COLUMNS = (
    'id',
    'identify',
    'state',
    'new_column' # this one is the brand new column
  )
 
  def eat
    # Ruby code for:
    #   1. knowledge cleansing
    #   2. knowledge transformation
    #   3. preparation of the SQL INSERT assertion
 
    RedshiftClient.conn.exec <<~SQL
      /*
        generated SQL INSERT assertion
      */
    SQL
  finish
finish

  1. Atualize a lógica SQL do Dataloader para a nova coluna. Um Dataloader é responsável por promover os dados da área de preparação para a tabela de produção.
class DataLoader::ShopsTable < DataLoader::Base
  class << self
    def load
      RedshiftClient.conn.exec <<~SQL
        CREATE TABLE staging.shops_new (LIKE staging.outlets);
      SQL
 
      RedshiftClient.conn.exec <<~SQL
        /*
          We transfer the info to a brand new desk as a result of in staging.outlets
          the Kafka shopper will proceed add new rows
        */
        ALTER TABLE staging.shops_new APPEND FROM staging.outlets;
      SQL
 
      RedshiftClient.conn.exec <<~SQL
        BEGIN;
          /*
            SQL to deal with
              * knowledge deduplications and many others
              * extra transformations
              * all the required operations so as to apply the info modeling we want for this desk
          */
 
          INSERT INTO manufacturing.outlets (
            id,
            identify,
            state,
            new_column, /* --> this one is the brand new column <-- */
            dw_start_date,
            dw_end_date,
            dw_current,
            dw_md_changed_at,
            dw_md_operation,
            dw_md_missing_data
          )
          SELECT
            id,
            identify,
            state,
            new_column, /* --> this one is the brand new column <-- */
            /*
              right here is the logic to use the info modeling (sort 1,2,3,4...7)
            */
          FROM
            staging.shops_new
          ;
 
          DROP TABLE staging.shops_new;
        END TRANSACTION;
      SQL
    finish
  finish
finish

  1. Transferir os dados que foram carregados nesse meio tempo a partir do dw_md_missing_data Tremendous coluna para a coluna recém -adicionada e depois limpe. Nesta etapa, só precisamos executar uma migração de dados como o seguinte:
BEGIN;
 
  /*
    Switch the info from the `dw_md_missing_data` to the corresponding column
  */
  UPDATE manufacturing.outlets
  SET new_column = dw_md_missing_data.new_column::varchar(255)
  WHERE dw_md_missing_data.new_column IS NOT NULL;
 
  /*
    Clear up dw_md_missing_data column
  */
  UPDATE manufacturing.outlets
  SET dw_md_missing_data = NULL
  WHERE dw_md_missing_data IS NOT NULL;
 
END TRANSACTION;

Para executar as operações anteriores, garantimos que ninguém mais understand alterações no manufacturing.outlets tabela porque queremos que nenhum novo dados seja adicionado ao dw_md_missing_data coluna.

Conclusão

A solução discutida nesta postagem permitiu que a Skroutz gerenciasse a evolução do esquema em bancos de dados operacionais enquanto atualizava perfeitamente o knowledge warehouse. Isso aliviou a necessidade de coordenação constante da equipe de desenvolvimento e removeu os riscos de perda de dados durante os lançamentos, promovendo a inovação em vez de sufocá -la.

À medida que a migração de Skroutz para as abordagens da AWS Cloud, as discussões estão em andamento sobre como a arquitetura atual pode ser adaptada para se alinhar mais de perto com os princípios centrados na AWS. Para esse fim, uma das mudanças consideradas é a ingestão de streaming de desvio para o Amazon de Amazon gerenciou streaming para Apache Kafka (Amazon MSK) ou Kafka de código aberto, o que possibilitará que a Skroutz processe grandes volumes de dados de streaming de várias fontes com baixa latência e alta taxa de transferência para obter informações em segundos.

Se você enfrentar desafios semelhantes, discuta com um representante da AWS e trabalhe para trás do seu caso de uso para fornecer a solução mais adequada.


Sobre os autores

Konstantina Mavrodimitraki é uma arquiteta sênior de soluções da Amazon Internet Companies, onde ajuda os clientes a projetar sistemas escaláveis, robustos e seguros nos mercados globais. Com profunda experiência em estratégia de dados, knowledge warehousing e sistemas de massive knowledge, ela ajuda as organizações a transformar suas paisagens de dados. Tecnólogo apaixonado e pessoa de pessoas, Konstantina adora explorar tecnologias emergentes e apoia as comunidades de tecnologia locais. Além disso, ela gosta de ler livros e brincar com seu cachorro.

Kostas Diamantis é o chefe do knowledge warehouse da Skroutz Firm. Com experiência em engenharia de software program, ele passou para a engenharia de dados, usando sua experiência técnica para criar soluções de dados escaláveis. Apaixonado pela tomada de decisões orientada a dados, ele se concentra na otimização de pipelines de dados, aprimorando os recursos de análise e impulsionando as idéias de negócios.

Deixe um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *