日韩欧美国产精品免费一二-日韩欧美国产精品亚洲二区-日韩欧美国产精品专区-日韩欧美国产另-日韩欧美国产免费看-日韩欧美国产免费看清风阁

LOGO OA教程 ERP教程 模切知識交流 PMS教程 CRM教程 開發(fā)文檔 其他文檔  
 
網(wǎng)站管理員

.NET 高性能緩沖隊列實現(xiàn) BufferQueue

freeflydom
2024年8月5日 9:33 本文熱度 1509

前言

BufferQueue 是一個用 .NET 編寫的高性能的緩沖隊列實現(xiàn),支持多線程并發(fā)操作。

項目地址:https://github.com/eventhorizon-cli/BufferQueue

項目是從 mocha 項目中獨立出來的一個組件,經(jīng)過修改以提供更通用的緩沖隊列功能。

目前支持的緩沖區(qū)類型為內(nèi)存緩沖區(qū),后續(xù)會考慮支持更多類型的緩沖區(qū)。

適用場景

生產(chǎn)者和消費者之間的速度不一致,需要并發(fā)批量處理數(shù)據(jù)的場景。

因為目前只有內(nèi)存版本,不適用于不允許數(shù)據(jù)丟失的業(yè)務(wù)場景。

功能說明

支持創(chuàng)建多個 Topic,每個 Topic 可以有多種數(shù)據(jù)類型。每一對 Topic 和數(shù)據(jù)類型對應(yīng)一個獨立的緩沖區(qū)。

 

支持創(chuàng)建多個 Consumer Group,每個 Consumer Group 的消費進(jìn)度都是獨立的。支持多個 Consumer Group 并發(fā)消費同一個 Topic。

支持同一個 Consumer Group 創(chuàng)建多個 Consumer,以負(fù)載均衡的方式消費數(shù)據(jù)。

支持?jǐn)?shù)據(jù)的批量消費,可以一次性獲取多條數(shù)據(jù)。

支持 pull 模式和 push 模式兩種消費模式。

pull 模式下和 push 模式下都支持 auto commit 和 manual commit 兩種提交方式。auto commit 模式下,消費者在收到數(shù)據(jù)后自動提交消費進(jìn)度,如果消費失敗不會重試。manual commit 模式下,消費者需要手動提交消費進(jìn)度,如果消費失敗只要不提交進(jìn)度就可以重試。

需要注意的是,當(dāng)前版本出于簡化實現(xiàn)的考慮,暫不支持消費者的動態(tài)擴(kuò)容和縮容,需要在創(chuàng)建消費者時指定消費者數(shù)量。

使用示例

安裝 Nuget 包:

dotnet add package BufferQueue

項目基于 Microsoft.Extensions.DependencyInjection,使用時需要先注冊服務(wù)。

BufferQueue 支持兩種消費模式:pull 模式和 push 模式。


builder.Services.AddBufferQueue(options =>

{

    options.UseMemory(bufferOptions =>

        {

            // 每一對 Topic 和數(shù)據(jù)類型對應(yīng)一個獨立的緩沖區(qū),可以設(shè)置 partitionNumber

            bufferOptions.AddTopic<Foo>("topic-foo1", partitionNumber: 6);

            bufferOptions.AddTopic<Foo>("topic-foo2", partitionNumber: 4);

            bufferOptions.AddTopic<Bar>("topic-bar", partitionNumber: 8);

        })

        // 添加 push 模式的消費者

        // 掃描指定程序集中的標(biāo)記了 BufferPushCustomerAttribute 的類,

        // 注冊為 push 模式的消費者

        .AddPushCustomers(typeof(Program).Assembly);

});


// 在 HostedService 中使用 pull模式 消費數(shù)據(jù)

builder.Services.AddHostedService<Foo1PullConsumerHostService>();

pull 模式的消費者示例:

public class Foo1PullConsumerHostService(

    IBufferQueue bufferQueue,

    ILogger<Foo1PullConsumerHostService> logger) : IHostedService

{

    private readonly CancellationTokenSource _cancellationTokenSource = new();


    public Task StartAsync(CancellationToken cancellationToken)

    {

        var token = CancellationTokenSource

            .CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token)

            .Token;


        var consumers = bufferQueue.CreatePullConsumers<Foo>(

            new BufferPullConsumerOptions

            {

                TopicName = "topic-foo1", GroupName = "group-foo1", AutoCommit = true, BatchSize = 100,

            }, consumerNumber: 4);


        foreach (var consumer in consumers)

        {

            _ = ConsumeAsync(consumer, token);

        }


        return Task.CompletedTask;

    }


    public Task StopAsync(CancellationToken cancellationToken)

    {

        _cancellationTokenSource.Cancel();

        return Task.CompletedTask;

    }


    private async Task ConsumeAsync(IBufferPullConsumer<Foo> consumer, CancellationToken cancellationToken)

    {

        await foreach (var buffer in consumer.ConsumeAsync(cancellationToken))

        {

            foreach (var foo in buffer)

            {

                // Process the foo

                logger.LogInformation("Foo1PullConsumerHostService.ConsumeAsync: {Foo}", foo);

            }

        }

    }

}

push 模式的消費者示例:

通過 BufferPushCustomer 特性注冊 push 模式的消費者。

push consumer 會被注冊到 DI 容器中,可以通過構(gòu)造函數(shù)注入其他服務(wù),可以通過設(shè)置 ServiceLifetime 來控制 consumer 的生命周期。

BufferPushCustomerAttribute 中的 concurrency 參數(shù)用于設(shè)置 push consumer 的消費并發(fā)數(shù),對應(yīng) pull consumer 的 consumerNumber。


[BufferPushCustomer(

    topicName: "topic-foo2",

    groupName: "group-foo2",

    batchSize: 100,

    serviceLifetime: ServiceLifetime.Singleton,

    concurrency: 2)]

public class Foo2PushConsumer(ILogger<Foo2PushConsumer> logger) : IBufferAutoCommitPushConsumer<Foo>

{

    public Task ConsumeAsync(IEnumerable<Foo> buffer, CancellationToken cancellationToken)

    {

        foreach (var foo in buffer)

        {

            logger.LogInformation("Foo2PushConsumer.ConsumeAsync: {Foo}", foo);

        }


        return Task.CompletedTask;

    }

}

[BufferPushCustomer(

    "topic-bar",

    "group-bar",

    100,

    ServiceLifetime.Scoped,

    2)]

public class BarPushConsumer(ILogger<BarPushConsumer> logger) : IBufferManualCommitPushConsumer<Bar>

{

    public async Task ConsumeAsync(IEnumerable<Bar> buffer, IBufferConsumerCommitter committer,

        CancellationToken cancellationToken)

    {

        foreach (var bar in buffer)

        {

            logger.LogInformation("BarPushConsumer.ConsumeAsync: {Bar}", bar);

        }


        var commitTask = committer.CommitAsync();

        if (!commitTask.IsCompletedSuccessfully)

        {

            await commitTask.AsTask();

        }

    }

}

Producer 示例:

通過 IBufferQueue 獲取到指定的 Producer,然后調(diào)用 ProduceAsync 方法發(fā)送數(shù)據(jù)。

[ApiController]

[Route("/api/[controller]")]

public class TestController(IBufferQueue bufferQueue) : ControllerBase

{

    [HttpPost("foo1")]

    public async Task<IActionResult> PostFoo1([FromBody] Foo foo)

    {

        var producer = bufferQueue.GetProducer<Foo>("topic-foo1");

        await producer.ProduceAsync(foo);

        return Ok();

    }


    [HttpPost("foo2")]

    public async Task<IActionResult> PostFoo2([FromBody] Foo foo)

    {

        var producer = bufferQueue.GetProducer<Foo>("topic-foo2");

        await producer.ProduceAsync(foo);

        return Ok();

    }


    [HttpPost("bar")]

    public async Task<IActionResult> PostBar([FromBody] Bar bar)

    {

        var producer = bufferQueue.GetProducer<Bar>("topic-bar");

        await producer.ProduceAsync(bar);

        return Ok();

    }

}

BufferQueue 內(nèi)部設(shè)計概述

Topic 的隔離

BufferQueue 有以下的特性:

同一個數(shù)據(jù)類型 下的 不同 Topic 的 BufferQueue 互不干擾。

同一個 Topic 下的 不同數(shù)據(jù)類型 的 BufferQueue 互不干擾。

 

這個特性是通過以下兩層接口設(shè)計實現(xiàn)的:

IBufferQueue:根據(jù) TopicName 和 類型參數(shù) T 將請求轉(zhuǎn)發(fā)給具體的 IBufferQueue<T> 實現(xiàn)(借助 KeyedService 實現(xiàn)),其中參數(shù) T 代表 Buffer 所承載的數(shù)據(jù)實體的類型。

IBufferQueue<T>:具體的 BufferQueue 實現(xiàn),負(fù)責(zé)管理 Topic 下的數(shù)據(jù)。屬于 Buffer 模塊的內(nèi)部實現(xiàn),不對外暴露。

 

Partition 的設(shè)計

為了保證消費速度,BufferQueue 將數(shù)據(jù)劃分為多個 Partition,每個 Partition 都是一個獨立的隊列,每個 Partition 都有一個對應(yīng)的消費者線程。

Producer 以輪詢的方式往每個 Partition 中寫入數(shù)據(jù)。

Consumer 最多不允許超過 Partition 的數(shù)量,Partition 按平均分配到組內(nèi)每個 Customer 上。

當(dāng)一個 Consumer 被分配了多個 Partition 時,以輪訓(xùn)的方式進(jìn)行消費。

每個 Partition 上會記錄不同消費組的消費進(jìn)度,不同組之間的消費進(jìn)度互不干擾。

 

對并發(fā)的支持

Producer 支持并發(fā)寫入。

Consumer 消費時是綁定 Partition 的,為保證能正確管理 Partition 的消費進(jìn)度,Consumer 不支持并發(fā)消費。

如果要增加消費速度,需創(chuàng)建多個 Consumer。

Partition 的動態(tài)擴(kuò)容

Partition 的基本組成單元是 Segment,Segment 代表保存數(shù)據(jù)的數(shù)組,多個 Segment 通過鏈表的形式組合成一個 Partition。

當(dāng)一個 Segment 寫滿后,通過在其后面追加一個 Segment 實現(xiàn)擴(kuò)容。

Segment 中用于保存數(shù)據(jù)的數(shù)組的每一個元素稱為 Slot,每個 Slot 都有一個Partition 內(nèi)唯一的自增 Offset。

 

Segment 的回收機(jī)制

每次在 Partition 中新增 Segment 時,會從頭判斷此前的 Segment 是否已經(jīng)被所有消費組消費完,回收最后一個消費完的 Segment 作為新的 Segment 追加到 Partition 末尾使用。

 

Benchmark

測試環(huán)境:Apple M2 Max 64GB

寫入性能測試

與 BlockingCollection 對比并發(fā),并發(fā)線程數(shù)為 CPU 邏輯核心數(shù) 12, partitionNumber 為 1 和 12。

測試結(jié)果

 

在并發(fā)寫入時,BufferQueue 的寫入性能明顯優(yōu)于 BlockingCollection。

消費性能測試

pull 模式 consumer 與 BlockingCollection 對比并發(fā)讀取性能,并發(fā)線程數(shù)為 CPU 邏輯核心數(shù) 12,partitionNumber 為 12。

測試結(jié)果

 

在批量消費時,隨著批量大小的增加,BufferQueue 的消費性能優(yōu)勢更加明顯。


轉(zhuǎn)自https://www.cnblogs.com/eventhorizon/p/18331018


該文章在 2024/8/5 9:51:53 編輯過
關(guān)鍵字查詢
相關(guān)文章
正在查詢...
點晴ERP是一款針對中小制造業(yè)的專業(yè)生產(chǎn)管理軟件系統(tǒng),系統(tǒng)成熟度和易用性得到了國內(nèi)大量中小企業(yè)的青睞。
點晴PMS碼頭管理系統(tǒng)主要針對港口碼頭集裝箱與散貨日常運作、調(diào)度、堆場、車隊、財務(wù)費用、相關(guān)報表等業(yè)務(wù)管理,結(jié)合碼頭的業(yè)務(wù)特點,圍繞調(diào)度、堆場作業(yè)而開發(fā)的。集技術(shù)的先進(jìn)性、管理的有效性于一體,是物流碼頭及其他港口類企業(yè)的高效ERP管理信息系統(tǒng)。
點晴WMS倉儲管理系統(tǒng)提供了貨物產(chǎn)品管理,銷售管理,采購管理,倉儲管理,倉庫管理,保質(zhì)期管理,貨位管理,庫位管理,生產(chǎn)管理,WMS管理系統(tǒng),標(biāo)簽打印,條形碼,二維碼管理,批號管理軟件。
點晴免費OA是一款軟件和通用服務(wù)都免費,不限功能、不限時間、不限用戶的免費OA協(xié)同辦公管理系統(tǒng)。
Copyright 2010-2025 ClickSun All Rights Reserved

主站蜘蛛池模板: 亚洲综合色一区二区三区 | 欧美一区二区成人精品视频 | 国产欧美一二三区 | 91成人国产网 | 大地影院高清mv在线观看 | 国产欧美一区二区精品婷婷 | 国产在线精品一区二区不卡 | 青草青草久| 国产91网站在线观看免费 | 免费精品99久 | 亚洲视频在线观看免费 | 国产中文9 | 国产亚洲老 | 中文字字幕在线精品乱码高清 | 中文字幕在线观看不卡 | 艾栗栗国产精品视频一区 | 国产乱人视频在 | 欧美日韩另类视频在线观看 | 欧美亚洲综合成人a∨在线 亚洲国产中文字幕在线观看 | 亚洲裸男gv网站 | 欧美色欧美亚洲高清在线观看 | 亚洲人成激情在线播放 | 亚洲欧美日韩中文国产不卡 | 国产国产乱片在线播放 | 国产日韩欧美在线 | 亚洲欧美日韩污在线观看 | 国产午夜福利不卡在线观看 | 午夜视频精品视在线播放 | 国产精品老女人精品视频 | 日韩高清在线观看 | 国产真实乱子 | 乱子伦精品视频 | 亚洲国产精品va在线播放 | 亚洲一区日韩高清中文字幕亚洲 | 国产乱码一二 | 综合乱伦国产中文 | 久99精| 欧美日韩国产在 | 国产一区二区三区在线免费 | 豆奶视频官网下载 | 精品欧乱仑在线 |