Think about que você tem alguns dados de streaming. Pode ser de um sensor da Web das Coisas (IoT), ingestão de dados de log ou até mesmo dados de impressão do comprador. Independentemente da fonte, você foi encarregado de agir com base nos dados – alertando ou acionando quando algo ocorre. Martin Fowler diz: “Você mesmo pode construir um mecanismo de regras simples. Tudo o que você precisa é criar vários objetos com condições e ações, armazená-los em uma coleção e percorrê-los para avaliar as condições e executar as ações.”
UM mecanismo de regras de negócios (ou simplesmente mecanismo de regras) é um sistema de software program que executa muitas regras com base em alguma entrada para determinar alguma saída. De forma simplista, são muitas declarações “se então”, “e” e “ou” que são avaliadas em alguns dados. Existem muitos sistemas de regras de negócios diferentes, como Drools, OpenL Tablets ou mesmo RuleBook, e todos eles compartilham um ponto em comum: eles definem regras (coleção de objetos com condições) que são executadas (avaliam as condições) para derivar uma saída (executar as ações). O seguinte é um exemplo simplista:
Quando uma única condição ou uma composição de condições é avaliada como verdadeira, deseja-se enviar um alerta para potencialmente agir sobre esse evento (acionar o calor para aquecer a sala a 50 graus).
Esta postagem demonstra como implementar um mecanismo de regras dinâmicas usando Serviço gerenciado da Amazon para Apache Flink. Nossa implementação oferece a capacidade de criar regras dinâmicas que podem ser criadas e atualizadas sem a necessidade de alterar ou reimplantar o código subjacente ou a implementação do próprio mecanismo de regras. Discutimos a arquitetura, os principais serviços da implementação, alguns detalhes de implementação que você pode usar para construir seu próprio mecanismo de regras e um Equipment de desenvolvimento de nuvem AWS (AWS CDK) para implantá-lo em sua própria conta.
Visão geral da solução
O fluxo de trabalho da nossa solução começa com a ingestão dos dados. Presumimos que temos alguns dados de origem. Poderia ser de vários lugares, mas para esta demonstração, usamos dados de streaming (dados do sensor IoT) como dados de entrada. É sobre isso que avaliaremos nossas regras. Por exemplo, vamos supor que estamos analisando dados de nosso termostato doméstico AnyCompany. Veremos atributos como temperatura, ocupação, umidade e muito mais. O termostato publica os respectivos valores a cada 1 minuto, então basearemos nossas regras nessa ideia. Como estamos ingerindo esses dados quase em tempo actual, precisamos de um serviço projetado especificamente para esse caso de uso. Para esta solução usamos Fluxos de dados do Amazon Kinesis.
Em um mecanismo de regras tradicional, pode haver uma lista finita de regras. A criação de novas regras provavelmente envolveria uma revisão e reimplantação da base de código, uma substituição de algum arquivo de regras ou algum processo de substituição. No entanto, um mecanismo de regras dinâmicas é diferente. Assim como nossos dados de entrada de streaming, nossas regras também podem ser transmitidas. Aqui podemos usar o Kinesis Knowledge Streams para transmitir nossas regras à medida que elas são criadas.
Neste ponto, temos dois fluxos de dados:
- Os dados brutos do nosso termostato
- As regras de negócios talvez criadas por meio de uma interface de usuário
O diagrama a seguir ilustra que podemos conectar esses fluxos.
Conectando fluxos
Um caso de uso típico do serviço gerenciado para Apache Flink é consultar e analisar dados de forma interativa em tempo actual e produzir continuamente insights para casos de uso urgentes. Pensando nisso, se você tiver uma regra que corresponda à queda da temperatura abaixo de um determinado valor (principalmente no inverno), pode ser basic avaliar e produzir um resultado o mais oportuno possível.
Os conectores Apache Flink são componentes de software program que movem dados para dentro e para fora de um aplicativo Managed Service for Apache Flink. Conectores são integrações flexíveis que permitem ler arquivos e diretórios. Eles consistem em módulos completos para interação com serviços AWS e sistemas de terceiros. Para obter mais detalhes sobre conectores, consulte Use conectores Apache Flink com serviço gerenciado para Apache Flink.
Usamos dois tipos de conectores (operadores) para esta solução:
- Fontes – Forneça entrada para seu aplicativo a partir de um fluxo de dados, arquivo ou outra fonte de dados do Kinesis
- Pias – Envie a saída do seu aplicativo para um fluxo de dados do Kinesis, Amazon Knowledge Firehose fluxo ou outro destino de dados
Os aplicativos Flink são fluxos de dados de streaming que podem ser transformados por operadores definidos pelo usuário. Esses fluxos de dados formam gráficos direcionados que começam com uma ou mais fontes e terminam em um ou mais sumidouros. O diagrama a seguir ilustra um exemplo de fluxo de dados (fonte). Conforme discutido anteriormente, temos dois fluxos de dados Kinesis que podem ser usados como fontes para nosso programa Flink.
O trecho de código a seguir mostra como configuramos nossas fontes Kinesis em nosso código Flink:
Usamos um estado de transmissãoque pode ser usado para combinar e processar conjuntamente dois fluxos de eventos de uma maneira específica. Um estado de transmissão é uma boa opção para aplicativos que precisam unir um fluxo de baixo rendimento e um fluxo de alto rendimento ou que precisam atualizar dinamicamente sua lógica de processamento. O diagrama a seguir ilustra um exemplo de como o estado de transmissão está conectado. Para mais detalhes, consulte Um guia prático para transmitir estado no Apache Flink.
Isso se encaixa na ideia do nosso mecanismo de regras dinâmicas, onde temos um fluxo de regras de baixo rendimento (adicionado conforme necessário) e um fluxo de transações de alto rendimento (chegando em intervalos regulares, como uma por minuto). Esse fluxo de transmissão nos permite pegar nosso fluxo de transações (ou os dados do termostato) e conectá-lo ao fluxo de regras, conforme mostrado no seguinte trecho de código:
Para saber mais sobre o estado de transmissão, consulte O padrão de estado de transmissão. Quando o fluxo de transmissão está conectado ao fluxo de dados (como no exemplo anterior), ele se torna um BroadcastConnectedStream
. A função aplicada a este fluxo, que nos permite processar as transações e regras, implementa o processBroadcastElement
método. O KeyedBroadcastProcessFunction
interface fornece três métodos para processar registros e emitir resultados:
- processBroadcastElement() – Isso é chamado para cada registro do stream transmitido (nosso stream de regras).
- processElement() – Isso é chamado para cada registro do fluxo codificado. Ele fornece acesso somente leitura ao estado de transmissão para evitar modificações que resultem em diferentes estados de transmissão nas instâncias paralelas da função. O
processElement
O método recupera a regra do estado de transmissão e o evento de sensor anterior do estado codificado. Se a expressão for avaliada comoTRUE
(discutido na próxima seção), um alerta será emitido. - onTimer() – Isto é chamado quando um temporizador previamente registrado é acionado. Os temporizadores podem ser registrados no
processElement
método e são usados para realizar cálculos ou limpar estados no futuro. Isso é usado em nosso código para garantir que quaisquer dados antigos (conforme definido por nossa regra) sejam removidos conforme necessário.
Podemos lidar com a regra na instância do estado de transmissão da seguinte maneira:
Observe o que acontece no código quando o standing da regra é INACTIVE
. Isto removeria a regra do estado de transmissão, que então não consideraria mais a regra a ser usada. Da mesma forma, lidar com a transmissão de uma regra que é ACTIVE
adicionaria ou substituiria a regra dentro do estado de transmissão. Isso nos permite fazer alterações dinamicamente, adicionando e removendo regras conforme necessário.
Avaliando regras
As regras podem ser avaliadas de diversas maneiras. Embora não seja um requisito, nossas regras foram criadas de forma Linguagem de Expressão Java (JEXL) formato compatível. Isso nos permite avaliar regras fornecendo uma expressão JEXL junto com o contexto apropriado (as transações necessárias para reavaliar a regra ou pares de valores-chave) e simplesmente chamando o método de avaliação:
Um recurso poderoso do JEXL é que ele não apenas suporta expressões simples (como aquelas que incluem comparação e aritmética), mas também possui suporte para funções definidas pelo usuário. JEXL permite chamar qualquer método em um objeto Java usando a mesma sintaxe. Se houver um POJO com o nome SENSOR_cebb1baf_2df0_4267_b489_28be562fccea
que tem o método hasNotChanged
você chamaria esse método usando a expressão. Você pode encontrar mais dessas funções definidas pelo usuário que usamos em nosso SensorMapState
aula.
Vejamos um exemplo de como isso funcionaria, usando uma expressão de regra que diz o seguinte:
"SENSOR_cebb1baf_2df0_4267_b489_28be562fccea.hasNotChanged(5)"
Esta regra, avaliada pela JEXL, seria equivalente a um sensor que não muda há 5 minutos
A função correspondente definida pelo usuário (parte do SensorMapState
) que é exposto ao JEXL (usando o contexto) é o seguinte:
Dados relevantes, como os abaixo, iriam para a janela de contexto, que seria então usada para avaliar a regra.
Neste caso, o resultado (ou valor de isAlertTriggered
) é TRUE
.
Criando coletores
Assim como criamos fontes anteriormente, também podemos criar sumidouros. Esses coletores serão usados como ultimate de nosso processamento de fluxo, onde nossos resultados analisados e avaliados serão emitidos para uso futuro. Assim como nossa fonte, nosso coletor também é um fluxo de dados do Kinesis, onde um consumidor Lambda downstream irá iterar os registros e processá-los para tomar a ação apropriada. Existem muitas aplicações de processamento downstream; por exemplo, podemos persistir esse resultado de avaliação, criar uma notificação push ou atualizar um painel de regras.
Com base na avaliação anterior, temos a seguinte lógica dentro da própria função do processo:
Quando a função de processo emite o alerta, a resposta do alerta é enviada ao coletor, que pode então ser lida e usada posteriormente na arquitetura:
Neste ponto, podemos processá-lo. Temos uma função Lambda registrando os registros onde podemos ver o seguinte:
Embora simplificados neste exemplo, esses trechos de código formam a base para obter os resultados da avaliação e enviá-los para outro lugar.
Conclusão
Nesta postagem, demonstramos como implementar um mecanismo de regras dinâmicas usando o serviço gerenciado para Apache Flink com as regras e os dados de entrada transmitidos por meio do Kinesis Knowledge Streams. Você pode aprender mais sobre isso com o e-learning que temos disponível.
À medida que as empresas procuram implementar mecanismos de regras quase em tempo actual, esta arquitetura apresenta uma solução atraente. O serviço gerenciado para Apache Flink oferece recursos avançados para transformar e analisar dados de streaming em tempo actual, ao mesmo tempo que simplifica o gerenciamento de cargas de trabalho do Flink e integra-se perfeitamente a outros serviços da AWS.
Para ajudá-lo a começar com esta arquitetura, temos o prazer de anunciar que publicaremos nosso código completo do mecanismo de regras como amostra no GitHub. Este exemplo abrangente irá além dos trechos de código fornecidos em nossa postagem, oferecendo uma visão mais aprofundada das complexidades da construção de um mecanismo de regras dinâmicas com o Flink.
Incentivamos você a explorar este código de exemplo, adaptá-lo ao seu caso de uso específico e aproveitar todo o potencial do processamento de dados em tempo actual em seus aplicativos. Confira o Repositório GitHube não hesite em entrar em contato conosco caso tenha dúvidas ou comentários ao embarcar em sua jornada com o Flink e a AWS!
Sobre os Autores
Steven Carpinteiro é desenvolvedor sênior de soluções na equipe de prototipagem e engenharia de clientes da AWS Industries (PACE), ajudando os clientes da AWS a dar vida a ideias inovadoras por meio da prototipagem rápida na plataforma AWS. Ele possui mestrado em Ciência da Computação pela Wayne State College em Detroit, Michigan. Conecte-se com Steven no LinkedIn!
Aravindharaj Rajendran é desenvolvedor sênior de soluções na equipe de prototipagem e engenharia de clientes da AWS Industries (PACE), com sede em Herndon, VA. Ele ajuda os clientes da AWS a materializar suas ideias inovadoras por meio da prototipagem rápida usando a plataforma AWS. Fora do trabalho, ele adora jogar no PC, badminton e viajar.