1.背景 由於歷史原因,筆者所在的公司原有的ES查詢驅動採用的是 PlainElastic.Net, 經過詢問原來是之前PlainElastic.Net在園子里文檔較多,上手比較容易,所以最初作者選用了該驅動,而發佈也由於歷史原因都部署在 windows 伺服器上,基於 .NET Framework ...
1.背景
由於歷史原因,筆者所在的公司原有的ES查詢驅動採用的是 PlainElastic.Net, 經過詢問原來是之前PlainElastic.Net在園子里文檔較多,上手比較容易,所以最初作者選用了該驅動,而發佈也由於歷史原因都部署在 windows 伺服器上,基於 .NET Framework開發。
後來由於遷移 .NET CORE 平臺的需要,對代碼進行了升級,同時部署平臺也遷移至 CentOS7 伺服器,升級過程比較順利,由於沒有使用特殊API,所以幾乎沒有對業務代碼做更多的修改,同時測試階段由於沒有多餘的機器,仍然放在了原有Windows伺服器上做的測試,一切都沒有問題,完美上線。
事發突然,某天接到運維部門反饋,部署查詢服務的機器突然出現 TCP 連接數超高的問題,同時這台機器其他的TCP服務也無法建立新的連接,但已經建立的連接不受影響。聯想到 ElasticSearch 查詢服務是基於HTTP 請求的,腦子裡馬上聯想到 .NET Core 下 HttpClient 如果每次訪問都創建新實例,則會每次都建立新的TCP連接,而 Linux 對已釋放埠回收的時間視窗,會導致在高併發情況下,客戶端機器埠占用持續增加,同時被調用服務端連接數也會持續增加。
基於此猜測,立馬去扒了一下PlainElastic.Net源代碼:
源碼地址:https://github.com/Yegoroff/PlainElastic.Net/blob/master/src/PlainElastic.Net/Connection/ElasticConnection.cs
果然如猜測的那樣,每次都創建了新的 HttpWebRequest 實例,看了作者的最後維護時間也已經是3年前了,可能是後來官方驅動日趨完善,作者也便停止了維護。
既然如此,那麼讓我們看下官方最新驅動源碼是否如我們想象,是基於HttpClientFactory來解決這個問題的?
上述代碼看來,官方驅動並非是採用微軟官方建議的 HttpClientFactory ,而是官方底層自己維護的一個線程安全的字典來管理 HttpClient 實例池,雖是自己實現,但效果一樣:相同地址的請求,是鏈接復用的,這樣就解決不斷開啟 TCP 連接的問題。
問題找到,立馬進行驅動升級:
2.驅動升級
說明: ElasticSearch.Net官方驅動地址:https://www.elastic.co/guide/en/elasticsearch/client/net-api/6.x/index.html
官方驅動分為 Low Level Client 和 NEST(Heigh Level Client),其中Low Level Client 僅僅做了最基本的封裝,幾乎等價於HTTP原生調用,帶來了極大的靈活性的同時,也帶來使用成本,而對於開發人員來說使用 NEST 提供的更加高級的API,可以更加快速的進行開發工作,也同時可以利用到 .NET 所提供的各種語法糖,比如 => 表達式。
話不多說,看示例代碼:
實例創建
public ElasticService()
{
var uris = new Uri[] { new Uri("http://172.17.78.111:9200"), new Uri("http://172.17.78.112:9200") }; //支持多個節點
var connectionPool = new SniffingConnectionPool(uris);
var settings = new ConnectionSettings(connectionPool).DefaultIndex("testindex");//註意index不可以大寫
settings.BasicAuthentication("", ""); //設置賬號密碼,沒有可以跳過
this._client = new ElasticClient(settings);
}
插入待測試數據
public class People
{
public Guid Id { get; set; }
public string Name { get; set; }
public int Age { get; set; }
public DateTime Birthday { get; set; }
public bool Gender { get; set; }
public string Address { get; set; }
public DateTime CreateTime { get; set; } = DateTime.Now;
}
//批量插入
public async Task<IBulkResponse> AddPeopleAsync(People[] peoples)
{
var descriptor = new BulkDescriptor();
foreach (var p in peoples)
{
var response = await _client.IndexDocumentAsync(p);
descriptor.Index<People>(op => op.Document(p));
}
return await _client.BulkAsync(descriptor);//批量插入
}
多查詢條件拼接
public QueryContainer BuildQueryContainer(SearchCondition condition)
{
var queryCombin = new List<Func<QueryContainerDescriptor<People>, QueryContainer>>();
if (!string.IsNullOrEmpty(condition.Name))
queryCombin.Add(mt => mt.Match(m => m.Field(t => t.Name).Query(condition.Name))); //字元串匹配
if (condition.Age.HasValue)
queryCombin.Add(mt => mt.Range(m => m.Field(t => t.Address).GreaterThanOrEquals(condition.Age))); //數值區間匹配
if (!string.IsNullOrEmpty(condition.Address))
queryCombin.Add(mt => mt.MatchPhrase(m => m.Field(t => t.Address).Query(condition.Address))); //短語匹配
if (!condition.Gender.HasValue)
queryCombin.Add(mt => mt.Term(m => m.Field(t => t.Gender).Value(condition.Gender)));//精確匹配
return Query<People>.Bool(b => b
.Must(queryCombin)
.Filter(f => f
.DateRange(dr => dr.Field(t => t.CreateTime) //時間範圍匹配
.GreaterThanOrEquals(DateMath.Anchored(condition.BeginCreateTime.ToString("yyyy-MM-ddTHH:mm:ss")))
.LessThanOrEquals(DateMath.Anchored(condition.EndCreateTime.ToString("yyyy-MM-ddTHH:mm:ss"))))));
}
提示:Match 和 MatchPhrase 的區別,例如對於"長寧區"
- Match 會將"長寧區"進行分詞匹配,例如只要包含"區"的數據(比如靜安區),也會被查詢命中
- MatchPhrase 則可以理解為短語匹配,只有當數據包含“長寧區”完整短語的數據,才會被查詢命中
增加分頁查詢介面
public async Task<PagedResult<People[]>> QueryPeopleAsync(SearchCondition condition, int pageIndex, int pageSize)
{
var query = this.BuildQueryContainer(condition);
var response = await this._client.SearchAsync<People>(s => s
.Index("testindex")
.From(pageIndex * pageSize)
.Size(pageSize)
.Query(q => query)
.Sort(st => st.Descending(d => d.CreateTime)));
if (response.ApiCall.Success)
{
return new PagedResult<People[]>
{
PageIndex = pageIndex,
PageSize = pageSize,
Total = response.Total,
ReturnObj = response.Hits.Select(s => s.Source).ToArray()
};
}
return new PagedResult<People[]> { IsSuccess = false };
}
編寫單元測試
[TestMethod]
public async Task QueryPeopleTest()
{
var condition = new SearchCondition
{
Address="長寧區",
BeginCreateTime = DateTime.Now.AddDays(-1),
EndCreateTime = DateTime.Now
};
var result = await this._elasticService.QueryPeopleAsync(condition, 0, 3);
Assert.IsTrue(result.IsSuccess);
}
利用 Wireshark 抓包分析HTTP調用細節
將抓包的數據轉換為HTTP流,查看請求細節:
提示:通過wireshark抓包是排查錯誤很有效的方式,有時候通過查詢文檔進行分析,還不如先抓包查看請求數據來得直接,同時可以將抓包數據放在Kabana所提供的 Dev Tools中驗證自己的想法。
利用 Kibana 提供的 Dev Tools 驗證/測試 查詢條件
3.總結
從.NET Framework 平臺轉向 .Net Core 平臺,其實不僅僅是開發框架的升級,或者從 Windows 轉向 Linux 的遷移,而是需要我們有更多的開源思維,即:
- 由於會使用到更多的三方組件,開發人員需要更多關註社區的變化
- 開源代碼,意味著開發人員可以並且需要更多關註源代碼的底層實現
本文示例代碼地址:https://github.com/xBoo/articles/tree/master/src/ElasticSearchNetDemo