前言 當別人做大數據用Java、Python的時候,我使用.NET做大數據、數據挖掘,這確實是值得一說的事。 寫的並不全面,但都是實際工作中的內容。 .NET在大數據項目中,可以做什麼? 寫腳本(使用控制台程式+頂級語句) 寫工具(使用Winform) 寫介面、寫服務 使用C#寫代碼的優點是什麼? ...
前言
當別人做大數據用Java、Python的時候,我使用.NET做大數據、數據挖掘,這確實是值得一說的事。
寫的並不全面,但都是實際工作中的內容。
.NET在大數據項目中,可以做什麼?
- 寫腳本(使用控制台程式+頂級語句)
- 寫工具(使用Winform)
- 寫介面、寫服務
使用C#寫代碼的優點是什麼?
- 靜態類型+匿名類型,一次性使用的實體類就用匿名類型,多次或多個地方使用的實體類就用靜態類型,靜態類型優於Python,匿名類型優於Java。你是不是想說Python也有靜態類型?你倒是寫啊?!
- 代碼的可維護性好,這是相對於Python說的,不一定是語言的鍋,還有固有的代碼組織習慣,靜態類型本身就是很好的註釋
- 性能好,非同步併發的代碼易編寫。
想起來一個事,就是前同事用Python2做數據挖掘,先用的es,性能差,改用的clickhouse,我就納悶,es性能差?現在我想我明白了,我看了其中一個挖掘演算法,它需要在雙層迴圈遍歷中去請求es進行查詢,它沒有使用非同步,也沒有使用多線程,那不就是一個線程在查詢嗎?我們現網es伺服器配置這麼強這麼多,它居然用一個線程去同步請求,能快才怪!實際上一個請求耗時極短,因為es有各種緩存,而查詢條件精確可以命中緩存,所以可以併發請求多個es節點。
那前同事為什麼沒有使用非同步併發或多線程呢?Python2不支持嗎?或者Python2支持,但寫起來不方便?或者前同事不會寫?(原因:寫起來不方便,C#一樣也不太方便,而且會使整個程式的併發請求量變得難以控制,可以針對某個介面單獨優化,但所有介面都這樣寫,也挺麻煩的)
使用.NET開發的優點是什麼?
其中一個優點是應用程式類型豐富,目前我用到的應用程式類型有:
- 控制台
- Winform
- Web API
- Blazor
你是不是想說Java和Python也可以寫控制台、窗體程式、Web API?一個熟悉Ptyhon的程式員,可不一定會寫窗體程式,需要一點時間學習,一個做了幾年.NET的程式員天然會寫Winform,就是拖控制項啊。當然,也可能他們不用Windows。
每一種應用程式類型,都意味著學習成本,而這些我已經會了,時間就省下了(Blazor一開始不會,學習花了一兩天)。
.NET與ClickHouse
我寫了一個大雜燴腳本項目,裡面有很多工程是查詢ClickHouse統計分析,代碼流程就是讀取Excel數據作為查詢輸入條件,查詢ClickHouse統計分析,統計結果導出到Excel。一個統計分析工作任務小半天就完成了。
用的ORM是我自己寫的Dapper.LiteSql。沒什麼人用,可能是功能不強吧。不過很適合我自己的需求,我自己經常用。
比如:
int count = session.CreateSql<XXX>(@"
select count(distinct t.xxx, t.xxx, t.xxx) as cnt
from xxx t
")
.Where(t => t.PassTime >= startTime && t.PassTime <= endTime)
.Where("t.Name in (" + kkNames + ")")
.QuerySingle<int>();
再比如:
var query = session.CreateSql<XXX>(@"
select t.xxx, t.xxx, t.xxx
from xxx t
")
.Where(t => t.PassTime >= firstTime && t.PassTime <= firstTime.AddDays(7).AddSeconds(-1));
query.Where(t => plateList.Skip((page - 1) * pageSize).Take(pageSize).ToList().Contains(t.PlateNo));
var temp = query.ToList();
對於統計查詢,我經常SQL和Lambda表達式混寫,感覺這樣非常靈活。
某些情況下,混寫比純Lambda寫法,是要清晰的:
List<XXX> list = session.CreateSql<XXX>(@"
select xxx, xxx as xxx, max(xxx) as xxx
from (
select xxx, toDate(xxx) as xxx, xxx, count(*) as xxx
from (
select distinct t.xxx, t.xxx, t.xxx
from xxx t
").Where(t => t.Xxx != "xxx")
.Where(t => t.XxxTime >= startTime && t.XxxTime <= endTime)
.Where(t => xxxList.Contains(t.Xxx))
.Where(@"(
(formatDateTime(t.xxx_time ,'%H:%M:%S') >= '07:00:00' and formatDateTime(t.xxx_time ,'%H:%M:%S') <= '08:59:59') or
(formatDateTime(t.xxx_time ,'%H:%M:%S') >= '14:00:00' and formatDateTime(t.xxx_time ,'%H:%M:%S') <= '20:59:59')
)")
.Append(@")")
.GroupBy("xxx, xxx, xxx")
.Append(@")
group by xxx, xxx
")
.QueryList<XXX>();
上述代碼說明:
- group by寫了兩種寫法比較隨意
- 三層select嵌套,當然主流ORM都能實現,但不一定易編寫、易閱讀
- 我不用針對ClickHouse去實現formatDateTime,也不用實現toDate、max、distinct、count,也不用糾結是count(*)還是count(1),只要實現的功能足夠少,BUG就少。
.NET與ElasticSearch
本打算使用Elasticsearch.Net,為什麼沒有使用?
- 學習成本,項目中沒有學習時間,雖然造測試數據是本職工作,但寫小工具不是本職工作不能耽誤太多時間,所以沒有學習時間
- 我使用HttpClient查詢es,這種查詢es的方式和kibana中寫的查詢語句、以及前同事留下的創建索引的文檔、模板最接近,方便抄現成的。下麵是一個完整的查詢es方法:
public async Task<TicketAgg> QueryAgg(string strStartTime, string strEndTime, string idCard)
{
Stopwatch sw = Stopwatch.StartNew();
string esUrl = $"http://{esIPs[_rnd.Next(0, esIPs.Length)]}:24100/out_xxx/_search";
var esQueryBody = new
{
size = 0,
query = new
{
@bool = new
{
must = new dynamic[]
{
new
{
range = new
{
travel_time = new
{
gte = strStartTime,
lte = strEndTime,
format = "yyyyMMddHHmmss"
}
}
},
new
{
match_phrase = new
{
zjhm = idCard
}
}
}
}
},
aggs = new
{
countByZjhm = new
{
terms = new
{
field = "zjhm",
size = 10000
}
}
}
};
string esPostData = JsonConvert.SerializeObject(esQueryBody);
Console.WriteLine($"ES請求URL:{esUrl}");
Console.WriteLine($"ES請求參數:{esPostData}");
HttpClient httpClient = HttpClientFactory.GetClient();
HttpContent content = new StringContent(esPostData);
content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/json");
string strEsResult = await (await httpClient.PostAsync(esUrl, content)).Content.ReadAsStringAsync();
var resultObj = new
{
took = 0,
aggregations = new
{
countByZjhm = new
{
buckets = new[]
{
new
{
key = "",
doc_count = 0
}
}
}
}
};
var esResult = JsonConvert.DeserializeAnonymousType(strEsResult, resultObj);
TicketAgg agg = new TicketAgg();
agg.IdCard = idCard;
agg.Count = esResult.aggregations.countByZjhm.buckets[0].doc_count;
sw.Stop();
Console.WriteLine($"統計數據,耗時:{sw.Elapsed.TotalSeconds.ToString("0.000")} 秒");
return agg;
}
代碼中esQueryBody和resultObj都是一次性使用的,直接用匿名動態類型,而TicketAgg是需要實例化作為返回值給其它方法使用的,所以定義成靜態類型。
評論區有人問可選條件怎麼寫,代碼如下:
string strStartTime = DateTime.Now.AddDays(-7).ToString("yyyyMMddHHmmss");
string strEndTime = DateTime.Now.ToString("yyyyMMddHHmmss");
string idCard = "33";
var esQueryBody = new
{
size = 10000,
query = new
{
@bool = new
{
must = new List<dynamic>
{
new
{
range = new
{
travel_time = new
{
gte = strStartTime,
lte = strEndTime,
format="yyyyMMddHHmmss"
}
}
}
}
}
}
};
if (idCard != null)
{
[email protected](new
{
match_phrase = new
{
zjhm = idCard
}
});
}
string esPostData = JsonConvert.SerializeObject(esQueryBody);
上述代碼說明:
- must原來是dynamic[],它的長度是不可變的,不方便追加,所以修改成List
,就可以動態追加了。 - 寫這段代碼,我沒有百度,沒有找文檔,花了幾分鐘試出來的。優秀的語法可以讓使用者舉一反三。
下麵一段代碼,生產測試數據用的:
public async Task MockXxxData(string indexName, int count, DateTime startDate, DateTime endDate, string[] departures, string[] destinations, dynamic peoples)
{
int days = (int)endDate.Subtract(startDate).TotalDays;
List<Task> taskList = new List<Task>();
for (int i = 0; i < count; i++)
{
DateTime date = startDate.AddDays(_rnd.Next(0, days + 1));
long time = (long)(_rnd.NextDouble() * 3600 * 24);
var people = peoples[_rnd.Next(0, peoples.Length)];
var esRequestBody = new
{
xxx_type = _rnd.Next(1, 4).ToString(),
zjlx = "xxx",
zjhm = people.zjhm,
xm = people.xm,
departure = departures[_rnd.Next(0, departures.Length)],
destination = destinations[_rnd.Next(0, destinations.Length)],
xxx_date = date.ToString("yyyyMMdd"),
xxx_time = date.AddSeconds(time).ToString("yyyyMMddHHmmss"),
xxx_time = date.AddSeconds(time).AddHours(0.5 + _rnd.NextDouble()).ToString("yyyyMMddHHmmss"),
xxx_time = date.AddSeconds(time).AddDays(-2 + _rnd.NextDouble()).ToString("yyyyMMddHHmmss"),
xxx = "",
xxx = ""
};
var task = ServiceFactory.Get<EsWriteService>().Write(indexName, esRequestBody);
taskList.Add(task);
}
await Task.WhenAll(taskList);
}
上述代碼說明:
- 程式跑起來生產數據,一般會有幾十個線程,也就是請求es的併發量是幾十
- 如果你覺得幾十的併發量,還是有點高,可以在調用的Write非同步方法中使用Semaphore類限制一下併發量,代碼如下:
private Semaphore _sem = new Semaphore(20, 20); //限制非同步請求的併發數量
public async Task<bool> Write(string indexName, dynamic esRequestBody)
{
_sem.WaitOne();
try
{
Stopwatch sw = new Stopwatch();
sw.Start();
indexName = $"{indexName}-{DateTime.Now.Year}-{DateTime.Now.Month:00}";
string esUrl = $"http://{esIPs[_rnd.Next(0, esIPs.Length)]}:24100/{indexName}/doc";
string esRequestData = JsonConvert.SerializeObject(esRequestBody);
HttpContent content = new StringContent(esRequestData);
content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/json");
HttpClient httpClient = HttpClientFactory.GetClient();
string strEsResult = await (await httpClient.PostAsync(esUrl, content)).Content.ReadAsStringAsync();
var resultObj = new
{
status = 0
};
var esResult = JsonConvert.DeserializeAnonymousType(strEsResult, resultObj);
sw.Stop();
_log?.Info($"【寫入ES索引】【{(esResult.status == 0 ? "成功" : "失敗")}】耗時:{sw.Elapsed.TotalSeconds:0.000} 秒,索引名稱:{indexName},請求URL:{esUrl},請求參數:{esRequestData}");
return esResult.status == 0;
}
catch
{
throw;
}
finally
{
_sem.Release();
}
}
用到的庫
評論區有人問技術棧,這裡列一下主要的庫:
- Microsoft.Extensions.DependencyInjection 和 Autofac (依賴註入)
- AutoMapper (實體類映射)
- Microsoft.Extensions.Http (HttpClient,用於操作ElasticSearch、網路請求)
- Quartz (定時任務)
- Dapper、Dapper.LiteSql (ORM)
- Newtonsoft.Json (Json序列化)
- ClickHouse.Client (操作ClickHouse)
- Oracle.ManagedDataAccess.Core (操作Oracle)
- MySqlConnector (操作MySQL)
我最近寫了哪些工程
- 大雜燴腳本工程,包括查詢clickhouse統計分析輸出Excel、查詢MySQL和Oracle、各種小腳本工具
- Blazor工程,做了一套簡單的增刪改查,精力有限,自己測試用,不用手動改資料庫了
- 數據挖掘服務,主要是Web API和定時任務
- Winform工具,用於測試時創建ES索引、生產模擬數據。為什麼寫這個?因為做數據挖掘,不給數據,只能自己造了。
為什麼從這篇博客看起來這個項目只有我一個人在做?沒團隊?
還有項目經理、產品經理、前端等一共幾個人,項目資金投入少,所以不可能有很多人的。
為什麼沒有使用Python?
我一開始是想使用Python的,但就我用.NET寫的這些東西,如果改用Python,沒個2、3年經驗,寫不順暢。
我用.NET做一個項目,Swagger有了,創建工程時自帶的,當然Python的Swagger也是有的,你可以百度"python 從註釋自動生成 swagger",之前看到過一個不錯的,沒保存,一時半會就找不到了。
用Blazor做了簡單的配置頁面,測試時不用去手動修改資料庫了
寫了一個Mock工程,生產模擬測試數據,寫入速度可以達到6000條/秒(一條數據請求一次,不是批量寫入),界面如下:
最後
寫此博客是為了給.NET正名,在大數據項目中,.NET大有可為。
我寫代碼沒有用到什麼特別的技術,看起來很簡單,但也不是隨便學學就能寫,沒個3、5年經驗,很難寫的這麼快。
我寫代碼也沒有什麼條條框框,可能不規範,但很靈活。
例如,winform程式註入日誌工具類怎麼寫?來不急百度了,就這麼寫吧,一樣每秒6000條的狂寫日誌,還不卡界面:
public partial class Form1 : Form, ILog
{
...省略
public Form1()
{
InitializeComponent();
...省略
//註入日誌工具類
ServiceFactory.Get<IndexCreationService>().InjectLog(this);
ServiceFactory.Get<EsWriteService>().InjectLog(this);
ServiceFactory.Get<MockDataService>().InjectLog(this);
}
}
internal class EsWriteService : ServiceBase
{
...省略
private ILog? _log;
public void InjectLog(ILog log) => _log = log;
public async Task<bool> Write(string indexName, dynamic esRequestBody)
{
...省略
_log?.Info("xxx");
...省略
}
}
就目前這些項目、腳本、工具而言,感覺這就是我寫的最佳實踐。不知道最佳實踐,代碼也能寫,容易寫成屎山,要麼寫的服務三天兩頭崩。