दिलचस्प पोस्ट
सर्वर सॉकेट 2 क्रोम अनुरोध प्राप्त करता है जब मैं क्रोम से भेजता हूं और जब मैं फ़ायरफ़ॉक्स से भेजता हूं तब प्राप्त होता है CakePHP में base_url एसएसएल की जांच कैसे करें मेटलैब मैट्रिक्स में डुबकी लगाने के लिए सामान्य विधि MVC4 मचान नियंत्रक जोड़ें त्रुटि देता है "मेटाडेटा प्राप्त करने में असमर्थ …" एंड्रॉइड: संसाधनों से नाम कैसे मिल सकता है? मैक पर apache2 के लिए $ पाथ परिवेश चर वर्तमान स्थिति को पुनः लोड करना – ताज़ा डेटा JavaFX में WebView का प्रदर्शन जनक संदर्भ में बाल संदर्भ में स्प्रिंग बीन घोषित करना यह निर्धारित करना कि वस्तु ऑब्जेक्ट VBA में एक संग्रह का सदस्य है ओपनजीएल पिक्सल में शीर्ष स्थिति को परिभाषित करता है जाँच कैसे करें कि एक चर का प्रकार स्ट्रिंग है? समय जावाएफ़एक्स कैनवास आवेदन एक स्ट्रिंग को अल्पविराम से विभाजित करें, लेकिन जावास्क्रिप्ट का उपयोग करके दोहरे उद्धरण चिह्नों में अल्पविराम को अनदेखा करें

निरीक्षण करें और सदस्यता लें- जहां पर काम किया जा रहा है

इस प्रश्न को पढ़ने के आधार पर: सदस्यता और निरीक्षण के बीच अंतर क्या है?

ObserveOn कि जहां Subscribe हैंडलर हैंडलर में है, वह निष्पादित होता है:

stream.Subscribe(_ => { // this code here });

SubscribeOn विधि सेट करता है कि स्ट्रीम का सेटअप किस धागा पर किया जाता है।

मुझे यह समझने के लिए प्रेरित किया गया है कि यदि ये स्पष्ट रूप से सेट नहीं हैं, तो कार्यपूल का उपयोग किया जाता है।

अब मेरा सवाल है, कहें कि मैं ऐसा कुछ करता हूं:

Observable.Interval(new Timespan(0, 0, 1)).Where(t => predicate(t)).SelectMany(t => lots_of(t)).ObserveOnDispatcher().Subscribe(t => some_action(t));

कहां कहां हैं, Where predicate और SelectMany lots_of SelectMany lots_of SelectMany lots_of जा रहे हैं, यह देखते हुए कि some_action पर some_action निष्पादित किया जा रहा है?

Solutions Collecting From Web of "निरीक्षण करें और सदस्यता लें- जहां पर काम किया जा रहा है"

SubscribeOn बारे में बहुत सारी भ्रामक जानकारी है, और ObserveOn

सारांश

  • SubscribeOn IObservable<T> की एक ही विधि को कॉल करने के लिए कॉल, जो Subscribe , और Subscribe द्वारा वापस आये IDisposable हैंडल पर IObservable<T> करने के लिए कॉल करता है।
  • ObserveOn IObserver<T> , जो OnNext , OnCompleted और OnError के तरीकों के लिए कॉल करने पर कॉल करता है।
  • दोनों तरीकों से संबंधित कॉल को निर्दिष्ट शेड्यूलर पर बना दिया जाता है।

विश्लेषण और प्रात्यक्षिक

बयान

निरीक्षण करें कि सदस्य हैंडलर में कोड निष्पादित होने पर सेट होता है:

मददगार से अधिक भ्रामक है जो आप " OnNext को सब्सक्राइब करें" के रूप में संदर्भित कर रहे हैं वह वास्तव में एक OnNext हैंडलर है। याद रखें, IObservable की Subscribe पद्धति एक IObservable स्वीकार करता है जिस पर OnNext , OnCompletedOnCompleted और OnError विधियां हैं, लेकिन यह एक्सटेंशन विधियां हैं जो सुविधा अधिभार प्रदान करती हैं जो लैम्ब्डा को स्वीकार करते हैं और आपके लिए IObserver कार्यान्वयन का निर्माण करते हैं।

मुझे हालांकि शब्द उपयुक्त; मैं सोचता हूं कि "सदस्यता लें सदस्यता लें" उस नमूने में कोड है, जिसे Subscribe कहा जाता है, तब लागू होता है। इस तरह, विवरण ऊपर से अधिक सब्सकोड के उद्देश्य से मिलते हैं।

SubscribeOn

सब्सक्रिप्शन एक अनुक्रमित के Subscribe पद्धति का कारण होता है जो कि निर्दिष्ट शेड्यूलर या प्रसंग पर अतुल्यकालिक निष्पादित किया जा सके। आप इसका उपयोग तब करते हैं जब आप जिस तरह से चल रहे सभी थ्रेड से सब्सक्रिप्शन विधि को कॉल नहीं करना चाहते हैं – आमतौर पर क्योंकि यह लंबे समय से चल रहा है और आप कॉलिंग थ्रेड को ब्लॉक नहीं करना चाहते हैं

जब आप Subscribe करें कहते हैं, तो आप एक अनदेखी कह रहे हैं जो अवलोकन के एक लंबी श्रृंखला का हिस्सा हो सकता है। यह केवल अवलोकन है कि सब्सक्रिप्शन को इसके प्रभाव पर लागू किया गया है। अब यह मामला हो सकता है कि श्रृंखला में सभी अवलोकनों का तुरंत और उसी धागा पर सब्सक्राइब किया जाएगा – लेकिन यह मामला नहीं होना चाहिए। उदाहरण के लिए Concat के बारे में सोचें – जो पूर्ववर्ती धारा समाप्त हो जाने पर केवल प्रत्येक लगातार स्ट्रीम की सदस्यता लेती है, और आम तौर पर यह पूर्ववर्ती स्ट्रीम जो कि ऑन- OnCompleted से बुलाया जाता है, पर जगह ले जाएगा।

तो सब्सक्रिप्शन सदस्यता के लिए आपकी कॉल के बीच बैठता है और आप जिस Subscribe को स्वीकार कर रहे हैं, कॉल को अवरुद्ध करते हैं और इसे अतुल्यकालिक बनाते हैं।

यह सदस्यता के निपटान को भी प्रभावित करता है Subscribe एक IDisposable हैंडल देता है जिसका उपयोग सदस्यता समाप्त करने के लिए किया जाता है। SubscribeOn से Dispose लिए कॉल सुनिश्चित करता है आपूर्ति अनुसूचक पर निर्धारित हैं

भ्रम की एक सामान्य बात यह समझने की कोशिश कर रही है कि SubscribeOn क्या है, यह है कि एक नमूने के Subscribe हैंडलर को यह उसी धागा पर ऑन- OnNext , OnCompleted या OnError भी कहा जा सकता है। हालांकि, इसका उद्देश्य इन कॉलों को प्रभावित नहीं करना है Subscribe पद्धति रिटर्न से पहले एक स्ट्रीम पूरी होने के लिए यह असामान्य नहीं है Observable.Return , उदाहरण के लिए, ऐसा करता है। चलो एक नज़र डालते हैं।

यदि आपने जासूस पद्धति का उपयोग किया है, तो मैंने लिखा है, और निम्न कोड चलाया है:

 Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.Subscribe(); Console.WriteLine("Subscribe returned"); 

आपको यह आउटपुट मिलता है (थ्रेड आईडी बिल्कुल भिन्न हो सकता है):

 Calling from Thread: 1 Return: Observable obtained on Thread: 1 Return: Subscribed to on Thread: 1 Return: OnNext(1) on Thread: 1 Return: OnCompleted() on Thread: 1 Return: Subscription completed. Subscribe returned 

आप देख सकते हैं कि संपूर्ण सदस्यता हैंडलर उसी धागा पर चल रहा था, और लौटने से पहले समाप्त हुआ।

चलिए इस एसिंक्रोनस को चलाने के लिए सब्सक्रिप्शन का उपयोग करें। हम Return नजर रखेंगे और SubscribeOn नजर रखेंगे:

 Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe(); Console.WriteLine("Subscribe returned"); 

यह आउटपुट (लाइन नंबर मेरे द्वारा जोड़े गए):

 01 Calling from Thread: 1 02 Return: Observable obtained on Thread: 1 03 SubscribeOn: Observable obtained on Thread: 1 04 SubscribeOn: Subscribed to on Thread: 1 05 SubscribeOn: Subscription completed. 06 Subscribe returned 07 Return: Subscribed to on Thread: 2 08 Return: OnNext(1) on Thread: 2 09 SubscribeOn: OnNext(1) on Thread: 2 10 Return: OnCompleted() on Thread: 2 11 SubscribeOn: OnCompleted() on Thread: 2 12 Return: Subscription completed. 

01 – मुख्य विधि धागा 1 पर चल रहा है।

02 – Return परिकल्पित कॉलिंग थ्रेड पर मूल्यांकन किया गया है। हम यहां सिर्फ IObservable रहे हैं, अभी तक कोई भी सदस्यता नहीं ले रहा है।

03 – सब्सक्रिप्शन ऑब्जेक्टिव का कॉलिंग थ्रेड पर मूल्यांकन किया गया है।

04 – अब आखिर में हम Subscribe की Subscribe पद्धति को कॉल करते हैं।

05 – Subscribe विधि एसिंक्रोनस पूर्ण …

06 – … और धागा 1 मुख्य विधि को लौटाता है। यह सब्सक्राइब पर कार्रवाई का प्रभाव है!

07 – इस बीच, SubscribeOn ने Return लिए डिफ़ॉल्ट शेड्यूलर पर एक कॉल शेड्यूल किया है। यह धागा 2 पर प्राप्त हुआ है।

08 – और जैसा कि Return करता है, यह Subscribe धागा पर OnNext को कॉल करता है …

09 – और SubscribeOn सिर्फ अब एक पास है

10,11 – OnCompleted लिए समान

12 – और अंतिम सबस्क्रिप्शन हैंडलर के लिए किया जाता है।

उम्मीद है कि SubscribeOn के उद्देश्य और प्रभाव को साफ करता है!

ObserveOn

यदि आप ObserveOn बारे में सोचते हैं कि Subscribe पद्धति के लिए एक इंटरसेप्टर के रूप में कॉल को एक अलग थ्रेड पर ObserveOn है, तो ObserveOn करें एक ही काम करता है, परन्तु OnNext , OnCompleted और OnError कॉल के लिए।

हमारे मूल उदाहरण को याद करें:

 Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.Subscribe(); Console.WriteLine("Subscribe returned"); 

जो इस आउटपुट को दिया था:

 Calling from Thread: 1 Return: Observable obtained on Thread: 1 Return: Subscribed to on Thread: 1 Return: OnNext(1) on Thread: 1 Return: OnCompleted() on Thread: 1 Return: Subscription completed. Subscribe returned 

अब इसका उपयोग करने के लिए ObserveOn :

 Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned"); 

हम निम्नलिखित आउटपुट प्राप्त करते हैं:

 01 Calling from Thread: 1 02 Return: Observable obtained on Thread: 1 03 ObserveOn: Observable obtained on Thread: 1 04 ObserveOn: Subscribed to on Thread: 1 05 Return: Subscribed to on Thread: 1 06 Return: OnNext(1) on Thread: 1 07 ObserveOn: OnNext(1) on Thread: 2 08 Return: OnCompleted() on Thread: 1 09 Return: Subscription completed. 10 ObserveOn: Subscription completed. 11 Subscribe returned 12 ObserveOn: OnCompleted() on Thread: 2 

01 – मुख्य विधि थ्रेड 1 पर चल रही है I

02 – पहले के रूप में, लौटते समय की गणना का मूल्यांकन कॉलिंग धागे पर किया जाता है। हम यहां सिर्फ IObservable रहे हैं, अभी तक कोई भी सदस्यता नहीं ले रहा है।

03 – निरीक्षण पर कॉलिंग धागा पर भी मूल्यांकन किया जाता है।

04 – अब हम कॉलिंग थ्रेड पर फिर से सदस्यता लेते हैं, सबसे पहले ObserveOn ऑब्जेक्ट …

05 – … जो फिर से Return लिए कॉल को पास करता है।

06 – अब अपने Subscribe हैंडलर में OnNext Return कॉल OnNext

07 – यहां पर ObserveOn का प्रभाव है ObserveOn हम देख सकते हैं कि OnNext को अतुल्यकालिक थ्रेड 2 पर निर्धारित किया गया है

08 – इस बीच Return कॉल OnCompleted 1 पर OnCompleted

09 – और Return की सदस्यता हैंडलर पूर्ण करता है …

10 – और उसके बाद भी सदस्यता का ObserveOn

11 – इसलिए मुख्य विधि में नियंत्रण वापस कर दिया गया है

12 – इस बीच, ObserveOn , शट डाउन किया गया है Return OnCompleted इसे थ्रेड पर कॉल करें। यह 09-11 के दौरान किसी भी समय हो सकता था क्योंकि यह एसिंक्रोनस चल रहा है बस ऐसा होता है, अंततः इसे अब बुलाया गया है।

ठेठ उपयोग के मामलों क्या हैं?

जब आप एक लंबे समय से चलने योग्य नमूने की Subscribe लेंगे और जितनी जल्दी हो सके डिस्पैचर थ्रेड को प्राप्त करना चाहते हैं, आपको अक्सर सबसे सब्सक्रिप्शन देखा जाएगा – शायद इसलिए कि आप जानते हैं कि यह उन अवलोकनों में से एक है जो सब कुछ सदस्यता में काम करता है हैंडलर। इसे अवलोकन श्रृंखला के अंत में लागू करें, क्योंकि यह आपकी पहली सदस्यता है जब आप सदस्यता लेते हैं।

आप सबसे अधिक बार देखेंगे जब आप सुनिश्चित OnNext चाहते हैं कि एक GUI में उपयोग किया जाता है, OnCompleted OnNext , और OnError कॉल्स डिस्पैचर थ्रेड पर वापस लौटे जाते हैं। इसे यथासंभव देर से संक्रमण के लिए अवलोकन श्रृंखला के अंत में लागू करें।

उम्मीद है कि आप यह देख सकते हैं कि आपके प्रश्न का उत्तर यह है कि ObserveOnDispatcher उन थ्रेड्स में कोई फर्क नहीं पड़ेगा, Where और SelectMany कई निष्पादित होते हैं – यह सब निर्भर करता है कि क्या धागा स्ट्रीम उनसे कह रहा है! स्ट्रीम के सदस्यता हैंडलर को कॉलिंग थ्रेड पर लागू किया जाएगा, लेकिन यह कहने में असंभव है कि stream और कार्यान्वयन कैसे किया stream है, इसके बिना यह Where और Where SelectMany

लाइबेट्स के साथ ऑबजेबेट्स जो सदस्यता कॉल से अधिक जीवित हैं

अब तक ऊपर, हम विशेष रूप से Observable.Return में देख रहे हैं। Return Subscribe बहाल करनेवाला के भीतर अपनी स्ट्रीम को पूरा करता है यह असामान्य नहीं है, लेकिन Subscribe हेन्डलर से बाहर निकलने के लिए स्ट्रीम के लिए समान रूप से समान है उदाहरण के लिए Observable.Timer उदाहरण के लिए:

 Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.Subscribe(); Console.WriteLine("Subscribe returned"); 

यह निम्न देता है:

 Calling from Thread: 1 Timer: Observable obtained on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. Subscribe returned Timer: OnNext(0) on Thread: 2 Timer: OnCompleted() on Thread: 2 

आप स्पष्ट रूप से सदस्यता को पूरा करने के बाद देख सकते हैं और फिर एक अलग थ्रेड पर बाद में कहा जा रहा है OnNext और OnCompleted

नोट करें कि ObserveOn या ObserveOn का कोई भी संयोजन उस पर कोई प्रभाव नहीं पड़ेगा, जिस पर धागा या शेड्यूलर Timer ऑन- OnNext और ऑन- OnCompleted ऑन को लागू करने के लिए OnNext

निश्चित रूप से, आप Subscribe धागा को निर्धारित करने के लिए SubscribeOn का उपयोग कर सकते हैं:

 Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.SubscribeOn(NewThreadScheduler.Default).Spy("SubscribeOn").Subscribe(); Console.WriteLine("Subscribe returned"); 

(मैं जानबूझकर NewThreadScheduler को बदल रहा हूँ ताकि Timer के मामले में भ्रम को रोकने के लिए SubscribeOn रूप में एक ही थ्रेड पूल थ्रेड प्राप्त हो रहा हो)

देते हुए:

 Calling from Thread: 1 Timer: Observable obtained on Thread: 1 SubscribeOn: Observable obtained on Thread: 1 SubscribeOn: Subscribed to on Thread: 1 SubscribeOn: Subscription completed. Subscribe returned Timer: Subscribed to on Thread: 2 Timer: Subscription completed. Timer: OnNext(0) on Thread: 3 SubscribeOn: OnNext(0) on Thread: 3 Timer: OnCompleted() on Thread: 3 SubscribeOn: OnCompleted() on Thread: 3 

यहां आप अपने Subscribe कॉल के बाद लौट (1) लौटने पर मुख्य धागे को स्पष्ट रूप से देख सकते हैं, लेकिन Timer सदस्यता को अपना धागा (2) मिल रहा है, लेकिन थ्रेड (3) पर चलने वाला OnNext और OnCompleted कॉल।

अब ObserveOn , चलो कोड को बदलते हैं (कोड में निम्नलिखित के लिए, न्यूजेट पैकेज आरएक्स-वाईपीएफ का उपयोग करें):

 var dispatcher = Dispatcher.CurrentDispatcher; Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned"); 

यह कोड थोड़ा अलग है पहली पंक्ति सुनिश्चित करता है कि हमारे पास एक डिस्पैचर है, और हम ObserveOnDispatcher भी लाते हैं – यह ObserveOn तरह ObserveOn , सिवाय इसके कि यह निर्दिष्ट करता है कि हमें DispatcherScheduler ObserveOnDispatcher का उपयोग किसी भी थ्रेड के ObserveOnDispatcher करना चाहिए। ObserveOnDispatcher का मूल्यांकन किया गया है

यह कोड निम्न आउटपुट देता है:

 Calling from Thread: 1 Timer: Observable obtained on Thread: 1 ObserveOn: Observable obtained on Thread: 1 ObserveOn: Subscribed to on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. ObserveOn: Subscription completed. Subscribe returned Timer: OnNext(0) on Thread: 2 ObserveOn: OnNext(0) on Thread: 1 Timer: OnCompleted() on Thread: 2 ObserveOn: OnCompleted() on Thread: 1 

ध्यान दें कि डिस्पैचर (और मुख्य धागा) धागा 1 है। Timer अभी भी अपने चयन (2) के धागे पर OnNext और OnCompleted कॉल कर रहा है – परन्तु ObserveOnDispatcher डिस्पैचर, प्रेषक थ्रेड, थ्रेड (1) पर वापस कॉल करता है।

यह भी ध्यान रखें कि यदि हम डिस्पैचर थ्रेड को अवरुद्ध करना चाहते हैं (एक थ्रेड से कहें। Thread.Sleep ) तो आप देखेंगे कि ObserveOnDispatcher ब्लॉक होगा (यह कोड LINQPad मुख्य विधि के अंदर सर्वश्रेष्ठ काम करता है):

 var dispatcher = Dispatcher.CurrentDispatcher; Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned"); Console.WriteLine("Blocking the dispatcher"); Thread.Sleep(2000); Console.WriteLine("Unblocked"); 

और आप इस तरह आउटपुट देखेंगे:

 Calling from Thread: 1 Timer: Observable obtained on Thread: 1 ObserveOn: Observable obtained on Thread: 1 ObserveOn: Subscribed to on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. ObserveOn: Subscription completed. Subscribe returned Blocking the dispatcher Timer: OnNext(0) on Thread: 2 Timer: OnCompleted() on Thread: 2 Unblocked ObserveOn: OnNext(0) on Thread: 1 ObserveOn: OnCompleted() on Thread: 1 

ObserveOnDispatcher माध्यम से कॉल केवल Sleep जाने के बाद ही बाहर निकलने में सक्षम होता है।

प्रमुख बिंदु

यह ध्यान में रखना उपयोगी है कि रिएक्टिव एक्सटेंशंस अनिवार्य रूप से एक मुक्त-थ्रेडेड लाइब्रेरी है, और यह संभवतः आलसी होने की कोशिश करता है कि यह किस प्रकार चल रहा है – आपको जानबूझकर हस्तक्षेप करना ObserveOn , ObserveOn , SubscribeOn और विशिष्ट शेड्यूलर को उन ऑपरेटरों को स्वीकार कर जो उन्हें स्वीकार करते हैं इसे बदलने के लिए

इसमें एक ObserveOn उपभोक्ता को कुछ भी नहीं है जो इसे आंतरिक रूप से कर रहा है, यह नियंत्रित करने के लिए कर सकते हैं – ObserveOn और SubscribeOn वे सज्जाकार हैं जो पर्यवेक्षकों की सतह क्षेत्र को लपेटते हैं और ObserveOn पर मार्शल कॉल के लिए निगरानी रखते हैं। उम्मीद है कि इन उदाहरणों ने स्पष्ट किया है

मैंने जेम्स का जवाब बहुत स्पष्ट और व्यापक पाया। हालांकि, इसके बावजूद मैं अभी भी खुद को मतभेदों को समझाने की कोशिश करता हूं

इसलिए मैंने एक बहुत ही सरल / बेवकूफ उदाहरण बनाया है जो मुझे रेखांकन से प्रदर्शित करने की अनुमति देता है कि किस समय की शेड्यूलर चीजों को बुलाया जा रहा है। मैंने एक क्लास MyScheduler बनाया है जो तुरंत क्रियाएं निष्पादित करता है, लेकिन कंसोल रंग बदल जाएगा।

ObserveOn अनुसूचक से पाठ आउटपुट लाल में आउटपुट है और ObserveOn शेड्यूलर से आउटपुट नीले रंग में है।

 using System; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; namespace SchedulerExample { class Program { static void Main(string[] args) { var mydata = new[] {"A", "B", "C", "D", "E"}; var observable = Observable.Create<string>(observer => { Console.WriteLine("Observable.Create"); return mydata.ToObservable(). Subscribe(observer); }); observable. SubscribeOn(new MyScheduler(ConsoleColor.Red)). ObserveOn(new MyScheduler(ConsoleColor.Blue)). Subscribe(s => Console.WriteLine("OnNext {0}", s)); Console.ReadKey(); } } } 

यह आउटपुट:

अनुसूचक

और संदर्भ के लिए MyScheduler (वास्तविक उपयोग के लिए उपयुक्त नहीं):

 using System; using System.Reactive.Concurrency; using System.Reactive.Disposables; namespace SchedulerExample { class MyScheduler : IScheduler { private readonly ConsoleColor _colour; public MyScheduler(ConsoleColor colour) { _colour = colour; } public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) { return Execute(state, action); } private IDisposable Execute<TState>(TState state, Func<IScheduler, TState, IDisposable> action) { var tmp = Console.ForegroundColor; Console.ForegroundColor = _colour; action(this, state); Console.ForegroundColor = tmp; return Disposable.Empty; } public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) { throw new NotImplementedException(); } public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action) { throw new NotImplementedException(); } public DateTimeOffset Now { get { return DateTime.UtcNow; } } } } 

मैं अक्सर गलती करता .SubcribeOn को थ्रेड सेट करने के लिए उपयोग किया जाता है जहां कोड अंदर .Subscribe है। .Subscribe को निष्पादित किया जा रहा है। लेकिन याद रखना, बस लगता है कि प्रकाशित और सब्सक्राइब को यिन-यांग जैसी जोड़ी होना चाहिए। सेट करने के लिए जहां ObserveOn Subscribe's code को निष्पादित किया जा रहा है का उपयोग करें ObserveOn सेट करने के लिए जहां Observable's code निष्पादित किया गया था या संक्षेप में मंत्र: where-what , Subscribe-Observe , Observe-Subscribe Subscribe-Observe , Observe-Subscribe