天天看點

C#流轉輸分段上傳

wcf 流傳輸上傳檔案不能分段上傳的問題已解決。

技巧就是上傳的流參數不用FileStream 用MemoryStream即可。

以下是代碼:

服務契約類:

[ServiceContract()]
    public interface IMyServer
    {
        [OperationContract]
        void UploadFile(UploadPartMessage msg);


      
    }
           

消息契約:

[MessageContract]
    public class UploadPartMessage
    {
        [MessageHeader]
        public string FileName { get; set; }
        [MessageHeader]
        public long FileLength { get; set; }

        /// <summary>
        /// 儲存上傳的資料
        /// </summary>
        [MessageBodyMember]
        public Stream Stream { get; set; }

        /// <summary>
        /// 目前分塊的大小
        /// </summary>
           [MessageHeader]
        public int PartSize { get; set; }
       
        /// <summary>
        /// 總分塊數
        /// </summary>
        [MessageHeader]
        public int TotalPartCount { get; set; }
           [MessageHeader]
        public int PartID { get; set; }
        [MessageHeader]
        public bool IsLastPart { get; set; }

        /// <summary>
        /// 分塊在檔案中的位置
        /// </summary>
        [MessageHeader]
        public long Position { get; set; }

       
    }
           

服務的實作

[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall, IncludeExceptionDetailInFaults = true)]
    class MyServer : IMyServer
    {




        string baseDirectory =Path.Combine(AppDomain.CurrentDomain.BaseDirectory,"server")  ;
        public void UploadFile(UploadPartMessage msg)
        {
            Console.WriteLine("已經接收Part: "+msg.PartID);
            string partFile = Path.Combine(baseDirectory, string.Format("{0}_{1}.prt", Path.GetFileNameWithoutExtension(msg.FileName), msg.PartID));
            using (FileStream fs = File.OpenWrite(partFile))
            {
                msg.Stream.CopyTo(fs);
            }
         

            //最後一塊上傳就合并
            if (msg.IsLastPart)
            {
                _mareg(msg);
            }
        }
        private void _mareg(UploadPartMessage msg)
        {
            FileInfo[] files = new DirectoryInfo(baseDirectory).GetFiles(Path.GetFileNameWithoutExtension(msg.FileName) + "_*");
            if (files.Length == msg.TotalPartCount)
            {
                Console.WriteLine("準備合并");
                Array.Sort(files, new FileInfoScoreComparer());
                using (FileStream fWrite = File.OpenWrite(Path.Combine(baseDirectory, Path.GetFileName(msg.FileName))))
                {
                    foreach (FileInfo file in files)
                    {
                        using (FileStream stream = file.OpenRead())
                        {
                            byte[] bs = new byte[stream.Length];

                            stream.Read(bs, 0, bs.Length);

                            fWrite.Write(bs, 0, bs.Length);


                        }
                    }
                    fWrite.Flush();
                    fWrite.Close();
                }
            }
        }

    }
           

服務的實作:

AppDomain.CreateDomain("server").DoCallBack(delegate()
            {
                Console.WriteLine("正在開戶服務 ");

                //服務 abc
                ContractDescription contract = ContractDescription.GetContract(typeof(IMyServer));
                EndpointAddress address = new EndpointAddress("net.tcp://localhost:8880/myserver");
                NetTcpBinding binding = new NetTcpBinding();
                binding.MaxReceivedMessageSize = 1024 * 1024 * 30;
                binding.ReceiveTimeout = TimeSpan.FromMinutes(5);
                binding.SendTimeout = TimeSpan.FromMinutes(5);
                binding.TransferMode = TransferMode.Streamed;
                ServiceEndpoint endpoint = new ServiceEndpoint(contract, binding, address);

                ServiceHost host = new ServiceHost(typeof(MyServer));
                host.AddServiceEndpoint(endpoint);
                host.OpenTimeout = TimeSpan.FromSeconds(3);
                host.Opened += host_Opened;
                host.UnknownMessageReceived += host_UnknownMessageReceived;

                host.Open();
                ;
            });
           

實作端以下使用BackgroundWorker異步實作

BackgroundWorker bw = new BackgroundWorker();
            bw.WorkerReportsProgress = true;//可顯示進度
            bw.WorkerSupportsCancellation = true;//可異步取消
            bw.ProgressChanged += bw_ProgressChanged;   //進度事件
            bw.RunWorkerCompleted += bw_RunWorkerCompleted;     //完成事件
            bw.DoWork += bw_DoWork;     //主要執行的任務

            bw.RunWorkerAsync();


            Console.ReadKey();
           
/// <summary>
        /// 上傳線程數
        /// </summary>
        static int threadCount = 5;
        static int partSize = 1024 * 100;
        static string fileSource = @"D:\360安全浏覽器下載下傳\c#開發Android應用實戰.pdf";
        private static void bw_DoWork(object sender, DoWorkEventArgs e)
        {
            Console.WriteLine("開始上傳");
            UploadPartMessage[] usmList;
            ConcurrentQueue<UploadPartMessage> pars = _getParts(fileSource,partSize ) ;
         
            
            UploadPartMessage lastPart;
            pars.TryDequeue(out lastPart);
            lastPart.IsLastPart = true;
            Task[] ts = new Task[threadCount]; ;
            for (int i = 0; i < threadCount; i++)
            {
                ts[i] = Task.Run(() => _uploadMethod(pars, sender as BackgroundWorker));
            }

            Task.WaitAll(ts);

            //上傳最後一個part

            pars.Enqueue(lastPart);
            _uploadMethod(pars, sender as BackgroundWorker);
          
            (sender as BackgroundWorker).ReportProgress(100);

        }
  
        static  string clientDirectory = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "client");
        private static void _uploadMethod(ConcurrentQueue<UploadPartMessage> pars, BackgroundWorker bw)
        {
            IMyServer channel = ChannelFactory<IMyServer>.CreateChannel
            (new NetTcpBinding() { TransferMode = TransferMode.Streamed },
            new EndpointAddress("net.tcp://localhost:8880/myserver"));
            using (channel as IDisposable)
            {
                using (FileStream fs = File.OpenRead(fileSource))
                {
                    while (pars.Count > 0)
                    {
                        if (bw.CancellationPending)
                            break;
                        UploadPartMessage msg;
                        if (pars.TryDequeue(out msg))
                        {
                            fs.Position = msg.Position;

                            byte[] mBs = new byte[msg.PartSize];
                            fs.Read(mBs, 0, msg.PartSize);
                            msg.Stream =  new MemoryStream(mBs) ;
                            channel.UploadFile(msg);
                            Array.Clear(mBs, 0, mBs.Length);
                            bw.ReportProgress((msg.TotalPartCount - pars.Count) / msg.TotalPartCount*100);//進度通知 
                        }
                       
                    }
                }
            }





        }
        private static ConcurrentQueue<UploadPartMessage> _getParts(string fileSource, int partSize)
        {
            FileInfo fs = new FileInfo(fileSource);

            //擷取part分組
            List<Tuple<long, long>> list = Partitioner.Create(0, fs.Length, partSize).GetDynamicPartitions().ToList();
            ConcurrentQueue<UploadPartMessage> pars = new ConcurrentQueue<UploadPartMessage>();
            for (int i = 0; i < list.Count; i++)
            {
                Tuple<long, long> range = list[i];
                partSize = (int)(range.Item2 - range.Item1);

                UploadPartMessage usm = new UploadPartMessage()
                {
                    FileName = fs.Name,
                    FileLength = fs.Length,
                    TotalPartCount = list.Count,
                    PartSize = partSize,
                    PartID = i + 1,
                    Position=range.Item1
                };
                pars.Enqueue(usm);
            }
            return pars;
        }