談反應式編程在服務端中的應用,數據庫操作優化,萬條記錄從20秒到0.5秒_網頁設計公司

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

節能減碳愛地球是景泰電動車的理念,是創立景泰電動車行的初衷,滿意態度更是服務客戶的最高品質,我們的成長來自於你的推薦。

反應式編程在客戶端編程當中的應用相當廣泛,而當前在服務端中的應用相對被提及較少。本篇將介紹如何在服務端編程中應用響應時編程來改進數據庫操作的性能。

開篇就是結論

利用 System.Reactive 配合 TaskCompelteSource ,可以將分散的單次數據庫插入請求合併會一個批量插入的請求。在確保正確性的前提下,實現數據庫插入性能的優化。

如果讀者已經了解了如何操作,那麼剩下的內容就不需要再看了。

預設條件

現在,我們假設存在這樣一個 Repository 接口來表示一次數據庫的插入操作。

  csharp

namespace Newbe.RxWorld.DatabaseRepository { public interface IDatabaseRepository { /// <summary> /// Insert one item and return total count of data in database /// </summary> /// <param name="item"></param> /// <returns></returns> Task<int> InsertData(int item); } }

接下來,我們在不改變該接口簽名的前提下,體驗一下不同的實現帶來的性能區別。

基礎版本

首先是基礎版本,採用的是最為常規的單次數據庫INSERT操作來完成數據的插入。本示例採用的是SQLite作為演示數據庫,方便讀者自行實驗。

  csharp

namespace Newbe.RxWorld.DatabaseRepository.Impl { public class NormalDatabaseRepository : IDatabaseRepository { private readonly IDatabase _database; public NormalDatabaseRepository( IDatabase database) { _database = database; } public Task<int> InsertData(int item) { return _database.InsertOne(item); } } }

常規操作。其中_database.InsertOne(item)的具體實現就是調用了一次INSERT

基礎版本在同時插入小於20次時基本上可以較快的完成。但是如果數量級增加,例如需要同時插入一萬條數據庫,將會花費約20秒鐘,存在很大的優化空間。

TaskCompelteSource

TaskCompelteSource 是 TPL 庫中一個可以生成一個可操作 Task 的類型。對於 TaskCompelteSource 不太熟悉的讀者可以通過該實例代碼了解。

此處也簡單解釋一下該對象的作用,以便讀者可以繼續閱讀。

對於熟悉 javascript 的朋友,可以認為 TaskCompelteSource 相當於 Promise 對象。也可以相當於 jQuery 當中的 $.Deferred 。

如果都不了解的朋友,可以聽一下筆者吃麻辣燙時想到的生活化例子。

吃麻辣燙 技術解釋
吃麻辣燙之前,需要先用盤子夾菜。 構造參數
夾好菜之後,拿到結賬處去結賬 調用方法
收銀員結賬完畢之後,會得到一個叫餐牌,會響鈴的那種 得到一個 Task 返回值
拿着菜牌找了一個位子坐下,玩手機等餐 正在 await 這個 Task ,CPU轉而處理其他事情
餐牌響了,去取餐,吃起來 Task 完成,await 節數,繼續執行下一行代碼

那麼 TaskCompelteSource 在哪兒呢?

首先,根據上面的例子,在餐牌響的時候,我們才會去取餐。那麼餐牌什麼時候才會響呢?當然是服務員手動按了一個在櫃檯的手動開關才觸發了這個響鈴。

那麼,櫃檯的這個開關,可以被技術解釋為 TaskCompelteSource 。

餐台開關可以控制餐牌的響鈴。同樣, TaskCompelteSource 就是一種可以控制 Task 的狀態的對象。

解決思路

有了前面對 TaskCompelteSource 的了解,那麼接下來就可以解決文章開頭的問題了。思路如下:

當調用 InsertData 時,可以創建一個 TaskCompelteSource 以及 item 的元組。為了方便說明,我們將這個元組命名為BatchItem

將 BatchItem 的 TaskCompelteSource 對應的 Task 返回出去。

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

搬家費用:依消費者運送距離、搬運樓層、有無電梯、步行距離、特殊地形、超重物品等計價因素後,評估每車次單

調用 InsertData 的代碼會 await 返回的 Task,因此只要不操作 TaskCompelteSource ,調用者就一會一直等待。

然後,另外啟動一個線程,定時將 BatchItem 隊列消費掉。

這樣就完成了單次插入變為批量插入的操作。

筆者可能解釋的不太清楚,不過以下所有版本的代碼均基於以上思路。讀者可以結合文字和代碼進行理解。

ConcurrentQueue 版本

基於以上的思路,我們採用 ConcurrentQueue 作為 BatchItem 隊列進行實現,代碼如下(代碼很多,不必糾結,因為下面還有更簡單的):

  csharp

namespace Newbe.RxWorld.DatabaseRepository.Impl { public class ConcurrentQueueDatabaseRepository : IDatabaseRepository { private readonly ITestOutputHelper _testOutputHelper; private readonly IDatabase _database; private readonly ConcurrentQueue<BatchItem> _queue; // ReSharper disable once PrivateFieldCanBeConvertedToLocalVariable private readonly Task _batchInsertDataTask; public ConcurrentQueueDatabaseRepository( ITestOutputHelper testOutputHelper, IDatabase database) { _testOutputHelper = testOutputHelper; _database = database; _queue = new ConcurrentQueue<BatchItem>(); // 啟動一個 Task 消費隊列中的 BatchItem _batchInsertDataTask = Task.Factory.StartNew(RunBatchInsert, TaskCreationOptions.LongRunning); _batchInsertDataTask.ConfigureAwait(false); } public Task<int> InsertData(int item) { // 生成 BatchItem ,將對象放入隊列。返回 Task 出去 var taskCompletionSource = new TaskCompletionSource<int>(); _queue.Enqueue(new BatchItem { Item = item, TaskCompletionSource = taskCompletionSource }); return taskCompletionSource.Task; } // 從隊列中不斷獲取 BatchItem ,並且一批一批插入數據庫,更新 TaskCompletionSource 的狀態 private void RunBatchInsert() { foreach (var batchItems in GetBatches()) { try { BatchInsertData(batchItems).Wait(); } catch (Exception e) { _testOutputHelper.WriteLine($"there is an error : {e}"); } } IEnumerable<IList<BatchItem>> GetBatches() { var sleepTime = TimeSpan.FromMilliseconds(50); while (true) { const int maxCount = 100; var oneBatchItems = GetWaitingItems() .Take(maxCount) .ToList(); if (oneBatchItems.Any()) { yield return oneBatchItems; } else { Thread.Sleep(sleepTime); } } IEnumerable<BatchItem> GetWaitingItems() { while (_queue.TryDequeue(out var item)) { yield return item; } } } } private async Task BatchInsertData(IEnumerable<BatchItem> items) { var batchItems = items as BatchItem[] ?? items.ToArray(); try { // 調用數據庫的批量插入操作 var totalCount = await _database.InsertMany(batchItems.Select(x => x.Item)); foreach (var batchItem in batchItems) { batchItem.TaskCompletionSource.SetResult(totalCount); } } catch (Exception e) { foreach (var batchItem in batchItems) { batchItem.TaskCompletionSource.SetException(e); } throw; } } private struct BatchItem { public TaskCompletionSource<int> TaskCompletionSource { get; set; } public int Item { get; set; } } } }

以上代碼中使用了較多的 Local Function 和 IEnumerable 的特性,不了解的讀者可以點擊此處進行了解。

正片開始!

接下來我們使用 System.Reactive 來改造上面較為複雜的 ConcurrentQueue 版本。如下:

  csharp

namespace Newbe.RxWorld.DatabaseRepository.Impl { public class AutoBatchDatabaseRepository : IDatabaseRepository { private readonly ITestOutputHelper _testOutputHelper; private readonly IDatabase _database; private readonly Subject<BatchItem> _subject; public AutoBatchDatabaseRepository( ITestOutputHelper testOutputHelper, IDatabase database) { _testOutputHelper = testOutputHelper; _database = database; _subject = new Subject<BatchItem>(); // 將請求進行分組,每50毫秒一組或者每100個一組 _subject.Buffer(TimeSpan.FromMilliseconds(50), 100) .Where(x => x.Count > 0) // 將每組數據調用批量插入,寫入數據庫 .Select(list => Observable.FromAsync(() => BatchInsertData(list))) .Concat() .Subscribe(); } // 這裏和前面對比沒有變化 public Task<int> InsertData(int item) { var taskCompletionSource = new TaskCompletionSource<int>(); _subject.OnNext(new BatchItem { Item = item, TaskCompletionSource = taskCompletionSource }); return taskCompletionSource.Task; } // 這段和前面也完全一樣,沒有變化 private async Task BatchInsertData(IEnumerable<BatchItem> items) { var batchItems = items as BatchItem[] ?? items.ToArray(); try { var totalCount = await _database.InsertMany(batchItems.Select(x => x.Item)); foreach (var batchItem in batchItems) { batchItem.TaskCompletionSource.SetResult(totalCount); } } catch (Exception e) { foreach (var batchItem in batchItems) { batchItem.TaskCompletionSource.SetException(e); } throw; } } private struct BatchItem { public TaskCompletionSource<int> TaskCompletionSource { get; set; } public int Item { get; set; } } } }

代碼減少了 50 行,主要原因就是使用 System.Reactive 中提供的很強力的 Buffer 方法實現了 ConcurrentQueue 版本中的複雜的邏輯實現。

老師,可以更給力一點嗎?

我們,可以“稍微”優化一下代碼,將 Buffer 以及相關的邏輯獨立於“數據庫插入”這個業務邏輯。那麼我們就會得到一個更加簡單的版本:

  csharp

namespace Newbe.RxWorld.DatabaseRepository.Impl { public class FinalDatabaseRepository : IDatabaseRepository { private readonly IBatchOperator<int, int> _batchOperator; public FinalDatabaseRepository( IDatabase database) { var options = new BatchOperatorOptions<int, int> { BufferTime = TimeSpan.FromMilliseconds(50), BufferCount = 100, DoManyFunc = database.InsertMany, }; _batchOperator = new BatchOperator<int, int>(options); } public Task<int> InsertData(int item) { return _batchOperator.CreateTask(item); } } }

其中 IBatchOperator 等代碼,讀者可以到代碼庫中進行查看,此處就不在陳列了。

性能測試

基本可以測定如下:

在 10 條數據併發操作時,原始版本和批量版本沒有多大區別。甚至批量版本在數量少時會更慢,畢竟其中存在一個最大 50 毫秒的等待時間。

但是,如果需要批量操作併發操作一萬條數據,那麼原始版本可能需要消耗20秒,而批量版本僅僅只需要0.5秒。

所有的示例代碼均可以在代碼庫中找到。如果 Github Clone 存在困難,也可以點擊此處從 Gitee 進行 Clone

最後但是最重要!

最近作者正在構建以反應式Actor模式事件溯源為理論基礎的一套服務端開發框架。希望為開發者提供能夠便於開發出“分佈式”、“可水平擴展”、“可測試性高”的應用系統——Newbe.Claptrap

本篇文章是該框架的一篇技術選文,屬於技術構成的一部分。如果讀者對該內容感興趣,歡迎轉發、評論、收藏文章以及項目。您的支持是促進項目成功的關鍵。

當前項目已經快要發布 0.1 alpha 版本,歡迎參与討論。

GitHub 項目地址:https://github.com/newbe36524/Newbe.Claptrap

Gitee 項目地址:https://gitee.com/yks/Newbe.Claptrap

文章作者: newbe36524
本文章著作權歸作者所有,任何形式的轉載都請註明出處。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

透過選單樣式的調整、圖片的縮放比例、文字的放大及段落的排版對應來給使用者最佳的瀏覽體驗,所以不用擔心有手機版網站兩個後台的問題,而視覺效果也是透過我們前端設計師優秀的空間比例設計,不會因為畫面變大變小而影響到整體視覺的美感。