Generar condición de unión dinámica Spark / Scala -- scala campo con apache-spark camp Relacionados El problema

generate dynamic join condition spark/scala


0
vote

problema

Español

Tengo una matriz de tupla y quiero generar una condición de unión (o) usando eso.

e.g.

  input -->  [("leftId", "rightId"), ("leftId", leftAltId")]  output -->  leftDF("leftId") === rightDF("rightId") || leftDF("leftAltId") === rightDF("rightAltId")   

Firma del método:

    def inner(leftDF: DataFrame, rightDF: DataFrame, fieldsToJoin: Array[(String,String)]): Unit = {    }   

He intentado usar la operación de reducción en la matriz, pero la salida de mi operación de reducción es la columna y no la cadena, por lo tanto, no se puede devolver la entrada. Podría hacer recursivo, pero esperando que haya una forma más sencilla de iniciar la variable de columna vacía y construir la consulta. ¿Pensamientos?

Original en ingles

I have a array of tuple and I want to generate a join condition(OR) using that.

e.g.

input -->  [("leftId", "rightId"), ("leftId", leftAltId")]  output -->  leftDF("leftId") === rightDF("rightId") || leftDF("leftAltId") === rightDF("rightAltId") 

method signature:

  def inner(leftDF: DataFrame, rightDF: DataFrame, fieldsToJoin: Array[(String,String)]): Unit = {    } 

I tried using reduce operation on the array but output of my reduce operation is Column and not String hence it can't be fed back as input. I could do recursive but hoping there's simpler way to initiate empty column variable and build the query. thoughts ?

     

Lista de respuestas

2
 
vote
vote
La mejor respuesta
 

Puedes hacer algo así:

  val cond = fieldsToJoin.map(x => col(x._1) === col(x._2)).reduce(_ || _) leftDF.join(rightDF, cond)   

Básicamente, primero convierte la matriz en una matriz de condiciones (COL transforma la cadena a columna y luego === realiza la comparación) y luego la reducción agrega el "o" entre ellos. El resultado es una columna que puede usar.

 

You can do something like this:

val cond = fieldsToJoin.map(x => col(x._1) === col(x._2)).reduce(_ || _) leftDF.join(rightDF, cond) 

Basically you first turn the array into an array of conditions (col transforms the string to column and then === does the comparison) and then the reduce adds the "or" between them. The result is a column you can use.

 
 

Relacionados problema

3  Codificadores de Spark Java - CAMPOS DE CAMBIO EN LALISTAS DE COLECCIÓN  ( Spark java encoders switch fields on collectaslist ) 
Tengo el siguiente esquema en un conjunto de datos - root |-- userId: string (nullable = true) |-- data: map (nullable = true) | |-- key: string | ...

1  Envío de chispas - no se pudo analizar la URL maestra  ( Spark submit could not parse master url ) 
Estoy recibiendo un error al ejecutar el trabajo SPARK usando SPARK-SHEP EN WINDOWS 10 MÁQUINA. El comando es: c:workspacesSpark2Demo>spark-submit --class ...

25  Recapacitación / Datos de giro en Spark RDD y / o Spark DataFrames  ( Reshaping pivoting data in spark rdd and or spark dataframes ) 
Tengo algunos datos en el siguiente formato (ya sea RDD o Spark DataFrame): from pyspark.sql import SQLContext sqlContext = SQLContext(sc) rdd = sc.paral...

0  Implementación de Java de javastreamingContext.Filestream  ( Java implementation of javastreamingcontext filestream ) 
Tengo algunos problemas de programación con Spark Streaming. Como quiero crear un flujo de entrada y leerlos utilizando un formato de entrada autodefinido. La...

4  Cómo se inicia una aplicación SPARK usando SBT RUN  ( How a spark application starts using sbt run ) 
En realidad, quiero saber el mecanismo subyacente de cómo ocurre esto cuando ejecuto sbt run ¡Se inicia la aplicación SPLP! Cuál es la diferencia entre es...

2  ¿Por qué el DataStax Cassandra SparkContext está seleccionando 124 filas en RDD al intentar seleccionar solo una fila única a través del límite 1  ( Why is datastax cassandra sparkcontext is selecting 124 rows in rdd when trying ) 
Estoy usando DataStax Cassandra SparkContext (SC) y ejecutando el siguiente código SCALA: val table = sc.cassandraTable("database_name", "table_name") val ...

6  Pyspark truncará un decimal  ( Pyspark truncate a decimal ) 
Estoy trabajando en Pyspark y tengo una latitud variable que tiene muchos lugares decimales. Necesito crear dos nuevas variables de esto, una que se redondea ...

1  Pyspark DataFrame - Force ansia en caché de datos de datos - Tomar (1) VS Count ()  ( Pyspark dataframe force eager dataframe cache take1 vs count ) 
Uno de los enfoques para forzar el almacenamiento en caché / persistencia está llamando a una acción después del caché / persistente, por ejemplo: df.cache...

0  NegativearRaysizeException mientras la predicción de entrenamiento IO Universal Recomendación  ( Negativearraysizeexception while training prediction io universal recommender ) 
Estoy tratando de implementar un sistema de predicción IO. Estoy obteniendo la percepción negativa y la fase de entrenamiento. Ayuda es apreciado. Los eve...

-1  Cómo mantener el orden de los datos al seleccionar los valores distintos de la columna del conjunto de datos  ( How to maintain the order of the data while selecting the distinct values of col ) 
Tengo un conjunto de datos como abajo, datetime24 Necesito seleccionar los valores distintos del COL1 y mi conjunto de datos resultante debe tener el pe...




© 2022 respuesta.top Reservados todos los derechos. Centro de preguntas y respuestas reservados todos los derechos