BIG DATA - Importação incremental via Sqoop



Fala Galerinha, passei um tempo sumido devido a necessidade de um tempo dedicado a estudo de coisas novas pra trazer ao nosso blog. Nestes estudos, acabei focando um pouco sobre as premissas do tão falado BiiiiiiiiG Data, isso mesmo. E nest post, vou mostrar pra vocês uma técnica para importação de fontes através de sqoop de forma incremental. Então vamos lá!


. Introdução

O Sqoop é uma ferramenta projetada para transferir dados entre o Hadoop e bancos de dados relacionais ou mainframes. Você pode usar o Sqoop para importar dados de um sistema de gerenciamento de banco de dados relacional (RDBMS) como MySQL ou Oracle ou um mainframe para o HDFS (Hadoop Distributed File System), transformar os dados no Hadoop MapReduce e exportar os dados de volta para um RDBMS .

O Sqoop automatiza a maior parte desse processo, confiando no banco de dados para descrever o esquema para os dados a serem importados. O Sqoop usa o MapReduce para importar e exportar os dados, o que fornece operação em paralelo, bem como tolerância a falhas.

Este documento descreve como começar a usar o Sqoop para mover dados entre bancos de dados e Hadoop ou mainframe para o Hadoop e fornece informações de referência para a operação do conjunto de ferramentas de linha de comando Sqoop. Este documento destina-se a:
·         Programadores de sistema e aplicativos
·         Administradores do sistema
·         Administradores de banco de dados
·         Analistas de dados
·         Engenheiros de dados

O Apache Sqoop é uma ferramenta projetada para transferir grandes quantidades de dados entre o Apache Hadoop e armazenamentos de dados estruturados, como bancos de dados relacionais. O Sqoop fornece uma ferramenta de importação incremental que você pode usar para recuperar somente as linhas que são mais recentes que algumas linhas de conjunto importadas anteriormente. E a ferramenta de mesclagem Sqoop permite combinar dois conjuntos de dados, em que as entradas em um conjunto de dados substituem entradas em um conjunto de dados mais antigo.

Vamos examinar um cenário típico em detalhes. Considere um RDMS remoto (DB2, por exemplo) com uma tabela TURMA.PRESENCA que contenha informações básicas sobre os alunos. A tabela PRESENCA inclui uma coluna denominada ID_REGISTRO (INTEGER) e uma coluna denominada DATA_ALTERCAO (DATE).

Você pode usar a ferramenta de importação incremental Sqoop para atualizar as informações mais recentes do aluno do DB2 para o Hive, conforme mostrado no exemplo a seguir:

sqoop import -incremental lastmodified --check-column DATA --last-value 2017-02-08...

A operação de importação incremental é executada com base em valores na coluna TIME e importa registros de "2017-02-08" para atual. Informações correspondentes no log do Sqoop podem se parecer com o seguinte exemplo:

17/02/08 22:44:47 INFO tool.ImportTool: Incremental import based on column DATA 17/02/08 22:44:47 INFO tool.ImportTool: Lower bound value: '2017-02-08' 17/02/08 22:44:47 INFO tool.ImportTool: Upper bound value: '2017-02-09 22:44:47.564217'

A tabela Hive TURMA.PRESENCA é atualizada com os registros mais recentes e um novo arquivo é criado no diretório da tabela no HDFS:

 [ambari-qa@mauve1 ~]$ hadoop fs -ls /apps/hive/warehouse/student Found 2 items -rwxrwx--- 1 ambari-qa hadoop 26 2017-02-08 23:13 /apps/hive/warehouse/student/part-m-00000 -rwxrwx--- 1 ambari-qa hadoop 26 2017-02-09 21:51 /apps/hive/warehouse/student/part-m-00000_copy_1 Include new records from 201-02-08 to 2017-02-09

Como o diretório da tabela Hive pode aumentar significativamente em tamanho com tarefas de importação incrementais diárias, é recomendável usar a ferramenta de mesclagem Sqoop para gerar um arquivo mais simplificado.

Vamos dar uma olhada em outro exemplo. Você pode usar o comando codegen para gerar o código que interage com os registros do banco de dados. Por exemplo:

sqoop codegen --connect --username --password --table TURMA --outdir /tmp/sqoop --fields-terminated-by '\t'

Você pode usar a ferramenta de mesclagem Sqoop para "achatar" dois conjuntos de dados em um, conforme mostrado no exemplo a seguir:

 sqoop merge --new-data /apps/hive/warehouse/student/part-m-00000 --onto /apps/hive/warehouse/student/part-m-00000_copy_1 --target-dir /tmp/sqoop_merge --jar-file /tmp/sqoop-ambari-qa/compile/9062c87c959e4090dcec5995a439b514/TIME.jar --class-name DATA --merge-key DATA

Você também pode usar a ferramenta de mesclagem para extrair dados especiais do HDFS. Por exemplo, para extrair os primeiros dois meses dos dados do aluno, você poderia executar o comando que é semelhante ao seguinte exemplo:

 sqoop merge --new-data /apps/hive/warehouse/student/part-m-00000_copy_1 --onto /apps/hive/warehouse/student/part-m-00000_copy_2 --target-dir /tmp/student_first_two_month --jar-file /tmp/sqoop-ambari-qa/compile/9062c87c959e4090dcec5995a439b514/DATA.jar --class-name TIME --merge-key DATA

Depois que a operação de mesclagem for concluída, você poderá importar os dados de volta para um repositório de dados Hive ou HBase.

2. Sqoop-Import

Propósito

A ferramenta de importação importa uma tabela individual de um RDBMS para o HDFS. Cada linha de uma tabela é representada como um registro separado no HDFS. Os registros podem ser armazenados como arquivos de texto (um registro por linha) ou em representação binária como Avro ou Sequence Files.

Paralelismo de controle

O Sqoop importa dados em paralelo da maioria das origens do banco de dados. Você pode especificar o número de tarefas do mapa (processos paralelos) a serem usados ​​para executar a importação usando o –m ou --num-mappers argumento. Cada um desses argumentos recebe um valor inteiro que corresponde ao grau de paralelismo a ser empregado. Por padrão, quatro tarefas são usadas. Alguns bancos de dados podem ter desempenho aprimorado, aumentando esse valor para 8 ou 16. Não aumente o grau de paralelismo maior do que o disponível em seu cluster MapReduce; as tarefas serão executadas em série e provavelmente aumentarão o tempo necessário para executar a importação. Da mesma forma, não aumente o grau de paralelismo acima do que seu banco de dados pode suportar. Conectar 100 clientes simultâneos ao seu banco de dados pode aumentar a carga no servidor de banco de dados para um ponto em que o desempenho sofra como resultado.

Ao realizar importações paralelas, o Sqoop precisa de um critério pelo qual possa dividir a carga de trabalho. O Sqoop usa uma coluna de divisão para dividir a carga de trabalho. Por padrão, o Sqoop identificará a coluna de chave primária (se presente) em uma tabela e a usará como a coluna de divisão. Os valores baixo e alto da coluna de divisão são recuperados do banco de dados e as tarefas do mapa operam em componentes de tamanhos uniformes do intervalo total. Por exemplo, se você tivesse uma tabela com uma coluna de chave primária da id cujo valor mínimo é 0 eo valor máximo foi de 1000, e Sqoop foi direcionado para usar 4 tarefas, Sqoop seria executado quatro processos que cada executar instruções SQL da forma SELECT * FROM sometable WHERE id >= lo AND id < hi, com (lo, hi) setembro para (0, 250), (250, 500), (500, 750) e (750, 1001) nas diferentes tarefas.

Se os valores atuais da chave primária não estiverem uniformemente distribuídos em seu intervalo, isso poderá resultar em tarefas desbalanceadas. Você deve escolher explicitamente uma coluna diferente com o --split-by argumento. Por exemplo, --split-by employee_id. O Sqoop atualmente não pode ser dividido em índices de várias colunas. Se a sua tabela não tiver coluna de índice ou tiver uma chave de várias colunas, você também deverá escolher manualmente uma coluna de divisão.

Se uma tabela não tiver uma chave primária definida e --split-by <col>não for fornecida, ela não será alterada, a menos que o número de mapeadores seja explicitamente definido como um com a --num-mappers 1opção ou a --autoreset-to-one-mapper opção seja usada. A opção --autoreset-to-one-mapper é normalmente usada com a ferramenta de importação de todas as tabelas para manipular tabelas automaticamente sem uma chave primária em um esquema.

Eis um exemplo utilizado em um de nossos Jobs :

/usr/bin/sqoop job -Dmapred.job.queue.name=TESTE\
-Dorg.apache.sqoop.splitter.allow_text_splitter=true \
-Dhadoop.security.credential.provider.path=jceks://file/home/master/keystore.jceks \
--options-file ../../../../conf/dados_brutos/teste.props \
--options-file ../../hql/teste.hql \
--hive-import \
--hive-database bruto \
--hive-table teste \
--target-dir /apps/hive/warehouse/...
--fields-terminated-by '\b' \
--mapreduce-job-name teste.java \
-m 50 \
--null-string "NULL" --split-by ID_REGISTRO \
--append \
--check-column ID_REGISTRO \
--incremental append \
--last-value 177936582 \
--boundary-query 'SELECT MIN(ID_REGISTRO), MAX(ID_REGISTRO) FROM teste'

Neste exemplo foi utilizado o campo ID_REGISTRO como modelo.
Outro ponto importante que deve ser ressaltado é, a criação das tabelas para aceitação de partições e transações ACID assim como para procedimentos SCD.

2. Particionamento de Dados
Static Partitioning:
O particionamento no Hive não difere muito da mesma técnica realizada em um RDBMS convencional.
Podemos entender o particionamento como sendo uma técnica utilizada para criar um conjunto de dados gravados fisicamente separados da tabela principal tendo como parâmetro uma determinada coluna separada por faixa de valores.
Ficou confuso ? Então vamos ver na prática como tudo isso funciona.

Criando nossa tabela

Obviamente o Hive na vida real trata com arquivos de grande volume, então para facilitar nossa vida eu achei melhor não disponibilizar um arquivo com 100 terabytes Ok ?
Em primeiro lugar vamos criar uma simples tabela sem nenhum particionamento.
Vamos fazer o download do arquivo csv que será nossa fonte de dados para alimentar a tabela.
Download do arquivo CSV ou importação da fonte
Após fazer o download do arquivo vamos então copiá-lo para o HDFS e seguir o procedimento que já é de conhecimento de todos conforme mostrado o exemplo na imagem abaixo:

Cria o diretório no HDFS

hdfs dfs -mkdir /user/hive/warehouse/pessoas

Copie nosso csv para o diretório criado

hdfs dfs -copyFromLocal pessoas.csv /user/hive/warehouse/pessoas
Agora utilizando o client do Hive vamos criar nossa tabela a partir do arquivo copiado.
CREATE TABLE pessoas (
 id STRING,
 nome STRING,
 sobrenome STRING,
 nascimento STRING )
 ROW FORMAT
 DELIMITED FIELDS TERMINATED BY ','
 LOCATION '/user/hive/warehouse/pessoas';

Até o momento criamos apenas uma simples tabela no Hive utilizando o banco de dados default. Para conferir os resultados liste seu conteúdo e veja que existe uma coluna chamada nascimento que como o próprio nome diz registra a data de nascimento da pessoa.
Abaixo segue a imagem para que possa conferir os seus resultados:

Criando o particionamento no Hive

Se tudo deu certo até o momento chegou a hora de criar nossa partição.
Vamos criar a tabela responsável por tratar as partições:
 CREATE TABLE pessoas_partition (
 id STRING,
 nome STRING,
 sobrenome STRING, 
 nascimento STRING)
 PARTITIONED BY (part_nascimento STRING);
 

Para que os dados sejam inseridos na tabela particionada existem duas formas e aqui vamos tratar da Static Partition.
Neste tipo, Static Partition, escolhemos os dados que serão inseridos na partição. Veja um exemplo:
Agora criaremos em nossa partição part_nascimento todas as pessoas nascidas de 90 até 99.

INSERT INTO pessoas_partition PARTITION (part_nascimento='anos90')
SELECT * FROM pessoas WHERE YEAR(nascimento)>='1990' AND YEAR(nascimento)<='1999';

O Hive irá entender a partição “part_nascimento” como uma coluna então caso tenha várias partições e deseje consultar apenas a de interesse pode utilizar o seguinte:

SELECT * FROM pessoas_partition WHERE part_nascimento='anos90';
Vamos continuar e criar mais duas partições uma para os anos 80 e outra para 2000
INSERT INTO pessoas_partition PARTITION (part_nascimento='anos80')
SELECT * FROM pessoas WHERE YEAR(nascimento)>='1980' AND YEAR(nascimento)<='1989';
INSERT INTO pessoas_partition PARTITION (part_nascimento='anos2000')
SELECT * FROM pessoas WHERE YEAR(nascimento)>='2000' AND YEAR(nascimento)<='2099'
Agora podemos mostrar todas as nossas partições utilizando o comando:
SHOW PARTITIONS pessoas_partition;
E fisicamente os arquivos no HDFS separados.
Por hoje é só e, espero que tenha sido interessante compartilhar este conhecimento. Por hoje é só e até a próxima! 
De seu amigo da vizinhança, Bruno Rafael :)

Comentários

Postagens mais visitadas deste blog

E Esse Tal de Nano Service?

Executar Audio em Java Swing

Validando Email em Java Com e Sem expressão Regular