दिलचस्प पोस्ट
डी 3 बार ग्राफ़ उदाहरण स्थानीय रूप से काम नहीं कर रहा है साझा मेमोरी का उपयोग कर प्रक्रियाओं के बीच सेमाफोर कैसे साझा करें ऐप्पल इंटरफ़ेस बिल्डर: यूआईएमएजेव्यू में सबव्यू को जोड़ना यूआरएल Slugify एल्गोरिथ्म सी # में? कैसे जांचने के लिए कि एक स्ट्रिंग अजगर में एक सूची से एक तत्व है MySQL इनर क्वेरी एकाधिक मसौदाओं में शामिल हों स्ट्रिंग में यूआरएल की उपस्थिति का पता लगाने के लिए मैं जावा 8 स्ट्रीमों के साथ कार्टेशियन उत्पाद कैसे बना सकता हूं? लक्ष्य निर्भरता के लिए एक्सकोडबिल्ड अलग प्रावधान प्रोफाइल पेज रिफ्रेश के बिना जावा के साथ हैश को कैसे हटाया जा सकता है? मैं विजुअल स्टूडियो के बिना एमएसटीईस्ट का उपयोग कैसे करूं? एचटीएमएल और सीएसएस अस्पष्ट करने के लिए उपकरण एंड्रॉइड में साझा उपयोगकर्ता आईडी क्या है, और इसका इस्तेमाल कैसे किया जाता है? सी ++ में शुद्ध आभासी नाशक व्यू बिल्ड समय क्या है?

स्पार्क डेटाफ़्रेम का उपयोग करते हुए JSON डेटा कॉलम की क्वेरी कैसे करें?

मेरे पास एक सीसांद्रा टेबल है जो सादगी के लिए कुछ ऐसा दिखता है:

key: text jsonData: text blobData: blob 

मैं इसका उपयोग करके चिंगारी और स्पार्क-कासांद्रा-कनेक्टर का उपयोग करने के लिए एक बुनियादी डेटा फ्रेम बना सकता हूं:

 val df = sqlContext.read .format("org.apache.spark.sql.cassandra") .options(Map("table" -> "mytable", "keyspace" -> "ks1")) .load() 

मैं हालांकि अपने अंतर्निहित संरचना में JSON डेटा का विस्तार करने के लिए संघर्ष कर रहा हूँ। मैं अंततः जेएसएसएन स्ट्रिंग के भीतर के गुणों के आधार पर फ़िल्टर करना और ब्लॉब डेटा वापस करना चाहता हूं। कुछ jsonData.foo = "बार" और वापस ब्लॉबडाटा। क्या यह संभव है?

Solutions Collecting From Web of "स्पार्क डेटाफ़्रेम का उपयोग करते हुए JSON डेटा कॉलम की क्वेरी कैसे करें?"

स्पार्क 2.1+

आप from_json फ़ंक्शन का उपयोग कर सकते हैं:

 import org.apache.spark.sql.functions.from_json import org.apache.spark.sql.types._ val schema = StructType(Seq( StructField("k", StringType, true), StructField("v", DoubleType, true) )) df.withColumn("jsonData", from_json($"jsonData", schema)) 

स्पार्क 1.6+

आप get_json_object उपयोग कर सकते हैं, जो एक स्तंभ और पथ लेता है:

 import org.apache.spark.sql.functions.get_json_object val exprs = Seq("k", "v").map( c => get_json_object($"jsonData", s"$$.$c").alias(c)) df.select($"*" +: exprs: _*) 

और फ़ील्ड को अलग-अलग स्ट्रिंग्स के लिए निकालता है, जिसे आगे अपेक्षित प्रकारों में जाँचा जा सकता है।

स्पार्क <= 1.5 :

क्या यह संभव है?

जहाँ तक मुझे पता है कि यह सीधे संभव नहीं है आप इस तरह से कुछ की कोशिश कर सकते हैं:

 val df = sc.parallelize(Seq( ("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"), ("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2") )).toDF("key", "jsonData", "blobData") 

मुझे लगता है कि blob फ़ील्ड JSON में प्रदर्शित नहीं किया जा सकता। अन्यथा आप टैक्सी छोड़ते हैं और इसमें शामिल होने से:

 import org.apache.spark.sql.Row val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey") val jsons = sqlContext.read.json(df.drop("blobData").map{ case Row(key: String, json: String) => s"""{"key": "$key", "jsonData": $json}""" }) val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey") parsed.printSchema // root // |-- jsonData: struct (nullable = true) // | |-- k: string (nullable = true) // | |-- v: double (nullable = true) // |-- key: long (nullable = true) // |-- blobData: string (nullable = true) 

एक वैकल्पिक (सस्ता, हालांकि अधिक जटिल) दृष्टिकोण एक JSD को पार्स करने के लिए एक यूडीएफ का उपयोग करना है और किसी struct या map कॉलम को आउटपुट करना है। उदाहरण के लिए ऐसा कुछ:

 import net.liftweb.json.parse case class KV(k: String, v: Int) val parseJson = udf((s: String) => { implicit val formats = net.liftweb.json.DefaultFormats parse(s).extract[KV] }) val parsed = df.withColumn("parsedJSON", parseJson($"jsonData")) parsed.show // +---+--------------------+------------------+----------+ // |key| jsonData| blobData|parsedJSON| // +---+--------------------+------------------+----------+ // | 1|{"k": "foo", "v":...|some_other_field_1| [foo,1]| // | 2|{"k": "bar", "v":...|some_other_field_2| [bar,3]| // +---+--------------------+------------------+----------+ parsed.printSchema // root // |-- key: string (nullable = true) // |-- jsonData: string (nullable = true) // |-- blobData: string (nullable = true) // |-- parsedJSON: struct (nullable = true) // | |-- k: string (nullable = true) // | |-- v: integer (nullable = false) 

from_json फ़ंक्शन ठीक उसी प्रकार है जिसे आप ढूंढ रहे हैं। आपका कोड कुछ ऐसा दिखेगा:

 val df = sqlContext.read .format("org.apache.spark.sql.cassandra") .options(Map("table" -> "mytable", "keyspace" -> "ks1")) .load() //You can define whatever struct type that your json states val schema = StructType(Seq( StructField("key", StringType, true), StructField("value", DoubleType, true) )) df.withColumn("jsonData", from_json(col("jsonData"), schema)) 

अंतर्निहित JSON स्ट्रिंग है

 "{ \"column_name1\":\"value1\",\"column_name2\":\"value2\",\"column_name3\":\"value3\",\"column_name5\":\"value5\"}"; 

नीचे JSON को फ़िल्टर करने और Cassandra में आवश्यक डेटा लोड करने की स्क्रिप्ट है

  sqlContext.read.json(rdd).select("column_name1 or fields name in Json", "column_name2","column_name2") .write.format("org.apache.spark.sql.cassandra") .options(Map("table" -> "Table_name", "keyspace" -> "Key_Space_name")) .mode(SaveMode.Append) .save()