दिलचस्प पोस्ट
रिच टेक्स्टबॉक्स स्ट्रिंग के विभिन्न भागों का रंग BouncyCastle बिना जावा में एक X509 प्रमाणपत्र बनाना? क्या कोई अद्वितीय एंड्रॉइड डिवाइस आईडी है? उपयोग में नहीं होने पर जावा ढेर कम करने का कोई तरीका क्या है? बिल्ट एंडियन के लिए लिटिल एंडियन को परिवर्तित करना IOS में numpad कीबोर्ड में 'संपन्न' बटन जोड़ने के लिए कैसे करें JSON नोड या एक्सप्रेस का उपयोग करने के लिए उचित तरीका है आईओएस में अपनी स्वयं के यूआरएल यूआरएल को संभालने के लिए संभव है? वस्तुओं में द्विआधारी खोज को लागू करें पायथन में एक नियमित ग्रिड पर अनियमित रूप से अंतरित डेटा को पुन: नमूना करना यूनिक्स लाइन के अंत में कनवर्ट करने के लिए विंडोज कमांड? फायरबसे एफसीएम फोर्स ऑन टॉकनफ्रेश () को कॉल करने के लिए कैसे एक JUnit परीक्षण में एकाधिक दावा से बचने के लिए? Ctrl और कर्सर कुंजियों को मारते समय विजुअल स्टूडियो को कैमेल कैस को समझें क्या (4> वाई> 1) सी ++ में एक मान्य स्टेटमेंट है? यदि आप ऐसा करते हैं तो इसका मूल्यांकन कैसे करें?

बड़ी डेटासेट के लिए COGROUP का उपयोग कैसे करें

मेरे पास दो rdd's अर्थात् val tab_a: RDD[(String, String)] और val tab_b: RDD[(String, String)] मैं उन डेटासेट के लिए cogroup का उपयोग कर रहा हूँ जैसे:

 val tab_c = tab_a.cogroup(tab_b).collect.toArray val updated = tab_c.map { x => { //somecode } } 

मैं मैप फ़ंक्शन के लिए tab_c क्यूग्रेटेड मानों का उपयोग कर रहा हूं और यह छोटे डेटासेट्स के लिए ठीक काम करता है, लेकिन बड़ी डेटासेट के मामले में यह Out Of Memory exception फेंकता है

मैंने अंतिम मूल्य को आरडीडी में परिवर्तित करने की कोशिश की है, लेकिन कोई भी वही त्रुटि नहीं है

 val newcos = spark.sparkContext.parallelize(tab_c) 

1. बड़े डेटासेट के लिए Cogroup का उपयोग कैसे करें?

2.क्या हम कूटबद्ध मूल्य को जारी रख सकते हैं?

कोड

  val source_primary_key = source.map(rec => (rec.split(",")(0), rec)) source_primary_key.persist(StorageLevel.DISK_ONLY) val destination_primary_key = destination.map(rec => (rec.split(",")(0), rec)) destination_primary_key.persist(StorageLevel.DISK_ONLY) val cos = source_primary_key.cogroup(destination_primary_key).repartition(10).collect() var srcmis: Array[String] = new Array[String](0) var destmis: Array[String] = new Array[String](0) var extrainsrc: Array[String] = new Array[String](0) var extraindest: Array[String] = new Array[String](0) var srcs: String = Seq("")(0) var destt: String = Seq("")(0) val updated = cos.map { x => { val key = x._1 val value = x._2 srcs = value._1.mkString(",") destt = value._2.mkString(",") if (srcs.equalsIgnoreCase(destt) == false && destt != "") { srcmis :+= srcs destmis :+= destt } if (srcs == "") { extraindest :+= destt.mkString("") } if (destt == "") { extrainsrc :+= srcs.mkString("") } } } 

कोड अपडेट किया गया:

  val tab_c = tab_a.cogroup(tab_b).filter(x => x._2._1 =!= x => x._2._2) // tab_c = {1,Compactbuffer(1,john,US),Compactbuffer(1,john,UK)} {2,Compactbuffer(2,john,US),Compactbuffer(2,johnson,UK)}.. 

त्रुटि:

  ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(4,3,ResultTask,FetchFailed(null,0,-1,27,org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697) at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693) ERROR YarnScheduler: Lost executor 8 on datanode1: Container killed by YARN for exceeding memory limits. 1.0 GB of 1020 MB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 

धन्यवाद

Solutions Collecting From Web of "बड़ी डेटासेट के लिए COGROUP का उपयोग कैसे करें"

जब आप collect() करते collect() आप मूल रूप से मास्टर नोड के लिए सभी परिणामस्वरूप डेटा को स्थानांतरित करने के लिए स्पार्क कह रहे हैं, जो आसानी से एक बाधा उत्पन्न कर सकते हैं आप उस समय स्पार्क का उपयोग नहीं कर रहे हैं, केवल एक मशीन में एक सादा सरणी।

गणना को ट्रिगर करने के लिए केवल कुछ नोड का उपयोग करें, जिसके लिए हर नोड पर डेटा की आवश्यकता होती है, इसलिए, वितरक एक वितरित फाइल सिस्टम के शीर्ष पर रहते हैं। उदाहरण के लिए saveAsTextFile()

यहां कुछ बुनियादी उदाहरण दिए गए हैं

याद रखें, यहां पूरे उद्देश्य (जो कि आपके पास बड़ा डेटा है) कोड को अपने डेटा में ले जाने और गणना करने के लिए, सभी डेटा को अभिकलन में लाने के लिए नहीं है।

टीएल; डीआर मत collect करें

इस कोड को सुरक्षित रूप से चलाने के लिए, अतिरिक्त मान्यताओं के बिना (कार्यकर्ता नोड्स के लिए औसत आवश्यकताएं काफी कम हो सकती हैं), प्रत्येक नोड (ड्राइवर और प्रत्येक निष्पादक) को सभी डेटा के लिए कुल मेमोरी आवश्यकताओं से काफी मेमोरी की आवश्यकता होगी।

यदि आप इसे स्पार्क के बाहर चलाने के लिए चाहते थे तो आपको केवल एक नोड की आवश्यकता होगी इसलिए स्पार्क यहां कोई लाभ प्रदान नहीं करता है।

हालांकि अगर आप collect.toArray करते हैं तो। collect.toArray और डेटा वितरण के बारे में कुछ मान्यताओं को बना सकते हैं, आप इसे ठीक कर सकते हैं।