दिलचस्प पोस्ट
मुद्रा मुद्रा के बिना स्वरूप मुद्रा मैं datetime.strptime के साथ अवधि (AM / PM) के लिए कैसे खाता करूं? कुल / चल रहे शेष राशि की गणना करें PHP, शाब्दिक कुंजियों के साथ एसोसिएटिव सरणी से पहले? WinAPI के साथ फ़ोल्डर के चयनित आइटम प्राप्त करें सी को एक निष्पादन योग्य बाइनरी फ़ाइल में कैसे संकलित करें और एंड्रॉइड शेल से Android में चलाएं? बैकबोन.जेएस और पुस्टस्टेट कोको में स्ट्रिंग्स की तुलना करना क्या करता है * = मतलब है? लिनक्स पर एक अंतरफलक का आईपी पता प्राप्त करें क्या हम एक जावा डेस्कटॉप अनुप्रयोग में किसी भी GUI अद्यतन के लिए EventQueue.invokeLater का उपयोग करना चाहिए? डेटाबेस> 500 लाख पंक्तियों को नियंत्रित कर सकता है क्या मैं <textarea> टैग के अंदर HTML स्वरूपण एम्बेड कर सकता हूं? संसाधन के पहले ही मौजूद होने पर POST के लिए HTTP प्रतिसाद कोड Symfony2 – खुद के विक्रेता बंडल का निर्माण – परियोजना और जीआईटी रणनीति

स्पार्क एसक्यूएल में यूज़र-डिफ़ाइंड कुल फ़ंक्शन को कैसे परिभाषित और इस्तेमाल किया जाए?

मुझे स्पार्क एसक्यूएल में एक यूडीएफ कैसे लिखना है:

def belowThreshold(power: Int): Boolean = { return power < -40 } sqlContext.udf.register("belowThreshold", belowThreshold _) 

क्या मैं एक समान कार्य को परिभाषित करने के लिए कुछ ऐसा कर सकता हूँ? यह कैसे किया जाता है?

संदर्भ के लिए, मैं निम्नलिखित SQL क्वेरी को चलाने के लिए चाहता हूँ:

 val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp FROM ifDF WHERE opticalReceivePower IS NOT null GROUP BY span, timestamp ORDER BY span""") 

इसे कुछ जैसा वापस करना चाहिए

Row(span1, false, T0)

मैं कुल कार्य करना चाहता हूं कि मुझे बताएं कि क्या opticalReceivePower लिए कोई मान है जो span और timestamp द्वारा निर्धारित समूहों में है जो दहलीज से नीचे हैं। क्या मुझे यूडीएफ को ऊपर वर्णित यूडीएफ को अलग तरह से लिखना है?

Solutions Collecting From Web of "स्पार्क एसक्यूएल में यूज़र-डिफ़ाइंड कुल फ़ंक्शन को कैसे परिभाषित और इस्तेमाल किया जाए?"

समर्थित तरीके

स्पार्क> = 2.3

वेक्टरयुक्त udf (केवल पायथन):

 from pyspark.sql.functions import pandas_udf from pyspark.sql.types import * import pandas as pd df = sc.parallelize([ ("a", 0), ("a", 1), ("b", 30), ("b", -50) ]).toDF(["group", "power"]) def below_threshold(threshold, group="group", power="power"): @pandas_udf("struct<group: string, below_threshold: boolean>") def below_threshold_(df): df = pd.DataFrame( df.groupby(group).apply(lambda x: (x[power] < threshold).any())) df.reset_index(inplace=True, drop=False) return df return below_threshold_ 

उदाहरण उपयोग:

 df.groupBy("group").apply(below_threshold(-40)).show() ## +-----+---------------+ ## |group|below_threshold| ## +-----+---------------+ ## | b| true| ## | a| false| ## +-----+---------------+ 

स्पार्क> = 2.0 (वैकल्पिक रूप से 1.6, लेकिन थोड़ा अलग एपीआई के साथ):

टाइप Datasets गए Datasets पर Aggregators का उपयोग करना संभव है:

 import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.{Encoder, Encoders} class BelowThreshold[I](f: I => Boolean) extends Aggregator[I, Boolean, Boolean] with Serializable { def zero = false def reduce(acc: Boolean, x: I) = acc | f(x) def merge(acc1: Boolean, acc2: Boolean) = acc1 | acc2 def finish(acc: Boolean) = acc def bufferEncoder: Encoder[Boolean] = Encoders.scalaBoolean def outputEncoder: Encoder[Boolean] = Encoders.scalaBoolean } val belowThreshold = new BelowThreshold[(String, Int)](_._2 < - 40).toColumn df.as[(String, Int)].groupByKey(_._1).agg(belowThreshold) 

स्पार्क> = 1.5 :

स्पार्क 1.5 में आप यूडीएफ़ को इस तरह बना सकते हैं हालांकि यह सबसे अधिक संभावना है:

 import org.apache.spark.sql.expressions._ import org.apache.spark.sql.types._ import org.apache.spark.sql.Row object belowThreshold extends UserDefinedAggregateFunction { // Schema you get as an input def inputSchema = new StructType().add("power", IntegerType) // Schema of the row which is used for aggregation def bufferSchema = new StructType().add("ind", BooleanType) // Returned type def dataType = BooleanType // Self-explaining def deterministic = true // zero value def initialize(buffer: MutableAggregationBuffer) = buffer.update(0, false) // Similar to seqOp in aggregate def update(buffer: MutableAggregationBuffer, input: Row) = { if (!input.isNullAt(0)) buffer.update(0, buffer.getBoolean(0) | input.getInt(0) < -40) } // Similar to combOp in aggregate def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = { buffer1.update(0, buffer1.getBoolean(0) | buffer2.getBoolean(0)) } // Called on exit to get return value def evaluate(buffer: Row) = buffer.getBoolean(0) } 

उदाहरण उपयोग:

 df .groupBy($"group") .agg(belowThreshold($"power").alias("belowThreshold")) .show // +-----+--------------+ // |group|belowThreshold| // +-----+--------------+ // | a| false| // | b| true| // +-----+--------------+ 

स्पार्क 1.4 वर्कअराउंड :

मुझे यकीन नहीं है कि मैं आपकी आवश्यकताओं को सही ढंग से समझ रहा हूं, लेकिन जहाँ तक मैं सादा पुराने समेकन बता सकता हूं वह पर्याप्त होना चाहिए:

 val df = sc.parallelize(Seq( ("a", 0), ("a", 1), ("b", 30), ("b", -50))).toDF("group", "power") df .withColumn("belowThreshold", ($"power".lt(-40)).cast(IntegerType)) .groupBy($"group") .agg(sum($"belowThreshold").notEqual(0).alias("belowThreshold")) .show // +-----+--------------+ // |group|belowThreshold| // +-----+--------------+ // | a| false| // | b| true| // +-----+--------------+ 

स्पार्क <= 1.4 :

जहां तक ​​मुझे पता है, इस समय (स्पार्क 1.4.1), यूवीएएफ के लिए कोई समर्थन नहीं है, छत्ते के अलावा अन्य नहीं है स्पार्क 1.5 के साथ यह संभव होना चाहिए ( स्पार्क-3 9 47 देखें)

असमर्थित / आंतरिक विधियां

आंतरिक स्पार्क कई श्रेणियों का उपयोग करता है जिनमें ImperativeAggregates और DeclarativeAggregates

आंतरिक उपयोग के लिए अभिप्रेत है और आगे की सूचना के बिना बदल सकते हैं, इसलिए शायद यह संभव नहीं है कि आप अपने प्रोडक्शन कोड में उपयोग करना चाहते हैं, लेकिन केवल नीचे के पूर्णता के लिए DeclarativeAggregate साथ BelowThreshold इस तरह कार्यान्वित किया जा सकता है (स्पार्क 2.2- BelowThreshold साथ परीक्षण किया गया):

 import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types._ case class BelowThreshold(child: Expression, threshold: Expression) extends DeclarativeAggregate { override def children: Seq[Expression] = Seq(child, threshold) override def nullable: Boolean = false override def dataType: DataType = BooleanType private lazy val belowThreshold = AttributeReference( "belowThreshold", BooleanType, nullable = false )() // Used to derive schema override lazy val aggBufferAttributes = belowThreshold :: Nil override lazy val initialValues = Seq( Literal(false) ) override lazy val updateExpressions = Seq(Or( belowThreshold, If(IsNull(child), Literal(false), LessThan(child, threshold)) )) override lazy val mergeExpressions = Seq( Or(belowThreshold.left, belowThreshold.right) ) override lazy val evaluateExpression = belowThreshold override def defaultResult: Option[Literal] = Option(Literal(false)) } 

इसे आगे withAggregateFunction बराबर के साथ लिपटा जाना चाहिए।