簡單的介紹一下,我使用MongoDB的場景。 我們現在的物聯網環境下,有部分數據,採樣頻率為2000條記錄/分鐘,這樣下來一天24*60*2000=2880000約等於300萬條數據,以後必然還會增加。之前資料庫使用的是mssql,對於資料庫的壓力很大,同時又需要保證歷史查詢的響應速度,這種情況下, ...
簡單的介紹一下,我使用MongoDB的場景。
我們現在的物聯網環境下,有部分數據,採樣頻率為2000條記錄/分鐘,這樣下來一天24*60*2000=2880000約等於300萬條數據,以後必然還會增加。之前資料庫使用的是mssql,對於資料庫的壓力很大,同時又需要保證歷史查詢的響應速度,這種情況下,在單表中數據量大,同時存在讀寫操作。不得已採用MongoDB來存儲數據。如果使用MongoDB,則至少需要三台機器,兩台實現讀寫分離,一臺作為仲裁(當然條件不允許也可以不用),每台機器的記憶體暫時配置在16G,公司小,沒辦法,據說,使用這個MongoDB需要機器記憶體最少92G,我沒有驗證過,但是吃記憶體是公認的,所以記憶體絕對要保證,就算保證了,也不一定完全就沒有意外發生。我們上面的這些特殊的數據是允許少量的丟失的,這些只是做分析使用的,幾個月了,暫時還沒出現數據丟失的情況,可能最新版本早就修複了吧,新手使用建議多看下官網上的說明。下麵直接奔入主題:
一、安裝部署和配置環境
1.安裝部署mongo-server(V3.4)
參考http://blog.csdn.net/qq_27093465/article/details/54574948 這個時候不要啟動,接著配置config文件2.配置Config文件
dbpath=C:/Program Files/MongoDB/Server/3.4/bin/data/db logpath=C:/Program Files/MongoDB/Server/3.4/bin/data/log/master.log pidfilepath=C:/Program Files/MongoDB/Server/3.4/bin/master.pid directoryperdb=true logappend=true replSet=testrs bind_ip=10.1.5.25 port=27016 oplogSize=10000 noauth = true storageEngine = wiredTiger wiredTigerCacheSizeGB = 2 syncdelay = 30 wiredTigerCollectionBlockCompressor = snappy 以上是詳細的配置參數,其中路徑部分根據需要更改, 這裡設置的oplogsize大小為10G,根據業務場景進行調整,另外auth許可權為null,因為設置許可權會增加服務開銷,影響效率,最下麵幾行是記憶體引擎,可以控制副本集同步及記憶體限制,防止記憶體泄露。3.啟動mongo-server
4.添加副本集配置
conf= { "_id" : "testrs", "members" : [ { "_id" : 0, "host" : "10.1.5.25:27016" }, { "_id" : 1, "host" : "10.1.5.26:27016" }, { "_id" : 2, "host" : "10.1.5.27:27016" } ] } rs.initiate(conf) 此時副本集集群配置已經完成,然後在命令行中輸入:rs.status(),查看副本集狀態,需要查看同步情況,可以輸入命令:db.serverStatus().5.設置副本集可讀寫
Rs.slaveOk()6..NET操作mongo
連接設置,請參考個人封裝Unitoon.Mongo代碼所示。7.性能對比
讀寫速度:Redis>Mongo>Mssqlserver 可容納數據量:Mssqlserver~Mongo>Redis 存儲數據類型:Mongo>Mssqlserver>Redis Note:記憶體持續上升,內部沒有記憶體回收機制,若限制記憶體 ,則可能出現查詢速度變慢,數據丟失等問題,建議優化查詢效率,建立索引 Db.test.ensureIndex({"username":1, "age":-1}) 強制釋放記憶體命令:db.runCommand({closeAllDatabases:1})二、倉儲設計
1.基類BaseEntity
namespace UnitoonIot.Mongo { /// <summary> /// 實體基類,方便生成ObjId /// </summary> [Serializable] [ProtoContract(ImplicitFields = ImplicitFields.AllPublic)] //[ProtoInclude(10, typeof(NormalHistory))] public class BaseEntity { //[BsonRepresentation(BsonType.ObjectId)] public ObjectId Id { get; set; } /// <summary> /// 資料庫名稱 /// </summary> public string DbName { get; set; } /// <summary> /// 給對象初值 /// </summary> public BaseEntity() { // this.ObjId = ObjectId.GenerateNewId().ToString(); //this.Id = ObjectId.NewObjectId().ToString(); } } }
這裡需要註意時間格式,MongoDB預設時間格式為國際時間,所以在寫入數據時和讀取數據時,時間格式要一致,此例中沒有對時間進行特殊處理,由傳入的時間格式確定。
2.Repository繼承介面IMongoRepository
namespace UnitoonIot.Mongo { public interface IMongoRepository<TEntity> where TEntity : class { } }
3.MongoRepository
using MongoDB.Driver; using MongoDB.Bson; using System; using System.Collections.Generic; using System.Linq; using System.Linq.Expressions; using System.Text; using System.Threading.Tasks; using MongoDB.Bson.Serialization.Attributes; using MongoDB.Driver.Linq; using System.Configuration; using System.IO; using UnitoonIot.AppSetting; namespace UnitoonIot.Mongo { public class MongoDb { private static string ConnectionStringHost ; private static string UserName ; private static string Password; private static IMongoDatabase _db = null; private static readonly object LockHelper = new object(); /// <summary> /// mongodb初始化 /// </summary> public static void Init() { ConnectionStringHost = "10.1.5.24:27016,10.1.5.24:27016,10.1.5.26:27017"; //AppSettings.GetConfigValue("MongoHost");//"10.1.5.24:27016"; UserName = AppSettings.GetConfigValue("MongoUserName"); Password = AppSettings.GetConfigValue("MongoPwd"); } static MongoDb() { } public static IMongoDatabase GetDb(string dbName,string options=null) { if (_db != null) return _db; lock (LockHelper) { if (_db != null) return _db; var database = dbName; var userName = UserName; var password = Password; var authentication = string.Empty; var host = string.Empty; if (!string.IsNullOrWhiteSpace(userName)) { authentication = string.Concat(userName, ':', password, '@'); } if (!string.IsNullOrEmpty(options) && !options.StartsWith("?")) { options = string.Concat('?', options); } host = string.IsNullOrEmpty(ConnectionStringHost) ? "localhost" : ConnectionStringHost; database = database ?? "testdb"; //mongodb://[username:password@]host1[:port1][,host2[:port2],…[,hostN[:portN]]][/[database][?options]] var conString = options!=null? $"mongodb://{authentication}{host}/{database}{options}" : $"mongodb://{authentication}{host}/{database}"; var url = new MongoUrl(conString); var mcs = MongoClientSettings.FromUrl(url); mcs.MaxConnectionLifeTime = TimeSpan.FromMilliseconds(1000); var client = new MongoClient(mcs); _db = client.GetDatabase(url.DatabaseName); } return _db; } } /// <summary> /// MongoDb 資料庫操作類 /// </summary> public class MongoRepository<T>: IMongoRepository<T> where T : BaseEntity { #region readonly field /// <summary> /// 表名 /// </summary> private readonly IMongoCollection<T> _collection = null; /// <summary> /// 資料庫對象 /// </summary> private readonly IMongoDatabase _database; #endregion /// <summary> /// 構造函數 /// </summary> public MongoRepository() { this._database = MongoDb.GetDb(Activator.CreateInstance<T>().DbName, "readPreference =secondaryPreferred ");//primaryPreferred/secondaryPreferred/nearest _collection = _database.GetCollection<T>(typeof(T).Name); } #region 增加 /// <summary> /// 插入對象 /// </summary> /// <param name="t">插入的對象</param> public virtual T Insert(T t) { // var flag = ObjectId.GenerateNewId(); // t.GetType().GetProperty("Id").SetValue(t, flag); //t.Time = DateTime.Now; _collection.InsertOne(t); return t; } /// <summary> /// 批量插入 /// </summary> /// <param name="ts">要插入的對象集合</param> public virtual IEnumerable<T> InsertBatch(IEnumerable<T> ts) { _collection.InsertMany(ts); return ts; } /// <summary> /// 插入對象 /// </summary> /// <param name="t">插入的對象</param> public virtual void InsertAsync(T t) { //var flag = ObjectId.GenerateNewId(); // t.GetType().GetProperty("Id").SetValue(t, flag); // t.Time = DateTime.Now; _collection.InsertOneAsync(t); } /// <summary> /// 批量插入 /// </summary> /// <param name="ts">要插入的對象集合</param> public virtual void InsertBatchAsync(IEnumerable<T> ts) { _collection.InsertManyAsync(ts); } #endregion #region 刪除 /// <summary> /// 刪除 /// </summary> /// <returns></returns> public virtual long Delete(T t) { var filter = Builders<T>.Filter.Eq("Id", t.Id); var result = _collection.DeleteOne(filter); return result.DeletedCount; } /// <summary> /// 刪除 /// </summary> /// <returns></returns> public virtual void DeleteAsync(T t) { var filter = Builders<T>.Filter.Eq("Id", t.Id); _collection.DeleteOneAsync(filter); } /// <summary> /// 按條件表達式刪除 /// </summary> /// <param name="predicate">條件表達式</param> /// <returns></returns> public virtual long Delete(Expression<Func<T, bool>> predicate) { var result = _collection.DeleteOne(predicate); return result.DeletedCount; } /// <summary> /// 按條件表達式刪除 /// </summary> /// <param name="predicate">條件表達式</param> /// <returns></returns> public virtual void DeleteAsync(Expression<Func<T, bool>> predicate) { _collection.DeleteOneAsync(predicate); } /// <summary> /// 按條件表達式批量刪除 /// </summary> /// <param name="predicate">條件表達式</param> /// <returns></returns> public virtual long DeleteBatch(Expression<Func<T, bool>> predicate) { var result = _collection.DeleteMany(predicate); return result.DeletedCount; } /// <summary> /// 按條件表達式批量刪除 /// </summary> /// <param name="predicate">條件表達式</param> /// <returns></returns> public virtual void DeleteBatchAsync(Expression<Func<T, bool>> predicate) { _collection.DeleteManyAsync(predicate); } /// <summary> /// 按檢索條件刪除 /// 建議用Builders<T>構建複雜的查詢條件 /// </summary> /// <param name="filter">條件</param> /// <returns></returns> public virtual long Delete(FilterDefinition<T> filter) { var result = _collection.DeleteOne(filter); return result.DeletedCount; } /// <summary> /// 按檢索條件刪除 /// 建議用Builders<T>構建複雜的查詢條件 /// </summary> /// <param name="filter">條件</param> /// <returns></returns> public virtual void DeleteAsync(FilterDefinition<T> filter) { _collection.DeleteOneAsync(filter); } #endregion #region 修改 /// <summary> /// 修改(Id不變) /// </summary> /// <returns></returns> public virtual long Update(T t) { var filterBuilder = Builders<T>.Filter; var filter = filterBuilder.Eq("Id",t.Id); var update = _collection.ReplaceOne(filter, t, new UpdateOptions() { IsUpsert = true }); return update.ModifiedCount; } /// <summary> /// 修改(Id不變) /// </summary> /// <returns></returns> public virtual void UpdateAsync(T t) { var filterBuilder = Builders<T>.Filter; var filter = filterBuilder.Eq("Id", t.Id); _collection.ReplaceOneAsync(filter, t, new UpdateOptions() { IsUpsert = true }); } /// <summary> /// 用新對象替換新文檔 /// </summary> /// <param name="filter">查詢條件</param> /// <param name="t">對象</param> /// <returns>修改影響文檔數</returns> public virtual long Update(Expression<Func<T, bool>> filter, T t) { var update = _collection.ReplaceOne(filter, t, new UpdateOptions() { IsUpsert = true }); return update.ModifiedCount; } /// <summary> /// 用新對象替換新文檔 /// </summary> /// <param name="filter">查詢條件</param> /// <param name="t">對象</param> /// <returns>修改影響文檔數</returns> public virtual long Update(FilterDefinition<T> filter, T t) { var update = _collection.ReplaceOne(filter, t, new UpdateOptions() { IsUpsert = true }); return update.ModifiedCount; } /// <summary> /// 用新對象替換新文檔 /// </summary> /// <param name="filter">查詢條件</param> /// <param name="t">對象</param> /// <returns>修改影響文檔數</returns> public virtual void UpdateAsync(Expression<Func<T, bool>> filter, T t) { _collection.ReplaceOneAsync(filter, t, new UpdateOptions() { IsUpsert = true }); } /// <summary> /// 用新對象替換新文檔 /// </summary> /// <param name="filter">查詢條件</param> /// <param name="t">對象</param> /// <returns>修改影響文檔數</returns> public virtual void UpdateAsync(FilterDefinition<T> filter, T t) { _collection.ReplaceOneAsync(filter, t, new UpdateOptions() { IsUpsert = true }); } /// <summary> /// 根據Id和條件文檔 /// </summary> /// <param name="update">修改條件-形如:Builders/<T/>.Update.Set(filed, value)</param> /// <param name="id">對象Id</param> /// <returns>修改影響文檔數</returns> public virtual long Update(string id, UpdateDefinition<T> update) { var filterBuilder = Builders<T>.Filter; var filter = filterBuilder.Eq("Id", new ObjectId(id)); var result = _collection.UpdateOne(filter, update, new UpdateOptions() { IsUpsert = true }); return result.ModifiedCount; } /// <summary> /// 根據Id和條件文檔 /// </summary> /// <param name="update">修改條件-形如:Builders/<T/>.Update.Set(filed, value)</param> /// <param name="id">對象Id</param> /// <returns>修改影響文檔數</returns> public virtual void UpdateAsync(string id, UpdateDefinition<T> update) { var filterBuilder = Builders<T>.Filter; var filter = filterBuilder.Eq("Id", new ObjectId(id)); _collection.UpdateOneAsync(filter, update, new UpdateOptions() { IsUpsert = true }); } /// <summary> /// 根據條件修改文檔 /// </summary> /// <param name="update">修改條件-形如:Builders/<T/>.Update.Set(filed, value)</param> /// <param name="filter">查詢條件Builders/<T/>.Filter.Eq(filed, value)</param> /// <returns>修改影響文檔數</returns> public virtual void Update(UpdateDefinition<T> update,Expression<Func<T, bool>> filter) { _collection.UpdateOne(filter, update, new UpdateOptions() { IsUpsert = true }); } /// <summary> /// 根據條件修改文檔 /// </summary> /// <param name="update">修改條件-形如:Builders/<T/>.Update.Set(filed, value)</param> /// <param name="filter">查詢條件Builders/<T/>.Filter.Eq(filed, value)</param> /// <returns>修改影響文檔數</returns> public virtual long Update(UpdateDefinition<T> update, FilterDefinition<T> filter) { var result = _collection.UpdateOne(filter, update, new UpdateOptions() { IsUpsert = true }); return result.ModifiedCount; } /// <summary> /// 根據條件修改文檔 /// </summary> /// <param name="update">修改條件-形如:Builders/<T/>.Update.Set(filed, value)</param> /// <param name="filter">查詢條件Builders/<T/>.Filter.Eq(filed, value)</param> /// <returns>修改影響文檔數</returns> public virtual void UpdateAsync(UpdateDefinition<T> update, Expression<Func<T, bool>> filter) { _collection.UpdateOneAsync(filter, update, new UpdateOptions() { IsUpsert = true }); } /// <summary> /// 根據條件修改文檔 /// </summary> /// <param name="update">修改條件-形如:Builders/<T/>.Update.Set(filed, value)</param> /// <param name="filter">查詢條件Builders/<T/>.Filter.Eq(filed, value)</param> /// <returns>修改影響文檔數</returns> public virtual void UpdateAsync(UpdateDefinition<T> update, FilterDefinition<T> filter) { _collection.UpdateOneAsync(filter, update, new UpdateOptions() { IsUpsert = true }); } /// <summary> /// 根據條件批量修改文檔 /// </summary> /// <param name="update">修改條件-形如:Builders/<T/>.Update.Set(filed, value)</param> /// <param name="filter">查詢條件Builders/<T/>.Filter.Eq(filed, value)</param> /// <returns>修改影響文檔數</returns> public virtual long UpdateBatch(UpdateDefinition<T> update, Expression<Func<T, bool>> filter) { var result = _collection.UpdateMany(filter, update, new UpdateOptions() { IsUpsert = true }); return result.ModifiedCount; } /// <summary> /// 根據條件批量修改文檔 /// </summary> /// <param name="update">修改條件-形如:Builders/<T/>.Update.Set(filed, value)</param> /// <param name="filter">查詢條件Builders/<T/>.Filter.Eq(filed, value)</param> /// <returns>修改影響文檔數</returns> public virtual long UpdateBatch(UpdateDefinition<T> update, FilterDefinition<T> filter) { var result = _collection.UpdateMany(filter, update, new UpdateOptions() { IsUpsert = true }); return result.ModifiedCount; } /// <summary> /// 根據條件批量修改文檔 /// </summary> /// <param name="update">修改條件-形如:Builders/<T/>.Update.Set(filed, value)</param> /// <param name="filter">查詢條件Builders/<T/>.Filter.Eq(filed, value)</param> /// <returns>修改影響文檔數</returns> public virtual void UpdateBatchAsync(UpdateDefinition<T> update, Expression<Func<T, bool>> filter) { _collection.UpdateManyAsync(filter, update, new UpdateOptions() { IsUpsert = true }); } /// <summary> /// 根據條件批量修改文檔 /// </summary> /// <param name="update">修改條件-形如:Builders/<T/>.Update.Set(filed, value)</param> /// <param name="filter">查詢條件Builders/<T/>.Filter.Eq(filed, value)</param> /// <returns>修改影響文檔數</returns> public virtual void UpdateBatchAsync(UpdateDefinition<T> update, FilterDefinition<T> filter) { _collection.UpdateManyAsync(filter, update, new UpdateOptions() { IsUpsert = true }); } #endregion #region 查詢 #region GetCollection /// <summary> /// 獲取操作對象的IMongoCollection集合,強類型對象集合 /// </summary> /// <returns></returns> public virtual IMongoCollection<T> GetCollection() { return _database.GetCollection<T>(typeof(T).Name); } #endregion #region GetSingle /// <summary> /// 查詢資料庫,檢查是否存在指定ID的對象 /// </summary> /// <param name="id">對象的ID值</param> /// <returns>存在則返回指定的對象,否則返回Null</returns> public virtual T GetById(string id) { var filterBuilder = Builders<T>.Filter; var filter = filterBuilder.Eq("Id", new ObjectId(id)); var data = _collection.Find(filter).FirstOrDefault(); return data; } /// <summary> /// 查詢資料庫,檢查是否存在指定ID的對象 /// </summary> /// <param name="id">對象的ID值</param> /// <returns>存在則返回指定的對象,否則返回Null</returns> public virtual async Task<T> GetAsyncById(string id) { var filterBuilder = Builders<T>.Filter; var filter = filterBuilder.Eq("Id", new ObjectId(id)); var data = await _collection.FindAsync(filter); return await data.SingleOrDefaultAsync(); } /// <summary> /// 查詢數據 /// </summary> /// <param name="filter">過濾條件</param> /// <returns></returns> public virtual T Get(FilterDefinition<T> filter) { return _collection.Find(filter).FirstOrDefault(); } /// <summary> /// 查詢數據 /// </summary> /// <param name="filter">條件表達式</param> /// <returns></returns> public virtual T Get(Expression<Func<T,bool>> filter) { return _collection.Find(filter).FirstOrDefault(); } /// <summary> /// 查詢數據 /// </summary> /// <param name="filter">過濾條件</param> /// <returns></returns> public virtual async Task<T> GetAsync(FilterDefinition<T> filter) { var data = await _collection.FindAsync(filter); return await data.SingleOrDefaultAsync(); } /// <summary> /// 查詢數據 /// </summary> /// <param name="filter">條件表達式</param> /// <returns></returns> public virtual async Task<T> GetAsync(Expression<Func<T, bool>> filter) { var data = await _collection.FindAsync(filter); return await data.SingleOrDefaultAsync(); } #endregion #region GetMany /// <summary> /// 查詢部分數據 /// </summary> /// <param name="filter">過濾條件</param> /// <returns></returns> public virtual IEnumerable<T> GetMany(FilterDefinition<T> filter) { return _collection.Find(filter).ToEnumerable(); } /// <summary> /// 查詢部分數據 /// </summary> /// <param name="filter">條件表達式</param> /// <returns></returns> public virtual IEnumerable<T> GetMany(Expression<Func<T,bool>> filter) { //return _collection.AsQueryable().Where(filter).ToList(); //return _collection.AsQueryable().Where(filter); return _collection.Find(filter).ToEnumerable(); //.ToEnumerable(); } /// <summary> /// 查詢部分數據 /// </summary> /// <param name="filter">過濾條件</param> /// <returns></returns> public virtual async Task<IEnumerable<T>> GetManyAsync(FilterDefinition<T> filter) { var data = await _collection.FindAsync(filter); return await data.ToListAsync(); } /// <summary> /// 查詢部分數據 /// </summary> /// <param name="filter">過濾條件</param> /// <returns></returns> public virtual async Task<IEnumerable<T>> GetManyAsync(Expression<Func<T, bool>> filter) { var data = await _collection.FindAsync(filter); return await data.ToListAsync(); } #endregion #region GetAll /// <summary> /// 查詢所有記錄,複雜查詢直接用Linq處理(避免全表掃描) /// </summary> /// <returns>要查詢的對象</returns> public virtual IEnumerable<T> GetAll() { var data = _collection.AsQueryable(); return data.ToEnumerable(); } /// <summary> /// 查詢所有記錄,複雜查詢直接用Linq處理(避免全表掃描) /// </summary> /// <returns>要查詢的對象</returns> public virtual async Task<IEnumerable<T>> GetAllAsync() { var data = _collection.AsQueryable(); return await data.ToListAsync(); } /// <summary> /// 查詢所有記錄,複雜查詢直接用Linq處理(避免全表掃描) /// </summary> /// <returns>要查詢的對象</returns> public virtual IQueryable<T> GetAllQueryable() { return _collection.AsQueryable(); } #endregion #region MapReduce /// <summary> /// MapReduce /// </summary> /// <returns>返回一個List列表數據</returns> public IEnumerable<T> GetMap(BsonJavaScript map,BsonJavaScript reduce) { return _collection.MapReduce<T>(map,reduce).ToList(); } #endregion #endregion } }
好了,就介紹到這裡。