La mejor manera para que un trabajo actualice la salida de otro trabajo -- java campo con hadoop campo con mapreduce campo con distributed-computing campo con avro camp Relacionados El problema

Best way for a job to update output from another job


2
vote

problema

Español

Aquí está mi escenario. Tengo un trabajo que procesó una gran cantidad de datos CSV y lo escribe utilizando AVRO en los archivos divididos por fecha. Me han dado un pequeño archivo con el que quiero usar para actualizar algunos de estos archivos con entradas adicionales con un segundo trabajo que puedo ejecutar cada vez que esto debe suceder en lugar de volver a reprocesar todo el conjunto de datos.

Aquí es una especie de cómo se ve la idea:

  • Job1: Procesar muchos datos de CSV, lo escribe en archivos AVRO comprimidos divididos en archivos por fecha de entrada. Los datos de origen no se dividen por fecha, por lo que este trabajo hará eso.
  • Job2 (Ejecute según sea necesario entre Job1 RUNS): procese el archivo de actualización pequeña y use esto para agregar las entradas al archivo AVRO apropiado apropiado. Si no existe, crea un nuevo archivo.
  • Job3 (siempre se ejecuta): produce algunas métricas para informar de la producción de Job1 (y posiblemente el trabajo 2).

Entonces, tengo que hacerlo de esta manera escribiendo un trabajo de Java. Mi primer trabajo parece funcionar bien. También lo hace 3. No estoy seguro de cómo acercarse al trabajo 2.

Aquí está lo que estaba pensando:

  • Pase el archivo de actualización en el uso de caché distribuido. Analizar este archivo a producir una lista de fechas en la clase de trabajo y usar esto para filtrar el Archivos de Job1, que será la entrada de este trabajo.
  • En el Mapper, acceda al archivo de actualización distribuido y agreguelos a la colección de mis objetos AVRO que he leído. ¿Qué pasa si el archivo todavía no existe aquí? ¿Esto funciona?
  • use el reductor para escribir la nueva colección de objetos

¿Es así como uno implementaría esto? Si no, ¿cuál es la mejor manera? ¿Tiene sentido un combinador aquí? Siento que la respuesta es no.

gracias de antemano.

Original en ingles

Here is my scenario. I have a job that processed a large amount of csv data and writes it out using Avro into files divided up by date. I have been given a small file with that I want to use to update a few of these files with additional entries with a second job I can run whenever this needs to happen instead of reprocessing the whole data set again.

Here is sort of what the idea looks like:

  • Job1: Process lots of csv data, writes it out in compressed Avro files split into files by entry date. The source data is not divided by date so this job will do that.
  • Job2 (run as needed between Job1 runs): Process small update file and use this to add the entries to the appropriate appropriate Avro file. If it doesn't exist create a new file.
  • Job3 (always runs): Produce some metrics for reporting from the output of Job1 (and possibly Job 2).

So, I have to do it this way writing a Java job. My first job seems to work fine. So does 3. I'm not sure on how to approach job 2.

Here is what I was thinking:

  • Pass the update file in using distributed cache. Parse this file to produce a list of dates in the Job class and use this to filter the files from Job1 which will be the input of this job.
  • In the mapper, access the distributed update file and add them to the collection of my avro objects I've read in. What if the file doesn't exist yet here? Does this work?
  • Use Reducer to write the new object collection

Is this how one would implement this? If not what is the better way? Does a combiner make sense here? I feel like the answer is no.

Thanks in advance.

              

Lista de respuestas

3
 
vote
vote
La mejor respuesta
 

Puede seguir por debajo del enfoque:

1) Ejecutar job1 en todo su archivo CSV

2) Ejecute Job2 en un archivo pequeño y cree una nueva salida

3) Para la actualización, debe ejecutar un trabajo más, en este trabajo, cargue la salida de Job2 en el método de configuración () y tome la salida de Job1 como una entrada MAP (). Luego escriba la lógica de actualización y genere salida final.

4) Luego ejecute su trabajo3 para su procesamiento.

Según yo, esto funcionará.

 

You can follow below approach:

1) run job1 on all your csv file

2) run job2 on small file and create new output

3) For update, you need to run one more job, in this job, load the output of job2 in setup() method and take output of job1 as a map() input. Then write the logic of update and generate final output.

4) then run your job3 for processing.

According to me, this will work.

 
 
   
   
2
 
vote

Solo una idea loca: ¿por qué necesita actualizar job1 de salida?

  • Job1 hace su trabajo produciendo un archivo de fecha. ¿Por qué no agregarlo con PUSFIX único como UUID aleatorio?
  • Información de actualización de los procesos de Job2. Tal vez varias veces. La lógica del nombre de archivo de salida es el mismo: nombre basado en la fecha y postfix único.
  • Job3 recopila Job1 y Job2 Salida agrupándolos en divisiones por prefijo de fecha con todas las postfixes y tomando como entrada.

Si la agrupación basada en la fecha es objetivo, aquí tiene muchas ventajas que para mí, obvias:

  • No le importa Abuot 'si tiene una salida de Job1 para esta fecha'.
  • Aunque no le importa si necesita actualizar una salida de Job1 con varios resultados de Job2.
  • No rompe el enfoque HDFS con la limitación de 'No actualización de archivos' que tiene una potencia completa de "Escribir una vez" procesamiento sencillo.
  • Solo necesita un *file != NULL1 para su trabajo3. No parece tan complejo.
  • Si necesita combinar datos de diferentes fuentes, no hay problema.
  • Job3 puede ignorar el hecho de que recibe datos de varias fuentes. InputFormat debe cuidarse.
  • Se pueden combinar varias salidas Job1 de la misma manera.

Limitaciones:

  • Esto podría producir más archivos pequeños de los que puede pagar para grandes conjuntos de datos y varias pases.
  • Necesita personal InputFormat .

En cuanto a mí, buena opción si entiendo correctamente su caso y usted puede / necesitar procesar archivos por fecha como entrada para Job3.

Espero que esto te ayude.

 

Just one crazy idea: why do you need actually update job1 output?

  • JOB1 does its job producing one file for date. Why not add it with unique postfix like random UUID?
  • JOB2 processes 'update' information. Maybe several times. The logic of output file naming is the same: date based name and unique postfix.
  • JOB3 collects JOB1 and JOB2 output grouping them into splits by date prefix with all postfixes and taking as input.

If date-based grouping is target, here you have lot of advantages as for me, obvious ones:

  • You don't care abuot 'if you have output from JOB1 for this date'.
  • You even don't care if you need to update one JOB1 output with several JOB2 results.
  • You don't break HDFS approach with 'no file update' limitation having full power of 'write once' straightforward processing.
  • You need only some specific InputFormat for your JOB3. Looks not so complex.
  • If you need to combine data from different sources, no problem.
  • JOB3 itself can ignore fact that it receives data from several sources. InputFormat should take care.
  • Several JOB1 outputs can be combined the same way.

Limitations:

  • This could produce more small files than you can afford for large datasets and several passes.
  • You need custom InputFormat.

As for me good option if I properly understand your case and you can / need to group files by date as input for JOB3.

Hope this will help you.

 
 
 
 
1
 
vote

Para Job2, puede leer el archivo de actualización para filtrar las particiones de datos de entrada en el código de controlador y configurarlo en las rutas de entrada. Puede seguir el enfoque actual para leer el archivo de actualización como distribuir el archivo archivo de caché. En caso de que desee fallar el trabajo si no puede leer el archivo de actualización, lanzar excepción en el método de configuración en sí.

Si su lógica de actualización no requiere agregación en el lado de reducción, configure solo Job2 como trabajo solo MAP. Es posible que deba crear lógica para identificar las particiones de entrada actualizadas en Job3, ya que recibirá la salida de ORAB1 y la salida JOB2.

 

For Job2, You can read the update file to filter the input data partitions in Driver code and set it in Input paths. You can follow the current approach to read the update file as distribute cache file.In case you want to fail the job if you are unable to read update file , throw exception in setup method itself.

If your update logic does not require aggregation at reduce side, Set Job2 as map only job.You might need to build logic to identify updated input partitions in Job3 as it will receive the Job1 output and Job2 output.

 
 

Relacionados problema

1  ¿Alguno de ahorro, ProtoBUF, AVRO, etc. Soporte de adultos en datos codificados directamente?  ( Do any of thrift protobuf avro etc support quering on encoded data directly ) 
¿Alguno de ahorro de ahorro, ProtoBUF, AVRO, etc. Soporte a la extraña en los datos compactos resultantes? ¿O algo así, como un servidor de Thrift, primero te...

2  Bibliotecas de clientes idiomáticas con Cassandra + PHP?  ( Idiomatic client libraries with cassandra php ) 
En este video (a las 29:00), Eric Evans de RackSpace le dice a una audiencia que usar Thrift y Avro es una mala idea. En su lugar, aboga por usar las bibliote...

2  ¿Conversión del esquema AVRO al esquema de Google BigQuery en Python?  ( Converting from avro schema to google bigquery schema in python ) 
¿Hay una biblioteca de Python que convierta los esquemas de Avro a los esquemas de Bigquery? noté que el Java SDK para Apache Beam tiene una utilidad que ...

0  Analizando múltiples archivos AVRO (archivos AVSC) que se refieren con Python (Fastavro)  ( Parsing multiple avro avsc files which refer each other using python fastavro ) 
Tengo un esquema AVRO que se encuentra actualmente en un solo archivo AVSC como a continuación. Ahora quiero mover la dirección de registro a un archivo AVSC ...

1  Herramienta CSV a JSON Converter  ( Csv to json converter tool ) 
Tengo un archivo CSV que quiero convertirme a AVRO; Debido a que no hay herramientas para convertir directamente de CSV a AVRO, decidí usar una herramienta en...

1  Spark Maven Dependencia Incompatibilidad entre Delta-Core y Spark-Avro  ( Spark maven dependency incompatibility between delta core and spark avro ) 
Estoy tratando de agregar delta-núcleo a mi proyecto SCALA SPARK, ejecutando 2.4.4. Un comportamiento extraño que estoy viendo es que parece estar en confli...

-1  Manipulación de datos decodificados con múltiples tipos usando Goavro  ( Handling decoded data with multiple types using goavro ) 
¿Qué estoy tratando de hacer? Para cada cambio en la base de datos, estoy tratando de convertir el evento Debezium en un CSV de los valores de la base de da...

0  Excepción NOSUCHMETHOD arrojada cuando construye AvroparquetWriter en Java Spark  ( Nosuchmethod exception thrown when build avroparquetwriter in java spark ) 
Estoy usando Java y Spark y tengo el siguiente código todo id - user_id - name - description - reminder - reminder_date111 Sin embargo, recibí un error ...

4  Serialización AVRO: ¿Qué partes son y no son seguras de hilo?  ( Avro serialization which parts are and arent thread safe ) 
Estoy viendo información en conflicto sobre esto en diferentes lugares en línea, así que apreciaría y la respuesta autorizada de alguien, que realmente sabe. ...

2  PIG-AVRO: Cómo personalizar el camino, el AvRrostorage carga un archivo  ( Pig avro how to customize the way the avrostorage loads a file ) 
Tengo un requisito en el que necesitamos personalizar la forma en que cargamos un archivo en cerdo usando AVRROSTORAGE: Por ejemplo, tengo un archivo AVRO c...




© 2022 respuesta.top Reservados todos los derechos. Centro de preguntas y respuestas reservados todos los derechos