दिलचस्प पोस्ट
Laravel सुवक्ता समूह () और प्रत्येक समूह की गणना भी वापस करता है एनोटेशन @ आईडी और @ जनरेटेड वैल्यू (रणनीति = जनरेशन प्रकार.आईएनएनटीटीआई) का क्या उपयोग है? क्यों पीढ़ी प्रकार की पहचान है? सी गेटलाइन () – कैसे बफर के साथ निपटने के लिए / कैसे सरणी में मूल्यों की अज्ञात संख्या को पढ़ने के लिए ऐरे बनाम लिस्ट <टी>: किसका उपयोग करने के लिए? एनएसएमयूटीबलएआरआरआर से निकालने का सबसे अच्छा तरीका क्या है? क्या उन्हें वास्तव में 'निल' के लिए पॉइंटर्स सेट करना चाहिए? उपाय। जब एक प्रॉक्सी हो तो install_github कैसे करें sqrt के लिए अनिर्धारित संदर्भ (या अन्य गणितीय कार्य) एक साजिश के अक्ष पर टिक के निशान की रिक्ति बदलते हैं? सिंक्रनाइज़ेशन कॉन्टैक्ट। मुख्य यूआई धागे पर निरंतरता में निरर्थक है डी 3 नोड लेबलिंग एक PostgreSQL चयन क्वेरी में अधिकतम कॉलम की संख्या क्या है इस java.lang.NoClassDefFoundError को कैसे हल करें: org / apache / commons / io / output / deferfileoutputstream? पीआईएल के साथ एक पीएनजी छवि का अल्फा मान कैसे प्राप्त करें? आवेश त्रुटि: लिनक्स अतिथि में फ़ोल्डर्स माउंट करने में विफल

प्रत्येक समूह की पहली पंक्ति का चयन कैसे करें?

मेरे पास एक डेटाफ्रेम है, जिसका अनुसरण किया गया है:

df.groupBy($"Hour", $"Category") .agg(sum($"value") as "TotalValue") .sort($"Hour".asc, $"TotalValue".desc)) 

परिणाम दिखते हैं:

 +----+--------+----------+ |Hour|Category|TotalValue| +----+--------+----------+ | 0| cat26| 30.9| | 0| cat13| 22.1| | 0| cat95| 19.6| | 0| cat105| 1.3| | 1| cat67| 28.5| | 1| cat4| 26.8| | 1| cat13| 12.6| | 1| cat23| 5.3| | 2| cat56| 39.6| | 2| cat40| 29.7| | 2| cat187| 27.9| | 2| cat68| 9.8| | 3| cat8| 35.6| | ...| ....| ....| +----+--------+----------+ 

जैसा कि आप देख सकते हैं, डेटाफ्रेम का क्रम बढ़ते Hour में ऑर्डर किया जाता है, फिर कुलवॉल्यू द्वारा अवरोही क्रम में।

मैं प्रत्येक समूह की शीर्ष पंक्ति का चयन करना चाहूंगा, अर्थात्

  • घंटे के समूह से == 0 चुनें (0, बिल्ली 26, 30.9)
  • घंटे के समूह से == 1 का चयन करें (1, बिल्ली 67,28.5)
  • घंटे के समूह से == 2 का चयन करें (2, बिल्ली 56,39.6)
  • और इसी तरह

तो वांछित उत्पादन होगा:

 +----+--------+----------+ |Hour|Category|TotalValue| +----+--------+----------+ | 0| cat26| 30.9| | 1| cat67| 28.5| | 2| cat56| 39.6| | 3| cat8| 35.6| | ...| ...| ...| +----+--------+----------+ 

प्रत्येक समूह की शीर्ष एन पंक्तियों को भी चुनने में सक्षम हो सकता है

किसी भी सहायताको बहुत सराहा जाएगा।

Solutions Collecting From Web of "प्रत्येक समूह की पहली पंक्ति का चयन कैसे करें?"

विंडो फ़ंक्शंस :

ऐसा कुछ ऐसा करना चाहिए:

 import org.apache.spark.sql.functions.{row_number, max, broadcast} import org.apache.spark.sql.expressions.Window val df = sc.parallelize(Seq( (0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3), (1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3), (2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8), (3,"cat8",35.6))).toDF("Hour", "Category", "TotalValue") val w = Window.partitionBy($"hour").orderBy($"TotalValue".desc) val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn") dfTop.show // +----+--------+----------+ // |Hour|Category|TotalValue| // +----+--------+----------+ // | 0| cat26| 30.9| // | 1| cat67| 28.5| // | 2| cat56| 39.6| // | 3| cat8| 35.6| // +----+--------+----------+ 

महत्वपूर्ण डेटा तिरछा के मामले में यह विधि अक्षम हो जाएगी।

join बाद सादा एसक्यूएल एकत्रीकरण :

वैकल्पिक रूप से आप एकत्रित डेटा फ्रेम में शामिल हो सकते हैं:

 val dfMax = df.groupBy($"hour").agg(max($"TotalValue")) val dfTopByJoin = df.join(broadcast(dfMax), ($"hour" === $"max_hour") && ($"TotalValue" === $"max_value")) .drop("max_hour") .drop("max_value") dfTopByJoin.show // +----+--------+----------+ // |Hour|Category|TotalValue| // +----+--------+----------+ // | 0| cat26| 30.9| // | 1| cat67| 28.5| // | 2| cat56| 39.6| // | 3| cat8| 35.6| // +----+--------+----------+ 

यह डुप्लिकेट मान रखेगा (यदि एक से अधिक श्रेणी प्रति घंटे समान कुल मूल्य के साथ हैं)। आप निम्नानुसार इन्हें हटा सकते हैं:

 dfTopByJoin .groupBy($"hour") .agg( first("category").alias("category"), first("TotalValue").alias("TotalValue")) 

structs ऊपर आदेश का उपयोग करना :

साफ, हालांकि बहुत अच्छी तरह से परीक्षण नहीं किया गया, चाल जो कि जुड़ने या विंडो कार्यों की आवश्यकता नहीं है:

 val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs")) .groupBy($"hour") .agg(max("vs").alias("vs")) .select($"Hour", $"vs.Category", $"vs.TotalValue") dfTop.show // +----+--------+----------+ // |Hour|Category|TotalValue| // +----+--------+----------+ // | 0| cat26| 30.9| // | 1| cat67| 28.5| // | 2| cat56| 39.6| // | 3| cat8| 35.6| // +----+--------+----------+ 

डाटासेट एपीआई (स्पार्क 1.6+, 2.0+) के साथ:

स्पार्क 1.6 :

 case class Record(Hour: Integer, Category: String, TotalValue: Double) df.as[Record] .groupBy($"hour") .reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y) .show // +---+--------------+ // | _1| _2| // +---+--------------+ // |[0]|[0,cat26,30.9]| // |[1]|[1,cat67,28.5]| // |[2]|[2,cat56,39.6]| // |[3]| [3,cat8,35.6]| // +---+--------------+ 

स्पार्क 2.0 या बाद में :

 df.as[Record] .groupByKey(_.Hour) .reduceGroups((x, y) => if (x.TotalValue > y.TotalValue) x else y) 

आखिरी दो विधियां नक्शा साइड का संयोजन कर सकती हैं और पूर्ण फेरबदल की आवश्यकता नहीं है, इसलिए अधिकांश समय विंडो फ़ंक्शन के मुकाबले बेहतर प्रदर्शन को प्रदर्शित करते हैं और जुड़ते हैं।

का प्रयोग न करें :

 df.orderBy(...).groupBy(...).agg(first(...), ...) 

ऐसा लग सकता है कि काम (विशेषकर local मोड में) पर अविश्वसनीय है ( SPARK-16207 )। संबंधित जिरा मुद्दे को जोड़ने के लिए त्शेख ज़्हहर को क्रेडिट।

उसी नोट पर लागू होता है

 df.orderBy(...).dropDuplicates(...) 

जो आंतरिक रूप से समान निष्पादन योजना का उपयोग करता है।

स्पार्क 2.0.2 के लिए कई कॉलमों के समूहिंग के साथ:

 import org.apache.spark.sql.functions.row_number import org.apache.spark.sql.expressions.Window val w = Window.partitionBy($"col1", $"col2", $"col3").orderBy($"timestamp".desc) val refined_df = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn") 

यदि डेटाफ़्रेम को कई कॉलमों द्वारा समूहीकृत किया जाना है, तो यह मदद कर सकता है

 val keys = List("Hour", "Category"); val selectFirstValueOfNoneGroupedColumns = df.columns .filterNot(keys.toSet) .map(_ -> "first").toMap val grouped = df.groupBy(keys.head, keys.tail: _*) .agg(selectFirstValueOfNoneGroupedColumns) 

उम्मीद है कि इस तरह की किसी की भी इसी तरह की समस्या है

चिंगारी के लिए> 2.0 हम बस कर सकते हैं:
groupBy($"Hour").agg(df_op.columns.map((_, "first")).toMap)

विस्तार से ओपी के सेटअप का प्रयोग करें:

 val df_op = df.groupBy($"Hour", $"Category") .agg(sum($"value") as "TotalValue") .sort($"Hour".asc, $"TotalValue".desc)) df_op.groupBy($"Hour").agg(df_op.columns.map((_, "first")).toMap) 

यह Compute aggregates by specifying a map from column name to aggregate methods. agg करने के लिए agg की agg विधि का उपयोग कर रहा है Compute aggregates by specifying a map from column name to aggregate methods.first एक एसक्यूएल एकत्रीकरण कार्य है

हम रैंक () विंडो फ़ंक्शन का उपयोग कर सकते हैं (जहां आप रैंक = 1 का चयन करेंगे) रैंक सिर्फ एक समूह की प्रत्येक पंक्ति के लिए संख्या जोड़ता है (इस स्थिति में यह समय होगा)

यहाँ एक उदाहरण है ( https://github.com/jaceklaskowski/mastering-apache-spark-book/blob/master/spark-sql-functions.adoc#rank से )

 val dataset = spark.range(9).withColumn("bucket", 'id % 3) import org.apache.spark.sql.expressions.Window val byBucket = Window.partitionBy('bucket).orderBy('id) scala> dataset.withColumn("rank", rank over byBucket).show +---+------+----+ | id|bucket|rank| +---+------+----+ | 0| 0| 1| | 3| 0| 2| | 6| 0| 3| | 1| 1| 1| | 4| 1| 2| | 7| 1| 3| | 2| 2| 1| | 5| 2| 2| | 8| 2| 3| +---+------+----+