Manejo de errores de Kafka usando Shopify Sarama -- go campo con shopify campo con apache-kafka camp Relacionados El problema

Kafka Error Handling using Shopify Sarama


3
vote

problema

Español

Así que estoy tratando de usar Kafka para mi aplicación, que tiene una acción de registro de productores en Kafka MQ y al consumidor que lo lee del MQ., ya que mi solicitud está en marcha, estoy usando el sarama Shopify para que lo haga posible. .

En este momento, puedo leer el MQ e imprimir el contenido del mensaje usando un

  fmt.Printf   

Howeveder, realmente me gustaría que el manejo de errores sea mejor que la impresión de la consola y estoy dispuesto a ir más allá.

Código ahora mismo para la conexión al consumidor:

  mqCfg := sarama.NewConfig()  master, err := sarama.NewConsumer([]string{brokerConnect}, mqCfg) if err != nil {     panic(err) // Don't want to panic when error occurs, instead handle it }   

y el procesamiento de mensajes:

      go func() {     defer wg.Done()     for message := range consumer.Messages() {         var msgContent Message         _ = json.Unmarshal(message.Value, &msgContent)         fmt.Printf("Reading message of type %s with id : %d ", msgContent.Type, msgContent.ContentId) //Don't want to print it     } }()   

Mis preguntas (soy nuevo para probar Kafka y New a Kafka en general):

  1. ¿Dónde podrían ocurrir los errores en el programa anterior, para que pueda manejarlos? Cualquier código de muestra será genial para que comience. Las condiciones de error que pude pensar son cuando el msgcontent no contiene realmente ningún tipo de campos CONTENTID en el JSON.

  2. En Kafka, ¿hay situaciones en las que el consumidor está tratando de leer en la compensación actual, pero por alguna razón no pudo (incluso cuando el JSON está bien formado)? ¿Es posible que mi consumidor retroceda para decir X Pasos por encima de la lectura de desplazamiento fallido y re-procesar las compensaciones? ¿O hay una mejor manera de hacer esto? Una vez más, ¿qué podrían ser estas situaciones?

Estoy abierto a leer y probar las cosas.

Original en ingles

So I am trying to use Kafka for my application which has a producer logging actions into the Kafka MQ and the consumer which reads it off the MQ.Since my application is in Go, I am using the Shopify Sarama to make this possible.

Right now, I'm able to read off the MQ and print the message contents using a

fmt.Printf 

Howeveer, I would really like the error handling to be better than console printing and I am willing to go the extra mile.

Code right now for consumer connection:

mqCfg := sarama.NewConfig()  master, err := sarama.NewConsumer([]string{brokerConnect}, mqCfg) if err != nil {     panic(err) // Don't want to panic when error occurs, instead handle it } 

And the processing of messages:

    go func() {     defer wg.Done()     for message := range consumer.Messages() {         var msgContent Message         _ = json.Unmarshal(message.Value, &msgContent)         fmt.Printf("Reading message of type %s with id : %d\n", msgContent.Type, msgContent.ContentId) //Don't want to print it     } }() 

My questions (I am new to testing Kafka and new to kafka in general):

  1. Where could errors occur in the above program, so that I can handle them? Any sample code will be great for me to start with. The error conditions I could think of are when the msgContent doesn't really contain any Type of ContentId fields in the JSON.

  2. In kafka, are there situations when the consumer is trying to read at the current offset, but for some reason was not able to (even when the JSON is well formed)? Is it possible for my consumer to backtrack to say x steps above the failed offset read and re-process the offsets? Or is there a better way to do this? again, what could these situations be?

I'm open to reading and trying things.

        
   
   

Lista de respuestas

2
 
vote
vote
La mejor respuesta
 

Con respecto a 1) Verifique dónde registre los mensajes de error a continuación. Esto es más o menos lo que haría.

Respecto a 2) No sé tratar de caminar hacia atrás en un tema. Es muy posible simplemente creando un consumidor una y otra vez, con su desplazamiento inicial menos uno cada vez. Pero no lo recomendaría, ya que lo más probable es que termine la repetición del mismo mensaje una y otra vez. Hago consejos ahorrando su compensación a menudo para que pueda recuperarse si las cosas van al sur.

a continuación es un código que creo que aborda la mayoría de sus preguntas. No he intentado compilar esto. Y la API de Sarama ha estado cambiando últimamente, por lo que la API puede ser actualmente diferente.

  func StartKafkaReader(wg *sync.WaitGroup, lastgoodoff int64, out chan<- *Message) (error) {     wg.Add(1)     go func(){         defer wg.Done()         //to track the last known good offset we processed, which is          // updated after each successfully processed event.          saveprogress := func(off int64){             //Save the offset somewhere...a file...              //Ive also used kafka to store progress              //using a special topic as a WAL         }         defer saveprogress(lastgoodoffset)          client, err := sarama.NewClient("clientId", brokers, sarama.NewClientConfig())         if err != nil {             log.Error(err)             return         }         defer client.Close()         sarama.NewConsumerConfig()         consumerConfig.OffsetMethod = sarama.OffsetMethodManual         consumerConfig.OffsetValue = int64(lastgoodoff)         consumer, err := sarama.NewConsumer(client, topic, partition, "consumerId", consumerConfig)         if err != nil {             log.Error(err)             return         }         defer consumer.Close()         for {             select {             case event := <-consumer.Events():                 if event.Err != nil {                     log.Error(event.Err)                     return                 }                 msgContent := &Message{}                 err = json.Unmarshal(message.Value, msgContent)                 if err != nil {                     log.Error(err)                     continue //continue to skip this message or return to stop without updating the offset.                 }                 // Send the message on to be processed.                 out <- msgContent                   lastgoodoff = event.Offset             }         }     }() }   
 

Regarding 1) Check where I log error messages below. This is more or less what I would do.

Regarding 2) I don't know about trying to walk backwards in a topic. Its very much possible by just creating a consumer over and over, with its starting offset minus one each time. But I wouldn't advise it, as most likely you'll end up replaying the same message over and over. I do advice saving your offset often so you can recover if things go south.

Below is code that I believe addresses most of your questions. I haven't tried compiling this. And sarama api has been changing lately, so the api may currently differ a bit.

func StartKafkaReader(wg *sync.WaitGroup, lastgoodoff int64, out chan<- *Message) (error) {     wg.Add(1)     go func(){         defer wg.Done()         //to track the last known good offset we processed, which is          // updated after each successfully processed event.          saveprogress := func(off int64){             //Save the offset somewhere...a file...              //Ive also used kafka to store progress              //using a special topic as a WAL         }         defer saveprogress(lastgoodoffset)          client, err := sarama.NewClient("clientId", brokers, sarama.NewClientConfig())         if err != nil {             log.Error(err)             return         }         defer client.Close()         sarama.NewConsumerConfig()         consumerConfig.OffsetMethod = sarama.OffsetMethodManual         consumerConfig.OffsetValue = int64(lastgoodoff)         consumer, err := sarama.NewConsumer(client, topic, partition, "consumerId", consumerConfig)         if err != nil {             log.Error(err)             return         }         defer consumer.Close()         for {             select {             case event := <-consumer.Events():                 if event.Err != nil {                     log.Error(event.Err)                     return                 }                 msgContent := &Message{}                 err = json.Unmarshal(message.Value, msgContent)                 if err != nil {                     log.Error(err)                     continue //continue to skip this message or return to stop without updating the offset.                 }                 // Send the message on to be processed.                 out <- msgContent                   lastgoodoff = event.Offset             }         }     }() } 
 
 

Relacionados problema

1  Conector de origen Kafka JDBC: cree temas de los valores de la columna  ( Kafka jdbc source connector create topics from column values ) 
Tengo un microservicio que utiliza Oracledb para publicar los cambios del sistema en la tabla EVENT_STORE . La tabla EVENT_STORE contiene una columna TYPE...

10  Clase de Kafkautils no encontrada en Spark Streaming  ( Kafkautils class not found in spark streaming ) 
Acabo de empezar con Spark Streaming y estoy tratando de construir una aplicación de muestra que cuenta las palabras de una corriente de Kafka. Aunque se comp...

0  Kafka - Leer un paquete de mensajes  ( Kafka read a pack of messages ) 
Tengo un trabajo por lotes, lo que llena los datos al tema de Kafka. Cada mensaje tiene datos y identificadores de trabajo. En el lado del consumidor, quiero ...

0  Autenticación Kafka con SASL - ¿Usuario administrador duplicado?  ( Kafka authentication with sasl duplicate admin user ) 
Estoy ejecutando un agente de Kafka distribuidado donde se establece la comunicación entre broker y SASL / SSL. Para esto, me adapté a la configuración de JAA...

4  Kafka Consumer se recupera después del procesamiento fallido de mensajes  ( Kafka consumer recover after failed message processing ) 
Estoy trabajando con el simple consumidor de Kafka en uno de mis proyectos y mi lógica deseada es cuando el consumidor no pudo procesar algún mensaje, se comp...

0  es posible activar el flujo de trabajo de Oozie justo y solo con un mensaje (es decir, con Kafka)  ( Is it possible to trigger oozie workflow just and only with a message ie with ) 
Me gustaría pasar por pase Tiempo de inicio y Frecuencia Parámetro para obtener el flujo de trabajo iniciado inmediatamente cuando se recibe el mensaje de...

0  ¿Kafka Broker siempre revisa si es el líder mientras responde a la solicitud de lectura / escritura  ( Does kafka broker always check if its the leader while responding to read write ) 
Estoy viendo org.apache.kafka.common.errors.notlederforpartitionException en mi productor que entiendo sucede cuando el productor intenta producir mensajes a ...

-2  La herramienta de línea de comando de consumo de Kafka no funciona como se esperaba  ( Kafka consumer command line tool not working as expected ) 
Estoy usando la herramienta de línea de comandos de Kafka para consumir datos de un tema de Kafka, pero esto no funciona como se esperaba. Estoy usando el sig...

0  Voltdb no busca datos de Tema de Kafka  ( Voltdb not fetching data from kafka topic ) 
Estoy usando Voltdb. Y mi caso de uso es importar datos de Kafka a Voltdb. Estoy usando el siguiente comando: Mando: Prueba de kafkaloader --brokers & lt; & ...

0  ¿Cómo obtener un mapa [cadena, cadena] devuelta por un consumidor de Kafka (Alpakka)?  ( How to get a map string string returned by a kafka consumer alpakka ) 
Debería obtener una mapa [cadena, cadena] de regreso de un consumidor de Kafka, pero realmente no sé cómo. Me las arreglé para configurar al consumidor, fun...




© 2022 respuesta.top Reservados todos los derechos. Centro de preguntas y respuestas reservados todos los derechos