¿Cómo procesamos los datos de la corriente de Kinesis cada una hora? -- amazon-web-services campo con amazon-kinesis campo con amazon-kcl camp Relacionados El problema

How do we process the kinesis stream data every one hour?


0
vote

problema

Español

Tengo una corriente de kinesis que se está escribiendo continuamente con PutrECord. En el consumidor consumo utilizando procesrecords de KCL. Estos registros del proceso se llama Siempre que haya un registro en el Stream. Necesita que estos registros de proceso se excuten cada x hora.

Aquí están las cosas que probé.

  kinesisClientLibConfiguration = new KinesisClientLibConfiguration(applicationName, kinesisStreamName,             credsProvider, workerId).withInitialPositionInStream(initialPositionInStream)                     .withCallProcessRecordsEvenForEmptyRecordList(true)                     .withIdleTimeBetweenReadsInMillis(3600000) //1 hr in millis                     .withKinesisEndpoint(kinesisEndpoint);   

Esto no parece estar trabajando. Lanza la siguiente excepción

      [INFO] 2018-02-08T18:41:24.124 com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask getRecordsResult  ShardId shardId-000000000003: getRecords threw ExpiredIteratorException - restarting after greatest seqNum passed to customer com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator expired. The iterator was created at time Thu Feb 08 13:06:04 UTC 2018 while right now it is Thu Feb 08 13:11:23 UTC 2018 which is further in the future than the tolerated delay of 300000 milliseconds. (Service: AmazonKinesis; Status Code: 400; Error Code: ExpiredIteratorException; Request ID: d72ba81b-9a14-cf1d-85dd-e6a01179f91c)     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)     at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)     at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2276)     at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2252)     at com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:1062)     at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1038)     at com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy.get(KinesisProxy.java:158)     at com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKinesisProxyDecorator.get(MetricsCollectingKinesisProxyDecorator.java:74)     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher.getRecords(KinesisDataFetcher.java:74)     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.SynchronousGetRecordsRetrievalStrategy.getRecords(SynchronousGetRecordsRetrievalStrategy.java:31)     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockingGetRecordsCache.getNextResult(BlockingGetRecordsCache.java:50)     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.getRecordsResultAndRecordMillisBehindLatest(ProcessTask.java:377)     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.getRecordsResult(ProcessTask.java:342)     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:159)     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)     at java.util.concurrent.FutureTask.run(Unknown Source)     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)     at java.lang.Thread.run(Unknown Source)   

¿Puede alguien por favor hágamelo saber si hay un trabajo para lograr esto? Gracias

Original en ingles

I have a kinesis stream which is being continuously written with putRecord. In the consumer I consume using processRecords of KCL. This process records gets called whenever there is a record in the stream.I need this process records to be excuted every X hour.

Here are the things that I tried.

kinesisClientLibConfiguration = new KinesisClientLibConfiguration(applicationName, kinesisStreamName,             credsProvider, workerId).withInitialPositionInStream(initialPositionInStream)                     .withCallProcessRecordsEvenForEmptyRecordList(true)                     .withIdleTimeBetweenReadsInMillis(3600000) //1 hr in millis                     .withKinesisEndpoint(kinesisEndpoint); 

This does'nt seem to be working. It throws the below exception

    [INFO] 2018-02-08T18:41:24.124 com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask getRecordsResult  ShardId shardId-000000000003: getRecords threw ExpiredIteratorException - restarting after greatest seqNum passed to customer com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator expired. The iterator was created at time Thu Feb 08 13:06:04 UTC 2018 while right now it is Thu Feb 08 13:11:23 UTC 2018 which is further in the future than the tolerated delay of 300000 milliseconds. (Service: AmazonKinesis; Status Code: 400; Error Code: ExpiredIteratorException; Request ID: d72ba81b-9a14-cf1d-85dd-e6a01179f91c)     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)     at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)     at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2276)     at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2252)     at com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:1062)     at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1038)     at com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy.get(KinesisProxy.java:158)     at com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKinesisProxyDecorator.get(MetricsCollectingKinesisProxyDecorator.java:74)     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher.getRecords(KinesisDataFetcher.java:74)     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.SynchronousGetRecordsRetrievalStrategy.getRecords(SynchronousGetRecordsRetrievalStrategy.java:31)     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockingGetRecordsCache.getNextResult(BlockingGetRecordsCache.java:50)     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.getRecordsResultAndRecordMillisBehindLatest(ProcessTask.java:377)     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.getRecordsResult(ProcessTask.java:342)     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:159)     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)     at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)     at java.util.concurrent.FutureTask.run(Unknown Source)     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)     at java.lang.Thread.run(Unknown Source) 

Can someone please let me know if there is a work around for achieving this? Thanks

        
   
   

Lista de respuestas


Relacionados problema

0  KCL ¿Cómo conocer el número de registros de un ProcessRecordSInput?  ( Kcl how to know the number of records for one processrecordsinput ) 
Tengo un consumidor de kinesis escrito en Java con KCL. Desde el lado del consumidor, puedo acelerar la velocidad con el método SetMaxRecords para establecer ...

0  Cómo KCL INTERNAMENTE INTERNAMENTE SOBRE EL NUEVO RECIENTE DE RECURSO  ( How kcl internally spawns new recordprocessor ) 
Tengo un consumidor de Java (implementación de KCL) para consumir mensajes de 1 secuencia de kinesis con 2 fragmentos. El KCL crea 2Procesadores de grabació...

5  El fragmento de kinesis getRecords.iteratoragemillisegundos alcanzó el máximo de 86.4m (1 día) y no disminuye aunque consuma  ( Kinesis shard getrecords iteratoragemilliseconds reached maximum 86 4m 1 day a ) 
Estoy consumiendo un flujo de kinesis con SPARK Streaming 2.2.0 y usando sproaming-kinesis-asl_2.11 . La corriente de kinesis tiene 150 fragmentos y estoy...

0  ¿Cómo funciona la última posición en la corriente en Kinesis, KCL?  ( How does latest position in stream works in kinesis kcl ) 
Estamos construyendo un servicio basado en secuencias de Kinesis / Dynamodb y tenemos la siguiente pregunta sobre el comportamiento de los puntos de control. ...

1  Cambiar el tiempo de conmutación por error para AWS KCL  ( Change failover time for aws kcl ) 
AWS recomienda aumentar el tiempo de conmutación por error para KCL (Kinesis), si las aplicaciones con problemas de conectividad. https://docs.aws.amazon.com...

0  Consumidor de primavera para la corriente de Kinesis AWS  ( Spring consumer for aws kinesis stream ) 
Actualmente, estoy usando el flujo de Kinesis AWS con un solo fragmento y solo un consumidor basado en la boot de la primavera y la biblioteca de KCL. Ahora...

5  Dump Kinesis Client Library (KCL) registra en archivo  ( Dump kinesis client library kcl logs to file ) 
Estoy usando la biblioteca de clientes Kinesis (KCL) para suscribirse a la corriente de Kinesis. Todos los registros de KCL están impresos en la consola. Nece...

3  Carga de equilibrio y escalado en la aplicación basada en la biblioteca de clientes Kinesis (KCL)  ( Load balancing and scaling in kinesis client library kcl based application ) 
Estoy usando conectores de amazon-kinesis para construir una aplicación de lado del cliente de Kinesis . Estoy averiguiendo algunas cosas al respecto. Cóm...

0  Cómo obtener el volcado de hilo de KCL NODEJS  ( How to get kcl nodejs thread dump ) 
Tengo una aplicación NODEJS que consume datos de flujo de kinesis utilizando KCL para NODEJS. Esta aplicación se ejecuta como un contenedor de Docker en AWS E...

6  ¿Cómo manejar los escenarios de reprocesamiento en AWS Kinesis?  ( How to handle reprocessing scenarios in aws kinesis ) 
Estoy explorando a AWS Kinesis para un requisito de procesamiento de datos que reemplaza el procesamiento de la ETL de LOTE VIEJO con un enfoque basado en la ...




© 2022 respuesta.top Reservados todos los derechos. Centro de preguntas y respuestas reservados todos los derechos