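Here's what happened. A new project of ours adopted FastDFS, which is quite handy for storing small pieces of data, and I found a FastDFS SDK on NuGet, as shown in the screenshot:
It was the top result, so I assumed it would be reasonably reliable. I tried it briefly in the project, saw no problems, and pushed it live. Then it went wrong: the testers reported that after uploading an audience segment, a few people were missing when it was pulled back down. My usage was to encode a batch of customerid values as a bitmap, put it in a byte[] array, and send that to FastDFS. After grinding through the trace, I found that this so-called SDK had a bug in how it handled the byte array during upload. I was speechless. SDKs like this on NuGet are probably hobby projects someone tossed up there, hardly something to trust in production. Luckily it was caught in the test environment, otherwise who knows what mess it would have caused.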
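For readers who have not stored IDs this way, here is a minimal sketch of the bitmap idea, assuming the customer IDs are small non-negative integers. The class and method names (`CustomerBitmap`, `pack`, `unpack`, `roundTrips`) are made up for illustration and are not the code from my project. The last method is the kind of check that exposes "a few people missing" after a download.

```java
import java.util.BitSet;

// Hypothetical helper, for illustration only.
final class CustomerBitmap {

    // Pack customer IDs into a bitmap: bit i is set when customer i is in the audience.
    static byte[] pack(int[] customerIds) {
        BitSet bits = new BitSet();
        for (int id : customerIds) {
            bits.set(id);
        }
        return bits.toByteArray();
    }

    // Recover the customer IDs from the stored bytes, in ascending order.
    static int[] unpack(byte[] stored) {
        return BitSet.valueOf(stored).stream().toArray();
    }

    // True when the downloaded bytes decode to the same audience that was packed.
    static boolean roundTrips(int[] customerIds, byte[] downloaded) {
        return BitSet.valueOf(pack(customerIds)).equals(BitSet.valueOf(downloaded));
    }
}
```

If even one byte is dropped or shifted on the way to storage, `roundTrips` returns false, so a check like this is worth running against any storage SDK before trusting it.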
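I: The fix
The problem still had to be solved. Fortunately, FastDFS was written by YuQing, an expert engineer at Alibaba, so a Java SDK should be more reliable, and pulling it in with Maven is more convenient.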
<dependency>
    <groupId>net.oschina.zcx7878</groupId>
    <artifactId>fastdfs-client-java</artifactId>
    <version>1.27.0.0</version>
</dependency>
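After pulling it down, I saw that this SDK really was written by the FastDFS author, which made me feel much safer. I tested it, and the bug did not occur with this SDK. Happy days.
The flow then looks roughly like this.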
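II: Making C# and Java talk to each other
There are quite a few ways to interoperate. Besides REST over HTTP, you can use an RPC framework such as Thrift or gRPC. In the end I decided to go with Thrift; the latest version at the time of writing is 0.11.
Website: http://thrift.apache.org/. Looking at the C# Thrift SDK, the highest version it seems to support is 0.9.1, available at: http://archive.apache.org/dist/thrift/0.9.1/ .
With thrift-0.9.1.exe in hand, the next step is to define the Thrift DSL, from which thrift-0.9.1.exe can generate an SDK for each language.
1. Define the Thrift DSL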
service ThriftService {
    string Upload(1: binary data),
    binary Download(1: string path),
    bool Remove(1: string path)
}
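To make the contract concrete: on the Java side, Thrift maps `binary` to `java.nio.ByteBuffer` and `string` to `String`, which matches the server implementation later in this post. The sketch below is not the generated code. `FileStore` and `InMemoryFileStore` are hypothetical names, and the in-memory map and the `mem/...` path format are assumptions that only mimic the three operations so their behavior can be tried without a FastDFS cluster.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in mirroring the three operations declared in the DSL.
interface FileStore {
    String upload(ByteBuffer data);     // returns the path of the stored file
    ByteBuffer download(String path);   // returns the stored bytes, empty if unknown
    boolean remove(String path);        // returns true if something was deleted
}

// In-memory implementation, assumed purely for illustration.
final class InMemoryFileStore implements FileStore {
    private final Map<String, byte[]> files = new HashMap<>();
    private int nextId = 0;

    public String upload(ByteBuffer data) {
        // Copy only the readable bytes and leave the caller's buffer untouched.
        byte[] copy = new byte[data.remaining()];
        data.duplicate().get(copy);
        String path = "mem/" + (nextId++) + ".g1"; // made-up path format
        files.put(path, copy);
        return path;
    }

    public ByteBuffer download(String path) {
        byte[] bytes = files.get(path);
        return bytes == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(bytes.clone());
    }

    public boolean remove(String path) {
        return files.remove(path) != null;
    }
}
```

A client written against this shape only ever sees a path string and raw bytes, which is why the same three calls can be exposed to C# unchanged.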
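You may ask how to write this DSL. The official description of the DSL keywords is at http://thrift.apache.org/docs/idl and it is fairly simple. If anything is unclear, here is a comprehensive example: https://git-wip-us.apache.org/repos/asf?p=thrift.git;a=blob_plain;f=test/ThriftTest.thrift;hb=HEAD . Then save the definition as 1.thrift.
2. Generate the C# SDK with thrift
For how to generate it, refer to the template on the official site: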
thrift --gen <language> <Thrift filename>
C:\Users\hxc>cd C:\java\lib\thrift
C:\java\lib\thrift>thrift-0.9.1.exe -gen csharp C:\java\lib\thrift\1.thrift
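As you can see, after running it there is a new gen-csharp folder. Open it and you will find ThriftService.cs, named after the service defined in the DSL.
3. Generate the Java SDK with thrift
After running the command below, you will find another new folder, gen-java.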
C:\java\lib\thrift>thrift-0.9.1.exe -gen java C:\java\lib\thrift\1.thrift
III: SDK integration
After the rework, Java is the server and C# is the client. The server's job is to wrap FastDFS in Java so that C# can call it.
1. Java server
(1) Maven coordinates for FastDFS and Thrift:
<!-- https://mvnrepository.com/artifact/org.apache.thrift/libthrift -->
<dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libthrift</artifactId>
    <version>0.9.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/net.oschina.zcx7878/fastdfs-client-java -->
<dependency>
    <groupId>net.oschina.zcx7878</groupId>
    <artifactId>fastdfs-client-java</artifactId>
    <version>1.27.0.0</version>
</dependency>
(2) ThriftServiceImpl.java, the implementation class:
package com.datamip.thrift;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.csource.common.MyException;
import org.csource.fastdfs.StorageClient;

import com.fasterxml.jackson.databind.ObjectMapper;

/*
 * Thrift server-side implementation
 */
public class ThriftServiceImpl implements ThriftService.Iface {

    public static Logger logger1 = Logger.getLogger(App.class);

    StorageClient client = null;

    ObjectMapper objectMapper = new ObjectMapper();

    public ThriftServiceImpl() throws IOException, MyException {
        client = new FastService().Create();
    }

    // Upload a file
    public String Upload(ByteBuffer data) {

        // Validate before touching the buffer
        if (data == null || !data.hasRemaining()) return "";

        // Copy only the readable bytes; data.array() returns the whole backing
        // array, which can be larger than the payload
        byte[] bytes = new byte[data.remaining()];
        data.duplicate().get(bytes);

        logger1.info("Received upload request: bytes.length=" + bytes.length);

        // The file extension is "g1" for now; later it can be changed dynamically via Apollo configuration
        String[] result = null;

        try {
            result = client.upload_file(bytes, "g1", null);

            logger1.info("upload result: " + objectMapper.writeValueAsString(result));

        } catch (Exception e) {
            logger1.error("upload exception", e);
        }

        // result stays null when the upload threw, so check before indexing
        if (result == null || result.length < 2) return "";

        return result[1];
    }

    // Download a file
    public ByteBuffer Download(String path) throws TException {

        logger1.info("Received download request: " + path);

        // Compare string content, not references
        if (path == null || path.isEmpty())
            return ByteBuffer.allocate(0);

        String[] arr = path.split("\\.");

        if (arr.length < 2)
            return ByteBuffer.allocate(0);

        // The extension written at upload time ("g1") doubles as the group name
        String group_name = arr[1];

        try {
            byte[] bytes = client.download_file(group_name, path);

            logger1.info(String.format("path=%s, downloaded bytes length: %s", path, bytes.length));

            return ByteBuffer.wrap(bytes);

        } catch (Exception e) {
            logger1.error("download exception", e);
        }

        return ByteBuffer.allocate(0);
    }

    // Delete a file
    public boolean Remove(String path) throws TException {

        logger1.info("Received remove request: " + path);

        if (path == null || path.isEmpty()) return false;

        String[] arr = path.split("\\.");

        if (arr == null || arr.length < 2) return false;

        String group_name = arr[1];

        try {
            int code = client.delete_file(group_name, path);

            logger1.info(String.format("path=%s, groupname=%s, returned status=%s",
                    path, group_name, code));

            // 0 means the delete succeeded
            if (code == 0) {
                return true;
            }

        } catch (Exception e) {
            logger1.error("remove exception", e);
        }

        return false;
    }
}
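One detail worth spelling out when a Thrift `binary` argument arrives as a `ByteBuffer`: `array()` returns the entire backing array and ignores the buffer's position and limit. Depending on the transport, the buffer may be a window onto a larger array, in which case `array()` hands back extra bytes. Below is a small sketch of copying only the readable range. `ByteBuffers.toBytes` is a hypothetical helper name of mine, not part of Thrift or FastDFS.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, for illustration only.
final class ByteBuffers {

    // Copy exactly the readable bytes (position..limit) without moving
    // the position of the caller's buffer.
    static byte[] toBytes(ByteBuffer data) {
        if (data == null) {
            return new byte[0];
        }
        byte[] out = new byte[data.remaining()];
        data.duplicate().get(out); // the duplicate has its own position and limit
        return out;
    }
}
```

For example, `ByteBuffer.wrap(new byte[]{9, 1, 2, 3, 9}, 1, 3)` has three readable bytes, yet its `array()` is still the five-byte original; `toBytes` returns just `{1, 2, 3}`. For a bitmap payload, those stray bytes would show up as phantom or missing customers.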
(3) The FastDFS wrapper class
package com.datamip.thrift;

import java.io.IOException;

import org.csource.common.MyException;
import org.csource.fastdfs.ClientGlobal;
import org.csource.fastdfs.StorageClient;
import org.csource.fastdfs.StorageServer;
import org.csource.fastdfs.TrackerClient;
import org.csource.fastdfs.TrackerServer;

import com.datamip.utils.PropertiesUtils;

public class FastService {

    public StorageClient Create() throws IOException, MyException {

        // Read the tracker address from the configuration file
        String path = PropertiesUtils.getProperties("setting.properties", "fastdfs");
        return this.Create(path);
    }

    public StorageClient Create(String host) throws IOException, MyException {

        // Initialize the global client settings with the tracker address
        ClientGlobal.initByTrackers(host);

        // Create a TrackerClient object.
        TrackerClient trackerClient = new TrackerClient();

        // Obtain a TrackerServer connection.
        TrackerServer trackerServer = trackerClient.getConnection();

        // Declare the StorageServer as null.
        StorageServer storageServer = null;

        // Build the StorageClient from the tracker connection.
        StorageClient storageClient = new StorageClient(trackerServer, storageServer);

        return storageClient;
    }
}
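The `PropertiesUtils` helper is my own utility and is not listed in this post. As a rough idea of what such a lookup does, here is a minimal sketch using only `java.util.Properties`. `TrackerConfig` and `trackersFrom` are hypothetical names, and the assumption is that the `fastdfs` key holds the tracker address in `host:port` form.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical stand-in for the config lookup, for illustration only.
final class TrackerConfig {

    // Read the "fastdfs" entry from properties-formatted text and fail loudly
    // when it is missing, instead of passing null on to the FastDFS client.
    static String trackersFrom(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String trackers = props.getProperty("fastdfs");
        if (trackers == null || trackers.trim().isEmpty()) {
            throw new IllegalStateException("missing 'fastdfs' entry in configuration");
        }
        return trackers.trim();
    }
}
```

With a file containing the line `fastdfs=10.0.0.1:22122` (a made-up address), `trackersFrom(new FileReader("setting.properties"))` would return `10.0.0.1:22122`. Failing fast on a missing key keeps a configuration mistake from surfacing later as an obscure connection error.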
(4) Finally, the App entry point, where Thrift listens on port 19999.
package com.datamip.thrift;

import java.io.IOException;

import org.apache.log4j.Logger;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.csource.common.MyException;

public class App {

    public static Logger logger1 = Logger.getLogger(App.class);

    // Single source of truth for the listening port, so the socket and the log cannot disagree
    private static final int PORT = 19999;

    public static void main(String[] args) throws IOException, MyException {

        try {
            // Route incoming calls to our FastDFS-backed implementation
            TProcessor tprocessor = new ThriftService.Processor<ThriftService.Iface>(new ThriftServiceImpl());

            TServerSocket serverTransport = new TServerSocket(PORT);
            TServer.Args tArgs = new TServer.Args(serverTransport);

            tArgs.processor(tprocessor);
            tArgs.protocolFactory(new TBinaryProtocol.Factory());

            logger1.debug("Thrift server started, listening on port " + PORT);

            // TSimpleServer is single-threaded and blocks in serve()
            TServer server = new TSimpleServer(tArgs);
            server.serve();
        } catch (TTransportException e) {
            logger1.error("failed to start the Thrift server", e);
        }
    }
}
2. C# client
(1) Pull the Thrift dll from NuGet, then add the generated ThriftService.cs to our solution
public partial class ThriftService {
  public interface Iface {
    string Upload(byte[] data);
    #if SILVERLIGHT
    IAsyncResult Begin_Upload(AsyncCallback callback, object state, byte[] data);
    string End_Upload(IAsyncResult asyncResult);
    #endif
    byte[] Download(string path);
    #if SILVERLIGHT
    IAsyncResult Begin_Download(AsyncCallback callback, object state, string path);
    byte[] End_Download(IAsyncResult asyncResult);
    #endif
    bool Remove(string path);
    #if SILVERLIGHT
    IAsyncResult Begin_Remove(AsyncCallback callback, object state, string path);
    bool End_Remove(IAsyncResult asyncResult);
    #endif
  }

  public class Client : IDisposable, Iface {
    public Client(TProtocol prot) : this(prot, prot) { }

    public Client(TProtocol iprot, TProtocol oprot) {
      iprot_ = iprot;
      oprot_ = oprot;
    }

    protected TProtocol iprot_;
    protected TProtocol oprot_;
    protected int seqid_;

    public TProtocol InputProtocol { get { return iprot_; } }
    public TProtocol OutputProtocol { get { return oprot_; } }

    #region " IDisposable Support "
    private bool _IsDisposed;

    // IDisposable
    public void Dispose() {
      Dispose(true);
    }

    protected virtual void Dispose(bool disposing) {
      if (!_IsDisposed) {
        if (disposing) {
          if (iprot_ != null) {
            ((IDisposable)iprot_).Dispose();
          }
          if (oprot_ != null) {
            ((IDisposable)oprot_).Dispose();
          }
        }
      }
      _IsDisposed = true;
    }
    #endregion

    #if SILVERLIGHT
    public IAsyncResult Begin_Upload(AsyncCallback callback, object state, byte[] data) {
      return send_Upload(callback, state, data);
    }

    public string End_Upload(IAsyncResult asyncResult) {
      oprot_.Transport.EndFlush(asyncResult);
      return recv_Upload();
    }
    #endif

    public string Upload(byte[] data) {
      #if !SILVERLIGHT
      send_Upload(data);
      return recv_Upload();
      #else
      var asyncResult = Begin_Upload(null, null, data);
      return End_Upload(asyncResult);
      #endif
    }

    #if SILVERLIGHT
    public IAsyncResult send_Upload(AsyncCallback callback, object state, byte[] data)
    #else
    public void send_Upload(byte[] data)
    #endif
    {
      oprot_.WriteMessageBegin(new TMessage("Upload", TMessageType.Call, seqid_));
      Upload_args args = new Upload_args();
      args.Data = data;
      args.Write(oprot_);
      oprot_.WriteMessageEnd();
      #if SILVERLIGHT
      return oprot_.Transport.BeginFlush(callback, state);
      #else
      oprot_.Transport.Flush();
      #endif
    }

    public string recv_Upload() {
      TMessage msg = iprot_.ReadMessageBegin();
      if (msg.Type == TMessageType.Exception) {
        TApplicationException x = TApplicationException.Read(iprot_);
        iprot_.ReadMessageEnd();
        throw x;
      }
      Upload_result result = new Upload_result();
      result.Read(iprot_);
      iprot_.ReadMessageEnd();
      if (result.__isset.success) {
        return result.Success;
      }
      throw new TApplicationException(TApplicationException.ExceptionType.MissingResult, "Upload failed: unknown result");
    }

    #if SILVERLIGHT
    public IAsyncResult Begin_Download(AsyncCallback callback, object state, string path) {
      return send_Download(callback, state, path);
    }

    public byte[] End_Download(IAsyncResult asyncResult) {
      oprot_.Transport.EndFlush(asyncResult);
      return recv_Download();
    }
    #endif

    public byte[] Download(string path) {
      #if !SILVERLIGHT
      send_Download(path);
      return recv_Download();
      #else
      var asyncResult = Begin_Download(null, null, path);
      return End_Download(asyncResult);
      #endif
    }

    #if SILVERLIGHT
    public IAsyncResult send_Download(AsyncCallback callback, object state, string path)
    #else
    public void send_Download(string path)
    #endif
    {
      oprot_.WriteMessageBegin(new TMessage("Download", TMessageType.Call, seqid_));
      Download_args args = new Download_args();
      args.Path = path;
      args.Write(oprot_);
      oprot_.WriteMessageEnd();
      #if SILVERLIGHT
      return oprot_.Transport.BeginFlush(callback, state);
      #else
      oprot_.Transport.Flush();
      #endif
    }

    public byte[] recv_Download() {
      TMessage msg = iprot_.ReadMessageBegin();
      if (msg.Type == TMessageType.Exception) {
        TApplicationException x = TApplicationException.Read(iprot_);
        iprot_.ReadMessageEnd();
        throw x;
      }
      Download_result result = new Download_result();
      result.Read(iprot_);
      iprot_.ReadMessageEnd();
      if (result.__isset.success) {
        return result.Success;
      }
      throw new TApplicationException(TApplicationException.ExceptionType.MissingResult, "Download failed: unknown result");
    }

    #if SILVERLIGHT
    public IAsyncResult Begin_Remove(AsyncCallback callback, object state, string path) {
      return send_Remove(callback, state, path);
    }

    public bool End_Remove(IAsyncResult asyncResult) {
      oprot_.Transport.EndFlush(asyncResult);
      return recv_Remove();
    }
    #endif

    public bool Remove(string path) {
      #if !SILVERLIGHT
      send_Remove(path);
      return recv_Remove();
      #else
      var asyncResult = Begin_Remove(null, null, path);
      return End_Remove(asyncResult);
      #endif
    }

    #if SILVERLIGHT
    public IAsyncResult send_Remove(AsyncCallback callback, object state, string path)
    #else
    public void send_Remove(string path)
    #endif
    {
      oprot_.WriteMessageBegin(new TMessage("Remove", TMessageType.Call, seqid_));
      Remove_args args = new Remove_args();
      args.Path = path;
      args.Write(oprot_);
      oprot_.WriteMessageEnd();
      #if SILVERLIGHT
      return oprot_.Transport.BeginFlush(callback, state);
      #else
      oprot_.Transport.Flush();
      #endif
    }

    public bool recv_Remove() {
      TMessage msg = iprot_.ReadMessageBegin();
      if (msg.Type == TMessageType.Exception) {
        TApplicationException x = TApplicationException.Read(iprot_);
        iprot_.ReadMessageEnd();
        throw x;
      }
      Remove_result result = new Remove_result();
      result.Read(iprot_);
      iprot_.ReadMessageEnd();
      if (result.__isset.success) {
        return result.Success;
      }
      throw new TApplicationException(TApplicationException.ExceptionType.MissingResult, "Remove failed: unknown result");
    }
  }

  public class Processor : TProcessor {
    public Processor(Iface iface) {
      iface_ = iface;
      processMap_["Upload"] = Upload_Process;
      processMap_["Download"] = Download_Process;
      processMap_["Remove"] = Remove_Process;
    }

    protected delegate void ProcessFunction(int seqid, TProtocol iprot, TProtocol oprot);
    private Iface iface_;
    protected Dictionary<string, ProcessFunction> processMap_ = new Dictionary<string, ProcessFunction>();

    public bool Process(TProtocol iprot, TProtocol oprot) {
      try {
        TMessage msg = iprot.ReadMessageBegin();
        ProcessFunction fn;
        processMap_.TryGetValue(msg.Name, out fn);
        if (fn == null) {
          TProtocolUtil.Skip(iprot, TType.Struct);
          iprot.ReadMessageEnd();
          TApplicationException x = new TApplicationException(TApplicationException.ExceptionType.UnknownMethod, "Invalid method name: '" + msg.Name + "'");
          oprot.WriteMessageBegin(new TMessage(msg.Name, TMessageType.Exception, msg.SeqID));
          x.Write(oprot);
          oprot.WriteMessageEnd();
          oprot.Transport.Flush();
          return true;
        }
        fn(msg.SeqID, iprot, oprot);
      }
      catch (IOException) {
        return false;
      }
      return true;
    }

    public void Upload_Process(int seqid, TProtocol iprot, TProtocol oprot) {
      Upload_args args = new Upload_args();
      args.Read(iprot);
      iprot.ReadMessageEnd();
      Upload_result result = new Upload_result();
      result.Success = iface_.Upload(args.Data);
      oprot.WriteMessageBegin(new TMessage("Upload", TMessageType.Reply, seqid));
      result.Write(oprot);
      oprot.WriteMessageEnd();
      oprot.Transport.Flush();
    }

    public void Download_Process(int seqid, TProtocol iprot, TProtocol oprot) {
      Download_args args = new Download_args();
      args.Read(iprot);
      iprot.ReadMessageEnd();
      Download_result result = new Download_result();
      result.Success = iface_.Download(args.Path);
      oprot.WriteMessageBegin(new TMessage("Download", TMessageType.Reply, seqid));
      result.Write(oprot);
      oprot.WriteMessageEnd();
      oprot.Transport.Flush();
    }

    public void Remove_Process(int seqid, TProtocol iprot, TProtocol oprot) {
      Remove_args args = new Remove_args();
      args.Read(iprot);
      iprot.ReadMessageEnd();
      Remove_result result = new Remove_result();
      result.Success = iface_.Remove(args.Path);
      oprot.WriteMessageBegin(new TMessage("Remove", TMessageType.Reply, seqid));
      result.Write(oprot);
      oprot.WriteMessageEnd();
      oprot.Transport.Flush();
    }
  }

  #if !SILVERLIGHT
  [Serializable]
  #endif
  public partial class Upload_args : TBase {
    private byte[] _data;

    public byte[] Data {
      get { return _data; }
      set {
        __isset.data = true;
        this._data = value;
      }
    }

    public Isset __isset;
    #if !SILVERLIGHT
    [Serializable]
    #endif
    public struct Isset {
      public bool data;
    }

    public Upload_args() { }

    public void Read(TProtocol iprot) {
      TField field;
      iprot.ReadStructBegin();
      while (true) {
        field = iprot.ReadFieldBegin();
        if (field.Type == TType.Stop) {
          break;
        }
        switch (field.ID) {
          case 1:
            if (field.Type == TType.String) {
              Data = iprot.ReadBinary();
            } else {
              TProtocolUtil.Skip(iprot, field.Type);
            }
            break;
          default:
            TProtocolUtil.Skip(iprot, field.Type);
            break;
        }
        iprot.ReadFieldEnd();
      }
      iprot.ReadStructEnd();
    }

    public void Write(TProtocol oprot) {
      TStruct struc = new TStruct("Upload_args");
      oprot.WriteStructBegin(struc);
      TField field = new TField();
      if (Data != null && __isset.data) {
        field.Name =