打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
回调的线程关联性

对于一般的多线程操作,比如异步地进行基于文件系统的IO操作;异步地调用Web Service;或者是异步地进行数据库访问等等,是和具体的线程无关的。也就是说,对于这些操作,任意创建一个新的线程来执行都是等效的。但是有些情况下,有些操作却只能在固定的线程下执行。比如,在GUI应用下,对控件的访问就需要在创建该控件的线程下执行;或者我们在某个固定的线程中通过TLSThread Local Storage)设置了一些Context信息,供具体的操作使用,我们把操作和某个固定的线程的依赖称为线程关联性(Thread Affinity)。在这种情况下,我们的异步操作就需要被Marshal到固定的线程执行。在WCF并发或者Callback的情况下也具有这样的基于线程关联性的问题。

一、从基于Windows Application客户端的WCF回调失败谈起

"我的WCF之旅"系列文章中,有一篇(WinForm Application中调用Duplex Service出现TimeoutException的原因和解决方案)专门介绍在一个Windows Application客户端应用通过WCF Duplex通信方式进行回调失败的文章.我们今天以此作为出发点介绍WCFThread Affinity下的表现和解决方案.

我们来创建一个WCF的应用来模拟该场景客户端是一个基于Windows Form应用完成一个计算器的功能用户输入操作数,点击"计算"按钮后台通过调用WCF service, 并传递一个用于显示计算结果的Callback对象; service进行相应的计算得到最后的运算结果,调用该Callback对象将运算结果显示到客户端界面.这是我们的WCF四层结构:

 

1ContractICalculate & ICalculateCallback

  1: namespace Artech.ThreadAffinity.Contracts

  2: {

  3:     [ServiceContract(CallbackContract = typeof(ICalculateCallback))]

  4:     public interface ICalculate

  5:     {

  6:         [OperationContract]

  7:         void Add(double op1, double op2);

  8:     }

  9: }

这是Service Contract,下面是Callback Contract,用于显示运算结果:

  1: namespace Artech.ThreadAffinity.Contracts

  2: {

  3:    public interface ICalculateCallback

  4:     {

  5:         [OperationContract]

  6:         void DisplayResult(double result);

  7:     }

  8: }

2ServiceCalculateService

  1: namespace Artech.ThreadAffinity.Services

  2: {

  3:     [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]

  4:     public class CalculateService:ICalculate

  5:     {

  6:         public static ListBox DisplayPanel

  7:         { get; set; }        

  8: 

  9:         #region ICalculate Members

 10: 

 11:         public void Add(double op1, double op2)

 12:         {

 13:             double result = op1 + op2;

 14:             ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();           

 15: 

 16: DisplayPanel.Items.Add(string.Format("{0} + {1} = {2}", op1, op2, result));

 17: 

 18:             callback.DisplayResult(result);

 19:         }

 20: 

 21:         #endregion

 22:     }

 23: }


由于需要进行callback, 我们把ConcurrencyMode 设为Reentrant。当得到运算的结果后,通过OperationContext.Current.GetCallbackChannel得到callback对象,并调用之。还有一点需要提的是,该service是通过一个Windows Form application进行host的。并且有一个ListBox列出所有service执行的结果,就像这样: 

 

3Hosting 

Hosting的代码写在FormLoad事件中: 

  1: private void HostForm_Load(object sender, EventArgs e)

  2: {   

  3:     this._serviceHost = new ServiceHost(typeof(CalculateService));

  4:     CalculateService.DisplayPanel = this.listBoxResult;

  5:     CalculateService.SynchronizationContext = SynchronizationContext.Current;

  6:     this._serviceHost.Opened += delegate

  7:     {

  8: this.Text = "The calculate service has been started up!";

  9:     };

 10: 

 11:     this._serviceHost.Open();

 12: }

我们注意到了CalculateService使用到的用于显示所有预算结果的ListBox就是在这了通过static property传递的。

这么配置文件

  1: <configuration>

  2:     <system.serviceModel>

  3:         <services>

  4:             <service name="Artech.ThreadAffinity.Services.CalculateService">

  5:                 <endpoint binding="netTcpBinding" bindingConfiguration="" contract="Artech.ThreadAffinity.Contracts.ICalculate" />

  6:                 <host>

  7:                     <baseAddresses>

  8:                         <add baseAddress="net.tcp://127.0.0.1:8888/calculateservice" />

  9:                     </baseAddresses>

 10:                 </host>

 11:             </service>

 12:         </services>

 13:     </system.serviceModel>

 14: </configuration>

4Client

Client的界面很简单:输入两个操作数,点击=按钮,将运算结果显示出来。

 

先来看看client端对callback contract的实现:

  1: namespace Clients

  2: {

  3:     public class CalculateCallback : ICalculateCallback

  4:     {

  5:         public static TextBox ResultPanel;              

  6: 

  7:         #region ICalculateCallback Members

  8: 

  9:         public void DisplayResult(double result)

 10:         {

 11:             ResultPanel.Text = result.ToString();

 12:         }        

 13: 

 14:         #endregion

 15:     }

 16: }

这是配置:

  1: <configuration>

  2:     <system.serviceModel>

  3:         <client>

  4:             <endpoint address="net.tcp://127.0.0.1:8888/calculateservice"

  5:                 binding="netTcpBinding" bindingConfiguration="" contract="Artech.ThreadAffinity.Contracts.ICalculate"

  6:                 name="calculateservice" />

  7:         </client>

  8:     </system.serviceModel>

  9: </configuration>

然后是我们=按钮的单击事件对运算的实现: 

  1: private void buttonCalculate_Click(object sender, EventArgs e)

  2: {

  3:     CalculateCallback.ResultPanel = this.textBoxResult;

  4:     DuplexChannelFactory<ICalculate> channelFactory = new DuplexChannelFactory<ICalculate>(new CalculateCallback(), "calculateservice");

  5:     ICalculate calculator = channelFactory.CreateChannel();

  6:     calculator.Add(double.Parse(this.textBoxOp1.Text), double.Parse(this.textBoxOp2.Text));

  7: }

CalculateCallback 用于显示运算结果的TextBox通过statis property实现传递。这个实现很简单,貌似没有什么问题,但是我们运行程序,在客户端就会抛出这样的exception。可以看出是一个TimeoutException

 

二、是什么导致TimeoutException

我们现在来分析是什么导致了TimeoutException的抛出。原因很简单:由于我们对service的调用的是在UI 线程调用的,所以在开始调用到最终得到结果,这个UI Thread会被锁住;但是当service进行了相应的运算的到运算的结果后,需要调用callback对象对client进行回调,默认的情况下,Callback的执行是在UI线程执行的。当Callback试图执行的时候,发现UI 线程被锁,只能等待。这样形成一个死锁,UI线程需要等待CalculateService执行返回后才能解锁,而CalculateService需要Callback执行完成;而Callback需要等到UI线程解锁才能执行。

基于上门的原因,我们有两种解决方案:

· CalculateService不必等到Callback执行完成就返回,我们可以通过异步调用Callback。或者让Client异步方式调用CalculateService,以便及时释放UI线程,我们可以通过One-way的方式来进行service的调用。

· 让Callback的执行不必绑定到UI线程

三、解决方案一:通过异步调用或者One-way回调

为了简单起见,我们通过ThreadPool实现了异步回调:

  1: public void Add(double op1, double op2)

  2: {

  3:     double result = op1 + op2;

  4:     ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>(); 

  5: 

  6:     ThreadPool.QueueUserWorkItem(delegate{ callback.DisplayResult(result); }, null);

  7: }

这是一种方案,另一种是将Add操作设成One-way的:

  1: namespace Artech.ThreadAffinity.Contracts

  2: {

  3:     [ServiceContract(CallbackContract = typeof(ICalculateCallback))]

  4:     public interface ICalculate

  5:     {

  6:         [OperationContract(IsOneWay = true)]

  7:         void Add(double op1, double op2);

  8:     }

  9: }

这两种方案都可以解决问题。

四、方案二、通过解除Callback操作和UI线程的关联性

现在我们才进入我们今天讨论的主题:WCF并发操作的线程关联性问题。在这之前,我们需要了解一个重要的对象:SynchonizationContextSystem.Threading.SynchronizationContext)。SynchonizationContext就是为了解决这种线程关联性问题而设计的。SynchonizationContext提供了两个主要的API将操作和对应的Thread关联:PostSend

  1: public virtual void Post(SendOrPostCallback d, object state)

  2: public virtual void Send(SendOrPostCallback d, object state)

SendPost分别以同步和异步的方式将以Delegate表示的具体的操作和SynchonizationContext对象对应的Thread关联,而SendOrPostCallback delegate对象代表你需要的线程关联操作,state代表传入delegate的参数:

public delegate void SendOrPostCallback(object state); 

对于某些具有线程关联的应用,比如Windows Form application,在程序启动的时候,会设置当前的SynchonizationContext对象(Windows Form application使用的是继承了SynchonizationContextWindowsFormsSynchronizationContext System.Windows.Forms.WindowsFormsSynchronizationContext)。当前SynchonizationContext被成功初始化后,你就可以通过SynchonizationContext的静态属性Current得到它。在你自己的应用中,如何有需要,你也可以自定义SynchonizationContext,并通过静态方法SetSynchronizationContext将其设置为current SynchronizationContext

对应WCF来说,无论是host一个service,还是在调用service时制定callback,在默认的情况下,servicecallback的操作将自动和当前的SynchonizationContext进行关联(如何有的话)。也就是说,如过我们的servicehostWindows Form application下,那么service的操作将在UI 线程下执行;同理,如何我们在一个Windows Forms UI线程下调用duplex service并制定callback,那么callback的最终执行将在UI线程。

关于WCF对线程关联性的控制,可以通过ServiceBehavior或者CallbackBehaviorUseSynchronizationContext属性进行设定,该属性默认为true,这正式WCF默认具有线程关联性的原因。

现在我们来实现我们的第二套方案:让Callback的执行不必绑定到UI线程。为此我们只需要加上如何的CallbackBehavior attribute就可以了。

  1: namespace Artech.ThreadAffinity.Clients

  2: {

  3:     [CallbackBehavior(UseSynchronizationContext = false)]

  4:     public class CalculateCallback : ICalculateCallback

  5:     {

  6:         public static TextBox ResultPanel;

  7: 

  8:         #region ICalculateCallback Members

  9: 

 10:         public void DisplayResult(double result)

 11:         {

 12:             ResultPanel.Text = result.ToString();

 13: 

 14:         }

 16:         #endregion

 17:     }

 18: }

 19: 

但是现在我们运行我们的程序,将会出现如下的InvalidOperation异常:

 

原因很简单,由于我们将callbaclkUseSynchronizationContext 设置成false,那么callback的操作将不会再UI线程下执行。但是我们需要运算的结果输入到UITextBox上,对UI上控件的操作需要在UI线程上执行,显然会抛出异常了。

为了我们引入SynchonizationContextCalculateCallback中:将SynchonizationContext定义成一个static属性,通过Post方法异步地实现对运算结果的显示。

  1: namespace Artech.ThreadAffinity.Clients

  2: {

  3:     [CallbackBehavior(UseSynchronizationContext = false)]

  4:     public class CalculateCallback : ICalculateCallback

  5:     {

  6:         public static TextBox ResultPanel;

  7:        public static SynchronizationContext SynchronizationContext;

  8: 

  9:         #region ICalculateCallback Members

 10: 

 11:         public void DisplayResult(double result)

 12:         {

 13:              SynchronizationContext.Post(delegate { ResultPanel.Text = result.ToString(); }, null);          

 14:         }       

 15: 

 16:         #endregion

 17:     }

 18: }

SynchonizationContext在调用service的时候指定:

  1: private void buttonCalculate_Click(object sender, EventArgs e)

  2: {

  3:     CalculateCallback.ResultPanel = this.textBoxResult;

  4:     CalculateCallback.SynchronizationContext = SynchronizationContext.Current;

  5: 

  6:     DuplexChannelFactory<ICalculate> channelFactory = new DuplexChannelFactory<ICalculate>(new CalculateCallback(), "calculateservice");

  7:     ICalculate calculator = channelFactory.CreateChannel();

  8:     calculator.Add(double.Parse(this.textBoxOp1.Text), double.Parse(this.textBoxOp2.Text));

  9: }

 

现在我们程序能够正常运行了。

五、另一种可选方案:通过ISynchronizeInvokeInvoke/BeginInvoke

熟悉Windows Form编程的读者应该都知道,WinForm空间的基类ControlSystem.Windows.Forms.Control)都实现了System.ComponentModel.ISynchronizeInvoke接口,而ControlISynchronizeInvoke的实现就是为了解决Control的操作必须在创建Control线程的问题,ISynchronizeInvoke定义InvokeBeginInvoke方法方面我们以同步或者异步的方式操作Control

  1: public interface ISynchronizeInvoke

  2: {

  3:     // Methods

  4:     [HostProtection(SecurityAction.LinkDemand, Synchronization=true, ExternalThreading=true)]

  5:     IAsyncResult BeginInvoke(Delegate method, object[] args);

  6:     object EndInvoke(IAsyncResult result);

  7:     object Invoke(Delegate method, object[] args);

  8: 

  9:     // Properties

 10:     bool InvokeRequired { get; }

 11: }

 12: 

如何我们放弃基于SynchonizationContext的解决方案,我们也可以通过基于ISynchronizeInvoke的方式来解决这个问题。为此我们这样定义CalculateCallback

  1: namespace Artech.ThreadAffinity.Clients

  2: {

  3:     [CallbackBehavior(UseSynchronizationContext = false)]

  4:     public class CalculateCallback : ICalculateCallback

  5:     {

  6:         public static TextBox ResultPanel;

  7:         public delegate void DisplayResultDelegate(TextBox resultPanel, double result);

  8: 

  9:         #region ICalculateCallback Members

 10: 

 11:         public void DisplayResult(double result)

 12:         {

 13:             DisplayResultDelegate displayResultDelegate = new DisplayResultDelegate(DisplayResult);

 14:            ResultPanel.BeginInvoke(displayResultDelegate, new object[] { ResultPanel, result });                  

 15:         }

 16: 

 17:         private void DisplayResult(TextBox resultPanel, double result)

 18:         {

 19:             resultPanel.Text = result.ToString();

 20:         }

 21: 

 22:         #endregion

 23:     }

 24: }

 25: 

由于BeginInvoke方式只能接受一个具体的delegate对象(不能使用匿名方法),所以需要定义一个具体的DelegateDisplayResultDelegate)和对应的方法(DisplayResult),参数通过一个object[]传入。

从本质上将,这两种方式的实现完全是一样的,如何你查看System.Windows.Forms.WindowsFormsSynchronizationContext的代码,你会发现其SendPost方方法就是通过调用InvokeBeginInvoke方式实现的。

六、Service Hosting的线程关联性

我们花了很多的精力介绍了WCF Duplex通信中Callback操作的线程关联性问题,实际上我们使用到更多的还是service操作的线程关联性问题。就以我们上面的程序为例,我们通过一个Windows Form applicationhost我们的service,并且要求service的运算结束后将结果输出到server端的Window formListBox中,对ListBox的操作肯定需要的Host程序的UI线程中执行。

按照我们一般的想法,我们的Service面向若干client,肯定是并发的接收client端的请求,以多线程的方式执行service的操作,那么操作中UI 控件的操作肯定会出现错误。

我们的程序依然可以正常运行,其根本原因是WCFservice操作默认实现了对Host service的当前线程的SynchonizationContext实现了关联。与Callback操作的线程关联性通过CallbackBehaviorUseSynchronizationContext 进行控制一样,service的线程关联性通过ServiceBehavirUseSynchronizationContext 进行设定。UseSynchronizationContext 的默认值为true

如何我们将CalculateServiceUseSynchronizationContext 设为false

  1: namespace Artech.ThreadAffinity.Services

  2: {

  3:     [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant,UseSynchronizationContext = false)]

  4:     public class CalculateService:ICalculate

  5:     {

  6:         public static ListBox DisplayPanel

  7:         { get; set; }      

  8: 

  9:         #region ICalculate Members

 10: 

 11:         public void Add(double op1, double op2)

 12:         {

 13:             double result = op1 + op2;

 14:             ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();           

 15: 

 16:            DisplayPanel.Items.Add(string.Format("{0} + {1} = {2}", op1, op2, result));

 17: 

 18:             callback.DisplayResult(result);

 19:         }

 20: 

 21:         #endregion

 22:     }

 23: }

 24: 

control被不是创建它的线程操作,肯定会抛出一个InvalidOperationException,就像这样:

 

我们一样可以通过SynchonizationContext或者ISynchronizeInvoke的方式来解决这样的问题,我们只讨论前面一种,为此我们改变了CalculateService的定义:通过SynchonizationContextPost方法实现对ListBox的访问。

  1: namespace Artech.ThreadAffinity.Services

  2: {

  3:     [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant,UseSynchronizationContext = false)]

  4:     public class CalculateService:ICalculate

  5:     {

  6:         public static ListBox DisplayPanel

  7:         { get; set; }

  8: 

  9:         public static SynchronizationContext SynchronizationContext

 10:         { get; set; }

 11: 

 12:         #region ICalculate Members

 13: 

 14:         public void Add(double op1, double op2)

 15:         {

 16:             double result = op1 + op2;

 17:             ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();

 18:            SynchronizationContext.Post(delegate

 19:             {

 20:                 DisplayPanel.Items.Add(string.Format("{0} + {1} = {2}", op1, op2, result));

 21:             }, null);

 22: 

 23:             callback.DisplayResult(result);           

 24:         }

 25: 

 26:         #endregion

 27:     }

 

 28: }

 29: 

通过static属性定义的SynchonizationContexthost的时候指定:

  1: private void HostForm_Load(object sender, EventArgs e)

  2: {   

  3:     this._serviceHost = new ServiceHost(typeof(CalculateService));

  4:     CalculateService.DisplayPanel = this.listBoxResult;

  5:    CalculateService.SynchronizationContext = SynchronizationContext.Current;

  6:     this._serviceHost.Opened += delegate

  7:     {

  8: this.Text = "The calculate service has been started up!";

  9:     };

 10: 

 11:     this._serviceHost.Open();

 12: }

 13: 

这样我们的程序又可以正常运行了

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
线程关联性(Thread Affinity)对WCF并发访问的影响
异步编程 In .NET
WCF其他参考(异步、异常和错误处理、MSMQ)
WCF中的异步调用
浅谈Excel开发:十 Excel 开发中与线程相关的若干问题
搞懂 SynchronizationContext(第一部分)【翻译】
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服