фильтровать искровой массив данных с полем строки, представляющим собой массив строк

Использование Spark 1.5 и Scala 2.10.6

Я пытаюсь отфильтровать фрейм данных через поле «теги», которое является массивом строк. Ищем все строки, имеющие тег 'private'.

val report = df.select("*")
  .where(df("tags").contains("private"))

получаю:

  

Исключение в потоке "main" org.apache.spark.sql.AnalysisException:   невозможно разрешить «Содержит (теги, приватные)» из-за несоответствия типов данных:   аргумент 1 требует строкового типа, однако «теги» имеют массив   тип;.

Метод фильтра лучше подходит?

ОБНОВЛЕНО:

данные поступают из адаптера cassandra, но минимальный пример, который показывает, что я пытаюсь сделать, а также выдает ошибку, указанную выше:

  def testData (sc: SparkContext): DataFrame = {
    val stringRDD = sc.parallelize(Seq("""
      { "name": "ed",
        "tags": ["red", "private"]
      }""",
      """{ "name": "fred",
        "tags": ["public", "blue"]
      }""")
    )
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    sqlContext.read.json(stringRDD)
  }
  def run(sc: SparkContext) {
    val df1 = testData(sc)
    df1.show()
    val report = df1.select("*")
      .where(df1("tags").contains("private"))
    report.show()
  }

ОБНОВЛЕНО: массив тегов может иметь любую длину, а «закрытый» тег может быть в любой позиции

ОБНОВЛЕНО: одно работающее решение: UDF

val filterPriv = udf {(tags: mutable.WrappedArray[String]) => tags.contains("private")}
val report = df1.filter(filterPriv(df1("tags")))
12 голосов | спросил navicore 17 Jam1000000amSun, 17 Jan 2016 03:14:14 +030016 2016, 03:14:14

2 ответа


0

Я думаю, что если вы используете where(array_contains(...)), это сработает. Вот мой результат:

scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext

scala> import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.DataFrame

scala> def testData (sc: SparkContext): DataFrame = {
     |     val stringRDD = sc.parallelize(Seq
     |      ("""{ "name": "ned", "tags": ["blue", "big", "private"] }""",
     |       """{ "name": "albert", "tags": ["private", "lumpy"] }""",
     |       """{ "name": "zed", "tags": ["big", "private", "square"] }""",
     |       """{ "name": "jed", "tags": ["green", "small", "round"] }""",
     |       """{ "name": "ed", "tags": ["red", "private"] }""",
     |       """{ "name": "fred", "tags": ["public", "blue"] }"""))
     |     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
     |     import sqlContext.implicits._
     |     sqlContext.read.json(stringRDD)
     |   }
testData: (sc: org.apache.spark.SparkContext)org.apache.spark.sql.DataFrame

scala>   
     | val df = testData (sc)
df: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>]

scala> val report = df.select ("*").where (array_contains (df("tags"), "private"))
report: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>]

scala> report.show
+------+--------------------+
|  name|                tags|
+------+--------------------+
|   ned|[blue, big, private]|
|albert|    [private, lumpy]|
|   zed|[big, private, sq...|
|    ed|      [red, private]|
+------+--------------------+

Обратите внимание, что это работает, если вы пишете where(array_contains(df("tags"), "private")), но если вы пишете where(df("tags").array_contains("private")) (более точно аналогично тому, что вы написали изначально), происходит сбой с array_contains is not a member of org.apache.spark.sql.Column. Глядя на исходный код Column, я вижу, что есть кое-что для обработки contains (для этого создаем экземпляр Contains), но не array_contains. Может быть, это недосмотр.

ответил Robert Dodier 18 Jam1000000amMon, 18 Jan 2016 00:20:21 +030016 2016, 00:20:21
0

Вы можете использовать порядковый номер для ссылки на массив json, например, для в вашем случае df("tags")(0). Вот рабочий образец

scala> val stringRDD = sc.parallelize(Seq("""
     |       { "name": "ed",
     |         "tags": ["private"]
     |       }""",
     |       """{ "name": "fred",
     |         "tags": ["public"]
     |       }""")
     |     )
stringRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[87] at parallelize at <console>:22

scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> sqlContext.read.json(stringRDD)
res28: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>]

scala> val df=sqlContext.read.json(stringRDD)
df: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>]

scala> df.columns
res29: Array[String] = Array(name, tags)

scala> df.dtypes
res30: Array[(String, String)] = Array((name,StringType), (tags,ArrayType(StringType,true)))

scala> val report = df.select("*").where(df("tags")(0).contains("private"))
report: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>]

scala> report.show
+----+-------------+
|name|         tags|
+----+-------------+
|  ed|List(private)|
+----+-------------+
ответил Aravind R. Yarram 17 Jam1000000amSun, 17 Jan 2016 06:39:28 +030016 2016, 06:39:28

Похожие вопросы

Популярные теги

security × 330linux × 316macos × 2827 × 268performance × 244command-line × 241sql-server × 235joomla-3.x × 222java × 189c++ × 186windows × 180cisco × 168bash × 158c# × 142gmail × 139arduino-uno × 139javascript × 134ssh × 133seo × 132mysql × 132