反應式編程是一種編程范式,用于處理數(shù)據(jù)流和變化的傳播。這意味著當一個組件發(fā)出數(shù)據(jù)流時,更改將通過響應式編程庫傳播到其他組件。變化的傳播將持續(xù)到最終接收器。事件驅動和反應式編程之間的區(qū)別在于事件驅動的編程圍繞事件而反應式編程圍繞數(shù)據(jù)。
reactivex或rx用于反應式編程
reactivex或raective extension是最著名的反應式編程實現(xiàn)。reactivex的工作取決于以下兩個類 -
可觀察的類
此類是數(shù)據(jù)流或事件的來源,它打包傳入的數(shù)據(jù),以便數(shù)據(jù)可以從一個線程傳遞到另一個線程。在某些觀察者訂閱數(shù)據(jù)之前,它不會提供數(shù)據(jù)。
觀察者
此類使用 observable 發(fā)出的數(shù)據(jù)流??梢杂卸鄠€具有可觀察性的觀察者,每個觀察者將接收發(fā)射的每個數(shù)據(jù)項。觀察者可以通過訂閱觀察者來接收三種類型的事件
- on_next()事件 - 它意味著數(shù)據(jù)流中有一個元素。
- on_completed()事件 - 它意味著排放結束,不再有物品到來。
- on_error()事件 - 它還意味著發(fā)射結束,但是在 observable 拋出錯誤的情況下。
rxpy - 用于反應式編程的python模塊
rxpy是一個python模塊,可用于反應式編程。我們需要確保安裝該模塊。以下命令可用于安裝rxpy模塊
pip install rxpy
例
以下是一個python腳本,它使用 rxpy 模塊及其 observable 和 observe 類 進行反應式編程?;旧嫌袃蓚€類
- get_strings() - 用于從觀察者獲取字符串。
- printobserver() - 用于從觀察者打印字符串。 它使用觀察者類的所有三個事件。它還使用了subscribe()類。
from rx import observable, observer def get_strings(observer): observer.on_next("ram") observer.on_next("mohan") observer.on_next("shyam") observer.on_completed() class printobserver(observer): def on_next(self, value): print("received {0}".format(value)) def on_completed(self): print("finished") def on_error(self, error): print("error: {0}".format(error)) source = observable.create(get_strings) source.subscribe(printobserver())
輸出
received ram received mohan received shyam finished
用于反應式編程的pyfunctional庫
pyfunctional 是另一個可用于反應式編程的python庫。它使我們能夠使用python編程語言創(chuàng)建功能程序。它很有用,因為它允許我們使用鏈式函數(shù)運算符創(chuàng)建數(shù)據(jù)管道。
rxpy和pyfunctional之間的差異
這兩個庫都用于反應式編程并以類似的方式處理流,但它們之間的主要區(qū)別取決于數(shù)據(jù)的處理。 rxpy 處理系統(tǒng)中的數(shù)據(jù)和事件,而 pyfunctional 則專注于使用函數(shù)式編程范例轉換數(shù)據(jù)。
安裝pyfunctional模塊
我們需要在使用之前安裝此模塊。它可以在pip命令的幫助下安裝如下
pip install pyfunctional
例
下面的示例使用 pyfunctional 模塊及其 seq 類,它們充當我們可以迭代和操作的流對象。在這個程序中,它使用將每個值加倍的lamda函數(shù)映射序列,然后過濾x大于4的值,最后將序列縮減為所有剩余值的總和。
from functional import seq result = seq(1,2,3).map(lambda x: x*2).filter(lambda x: x > 4).reduce(lambda x, y: x + y) print ("result: {}".format(result))
輸出
result: 6