demo地址: "BulkAll" 批量導入 實現目標:想要使用ElasticSearch的 .Net Api客戶端NEST批量導入數據,併發非同步高效的批量導入 NEST提供了BulkAll 不廢話,上代碼 如果想要對處理導入過程進行監控可以這麼替換 還可以使用C 的local function特性 ...
demo地址:BulkAll
批量導入
實現目標:想要使用ElasticSearch的 .Net Api客戶端NEST批量導入數據,併發非同步高效的批量導入
NEST提供了BulkAll
不廢話,上代碼
const int size = 1000;
var tokenSource = new CancellationTokenSource();
var observableBulk = elasticClient.BulkAll(list, f => f
.MaxDegreeOfParallelism(8)
.BackOffTime(TimeSpan.FromSeconds(10))
.BackOffRetries(2)
.Size(size)
.RefreshOnCompleted()
.Index(indexName)
.BufferToBulk((r, buffer) => r.IndexMany(buffer))
, tokenSource.Token);
var countdownEvent = new CountdownEvent(1);
Exception exception = null;
var bulkAllObserver = new BulkAllObserver();
observableBulk.Subscribe(bulkAllObserver);
countdownEvent.Wait(tokenSource.Token);
如果想要對處理導入過程進行監控可以這麼替換BulkAllObserver
var bulkAllObserver = new BulkAllObserver(
onNext: response =>
{
WriteLine($"Indexed {response.Page * size} with {response.Retries} retries");
},
onError: ex =>
{
WriteLine("BulkAll Error : {0}", ex);
exception = ex;
countdownEvent.Signal();
},
() =>
{
WriteLine("BulkAll Finished");
countdownEvent.Signal();
});
還可以使用C#的local function特性,如下所示
void OnCompleted()
{
WriteLine("BulkAll Finished");
countdownEvent.Signal();
}
var bulkAllObserver = new BulkAllObserver(
onNext: response =>
{
WriteLine($"Indexed {response.Page * size} with {response.Retries} retries");
},
onError: ex =>
{
WriteLine("BulkAll Error : {0}", ex);
exception = ex;
countdownEvent.Signal();
},
OnCompleted);
完成demo,請點擊 BulkAll 查看