Servicio asíncrono que publica una cantidad posiblemente infinita de eventos -- java campo con multithreading campo con asynchronous campo con thread-safety campo con rx-java camp codereview Relacionados El problema

Asynchronous service that publishes a possibly infinite amount of events


2
vote

problema

Español

Quiero implementar un servicio que obtiene eventos de Ejecución y los publica a todos los suscriptores de la secuencia. Se supone que todo se debe correr de forma asíncrona, por lo que no bloquea el hilo de la interfaz de usuario. Así es como se supone que el servicio se utiliza:

  ETStreamService service = new ETStreamService(); service.getObservable().subscribe(e -> ...); service.start(); doSomeOtherWork(); service.stop();   

Implementé una versión genérica (manejo de errores omitido):

  public class ETStreamService<E extends ChronologicComparable<E>> {      private ETReceiver<E> receiver;     private AtomicBoolean running;     private PublishSubject<E> publishSubject;     private Thread streamThread;      private class ETStreamRunnable implements Runnable {         private ETReceiver<E> receiver;         private AtomicBoolean running;         private PublishSubject<E> publishSubject;          public ETStreamRunnable(AtomicBoolean running, ETReceiver<E> receiver, PublishSubject<E> publishSubject) {             this.receiver = receiver;             this.running = running;             this.publishSubject = publishSubject;         }          @Override         public void run() {             while(running.get()) {                 ETResponse<E> response = receiver.getNext();                  switch(response.getType()) {                 case NEW_DATA:                     publishSubject.onNext(response.getData());                     break;                 case NO_NEW_DATA_AVAILABLE:                     break;                 case SOURCE_DEPLETED:                     publishSubject.onComplete();                     running.set(false);                     break;                 }             }         }      }      public ETStreamService(ETReceiver<E> receiver) {         this.receiver = receiver;         this.running = new AtomicBoolean(false);         this.publishSubject = PublishSubject.create();     }      public void start() {         running.set(true);         streamThread = new Thread(new ETStreamRunnable(running, receiver, publishSubject));         streamThread.start();     }      public void stop() {         running.set(false);     }      public void shutdown() {         running.set(false);         publishSubject.onComplete();     }      public Observable<E> getObservable() {         return publishSubject;     } }   

La idea es ejecutar un hilo separado, que se puede iniciar y detener a través de la interfaz pública de la clase ETStreamService . El corredor que se ejecuta dentro del hilo hace la búsqueda y publicación de eventos. La corriente de eventos se puede obtener llamando getObservable() . onComplete() Si el 9988776655544339 no tiene más elementos para recibir (fuente finita, como una lista de datos pregrabados), o si 998877766555443310 Se llama método (para fuentes infinitas, como un agenda real).

Me gusta el diseño general, pero no estoy seguro de la seguridad del hilo, por lo que sería bueno si alguien pudiera revisarlo por mí.

Original en ingles

I want to implement a service that fetches eyetracking events and publishes them to all subscribers of the stream. The whole thing is supposed to run asynchronously, so it doesn't block the UI thread. This is how the service is supposed to be used:

ETStreamService service = new ETStreamService(); service.getObservable().subscribe(e -> ...); service.start(); doSomeOtherWork(); service.stop(); 

I implemented a generic version (error handling omitted):

public class ETStreamService<E extends ChronologicComparable<E>> {      private ETReceiver<E> receiver;     private AtomicBoolean running;     private PublishSubject<E> publishSubject;     private Thread streamThread;      private class ETStreamRunnable implements Runnable {         private ETReceiver<E> receiver;         private AtomicBoolean running;         private PublishSubject<E> publishSubject;          public ETStreamRunnable(AtomicBoolean running, ETReceiver<E> receiver, PublishSubject<E> publishSubject) {             this.receiver = receiver;             this.running = running;             this.publishSubject = publishSubject;         }          @Override         public void run() {             while(running.get()) {                 ETResponse<E> response = receiver.getNext();                  switch(response.getType()) {                 case NEW_DATA:                     publishSubject.onNext(response.getData());                     break;                 case NO_NEW_DATA_AVAILABLE:                     break;                 case SOURCE_DEPLETED:                     publishSubject.onComplete();                     running.set(false);                     break;                 }             }         }      }      public ETStreamService(ETReceiver<E> receiver) {         this.receiver = receiver;         this.running = new AtomicBoolean(false);         this.publishSubject = PublishSubject.create();     }      public void start() {         running.set(true);         streamThread = new Thread(new ETStreamRunnable(running, receiver, publishSubject));         streamThread.start();     }      public void stop() {         running.set(false);     }      public void shutdown() {         running.set(false);         publishSubject.onComplete();     }      public Observable<E> getObservable() {         return publishSubject;     } } 

The idea is to run a separate thread, which can be started and stopped through the public interface of the ETStreamService class. The Runnable that is run inside the thread does the fetching and publishing of events. The event stream can be obtained by calling getObservable(). onComplete() is emitted if the ETReceiver has no more elements to receive (finite source, like a list of pre-recorded data), or if the shutdown() method is called (for infinite sources, such as a real eyetracker).

I like the overall design, but i am unsure about the thread safety of it, so it would be nice if someone could review it for me.

              

Lista de respuestas


Relacionados problema

4  Probando un ViewModel en Android - MVVM con Databinding  ( Testing a viewmodel in android mvvm with databinding ) 
Estoy usando el patrón MVVM con el indicador de datos. Tengo pruebas escritas. Pero les quiero revisarse. La prueba está relacionada con JUnit prueba en Vie...

1  Actuar condicionalmente acciones asíncronas con Rxjava  ( Conditionally perform asynchronous actions with rxjava ) 
Esto parece funcionar bien, sin embargo, estoy seguro de que debe haber una manera más amable. El resultado debe ser: Si ishidden == verdadero, llame a CANHID...

0  Encontrar la media usando rxjava  ( Finding the mean using rxjava ) 
Estoy aprendiendo a Rxjava y esto es lo que intenté encontrar la media de las transacciones 'n'. infix operator >>> {associativity left} func >>><A,B>(a: A...

2  Secuencia de lanzamiento de aplicaciones de refactorización usando programación reactiva  ( Refactoring app launch sequence using reactive programming ) 
En el SplashActivity de MI Aplicación de Android, se produce la siguiente secuencia de eventos: Compruebe si se instala los servicios de Google Play Si...

3  Hilos de coordinación con arroyos reactivos  ( Coordinating threads with reactive streams ) 
Acabo de comenzar a aprender corrientes reactivos con rxjava . Después de leer un par de libros y muchos artículos que todavía estoy teniendo problemas para ...

3  Usando combinaciones de o y filtros con RxJava  ( Using combinations of or and and filters with rxjava ) 
Estoy tratando de hacer ejercicio de una manera de crear una combinación de OR y AND3 filtros en RxJava. Esto es lo que he hecho hasta ahora: public in...

2  Listado de videos de YouTube usando API con RxJava  ( Listing youtube videos using api with rxjava ) 
Soy nuevo en RxJava y tengo el siguiente código que utilizo para buscar videos de YouTube a través de la API y, en última instancia, mostrarlos en una lista. ...

1  Suscrito a múltiples fuentes en Rxjava  ( Subscribing to multiple sources in rxjava ) 
He usado Rxjava para observar múltiples fuentes y luego realizar acciones dependiendo de los valores de observables. Quiero saber si se puede simplificar aún ...

0  Obtención de datos con RxJava y Reequiño  ( Getting data with rxjava and retrofit ) 
Estoy usando RxJava con reequipamiento Para obtener datos de una API, filtrando los datos con un método 99887766655443314 99887776655443315 . ¿Hay alguna fo...

2  ¿Cómo implementar un caché de un solo valor en RxJava 3?  ( How to implement a single value cache in rxjava 3 ) 
kotlin 1.3.+ rxjava 3.0.+ koteest 4.1.+ // SingleValueCache.kt package com.example import io.reactivex.rxjava3.core.Single import java.util.co...




© 2022 respuesta.top Reservados todos los derechos. Centro de preguntas y respuestas reservados todos los derechos