Saturday, August 6, 2011

How to perform Cross-thread communication in .NET programs

Multithreading is a useful technique in programming, especially in complex applications that appear to do many things at the same time. The System.Threading namespace in .NET provides classes that let us create, start and abort threads at will. But this article is not about managing threads; several texts already cover that. It is about a typical problem faced by developers working on a multithreaded application: passing data from a worker thread back to the GUI thread.

Quite recently, while developing a network application that used multiple threads, I came across this very problem. The program was basically a TCP message server that accepted TCP and UDP requests on the network and replied to them with some processed data. It had one main GUI that acted as the user interface for the server. At the click of a Start button, two different threads needed to start: a TCPListener thread and a UDPListener thread. So we had three threads:
  1. The main GUI thread started by the user.
  2. TCPListener thread listening for TCP requests on a port.
  3. UDPListener thread listening for UDP requests on a different port.
Here are some parts of the implementation:
GUI: Basic Server GUI with a Start/Stop button and a text box for last message received.



GUI Code:
using System;
using System.ComponentModel;
using System.Net;
using System.Windows.Forms;

public partial class MsgServer : Form
{
    //the server must be instantiated before StartServer() can be called on it
    private TcpServer Server = new TcpServer();

    public MsgServer()
    {
        InitializeComponent();
    }

    //call back function called when a message is received at the server
    private void MsgServer_MessageReceived(object sender, MessageReceivedArgs e)
    {
        this.txtLastMessage.Text = e.Body;

        //Perform any other processing as per the requirements.
        //Call Server.SendTcpMessage() function to reply if required.
    }

    private void MsgServer_Load(object sender, EventArgs e)
    {
    }

    private void MsgServer_FormClosed(object sender, FormClosedEventArgs e)
    {
        //stop the message server
        try
        {
            Server.StopServer();
        }
        catch { }
    }

    private void btnStart_Click(object sender, EventArgs e)
    {
        if (btnStart.Text == "Start")
        {
            //create a delegate for the call back function
            TcpServer.MessageReceivedDelegate callBack = new TcpServer.MessageReceivedDelegate(MsgServer_MessageReceived);
            //start the message server at port 2055
            //the form itself is passed as the ISynchronizeInvoke object
            //(Dns.Resolve is obsolete since .NET 2.0; Dns.GetHostEntry is its replacement)
            Server.StartServer(Dns.Resolve("localhost").HostName, 2055, callBack, (ISynchronizeInvoke)this);
            btnStart.Text = "Stop";
        }
        else
        {
            Server.StopServer();
            btnStart.Text = "Start";
        }
    }
}

Server Code:
using System;
using System.Collections;
using System.ComponentModel;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;

public class TcpServer
{
    public string ServerName; //name of this server
    public int PortNo; //tcp port this server listens on
    //public MessageReceivedArgs LastMessage = null;

    private Socket TcpServerSocket;

    private Thread ListenTcpThread;
    private Thread ListenUdpThread;
    private UdpClient Udpc;
    private bool IsStarted = false;

    public Hashtable TcpClients = new Hashtable(); //all clients (ip-end points) associated with this server

    //callback when a new message is received
    public delegate void MessageReceivedDelegate(object sender, MessageReceivedArgs e);
    public MessageReceivedDelegate MessageReceivedCallback;
    //object used to run the callback on the GUI thread
    public ISynchronizeInvoke Sync;
    //public event MessageReceivedDelegate MessageReceived;
    //sends a tcp message to a remote host
    public void SendTcpMessage(IPEndPoint remoteHost, string command, string messageText)
    {
        if (!IsStarted)
            throw new Exception("Server is not running");
        string hostName = Dns.Resolve(remoteHost.Address.ToString()).HostName;
        if (hostName == Dns.Resolve("localhost").HostName)
        {
            hostName = "localhost";
        }
        TcpClient client = new TcpClient(hostName, remoteHost.Port);
        Stream s = client.GetStream();
        StreamWriter sw = new StreamWriter(s);
        sw.AutoFlush = true;
        sw.WriteLine(ServerName + "#" + command + "#" + messageText);
        sw.Close();
        s.Close();
        client.Close();
    }

    //ping request, multicast over udp, to find the specified server
    //(the PING command is handled in ListenUdpRequests, so it has to travel over
    //udp; an unconnected TcpClient cannot broadcast and its GetStream() throws)
    public void Ping(string remoteServer)
    {
        if (!IsStarted)
            throw new Exception("Server is not running");
        UdpClient pinger = new UdpClient("230.0.0.1", 8899);
        byte[] sdata = Encoding.ASCII.GetBytes(this.ServerName + "#PING#" + remoteServer);
        pinger.Send(sdata, sdata.Length);
        pinger.Close();
    }

    //udp announcement to all servers (sent to the multicast group 230.0.0.1)
    public void Broadcast()
    {
        if (!IsStarted)
            throw new Exception("Server is not running");
        UdpClient broadcaster = new UdpClient("230.0.0.1", 8899);
        byte[] sdata = Encoding.ASCII.GetBytes(this.ServerName + "#BROADCAST#" + this.PortNo.ToString());
        broadcaster.Send(sdata, sdata.Length);
        broadcaster.Close(); //release the sending socket
    }

    //udp announcement to all servers that i'm shutting down
    public void ShutdownBroadcast()
    {
        if (!IsStarted)
            throw new Exception("Server is not running");
        UdpClient broadcaster = new UdpClient("230.0.0.1", 8899);
        byte[] sdata = Encoding.ASCII.GetBytes(this.ServerName + "#SHUTDOWN#" + this.PortNo.ToString());
        broadcaster.Send(sdata, sdata.Length);
        broadcaster.Close(); //release the sending socket
    }

    public void StartServer(string serverName, int portNo, MessageReceivedDelegate callBack, ISynchronizeInvoke sync)
    {
        this.ServerName = serverName;
        this.PortNo = portNo;
        this.MessageReceivedCallback = callBack;
        this.Sync = sync;

        //start the tcp thread
        ThreadStart ts = new ThreadStart(ListenTcpRequests);
        ListenTcpThread = new Thread(ts);
        ListenTcpThread.Start();

        //start the udp thread
        ts = new ThreadStart(ListenUdpRequests);
        ListenUdpThread = new Thread(ts);
        ListenUdpThread.Start();

        IsStarted = true;

        //broadcast to all servers in the network
        //that i'm listening on this port
        System.Threading.Thread.Sleep(250);
        Broadcast();
    }

    public void StopServer()
    {
        try
        {
            IsStarted = false;
            ListenTcpThread.Abort();
            ListenUdpThread.Abort();
            //Listener.Stop();
            TcpServerSocket.Close();
            Udpc.Close();
        }
        catch { }
    }

    private void ListenUdpRequests()
    {
        Udpc = new UdpClient(8899);
        Udpc.JoinMulticastGroup(IPAddress.Parse("230.0.0.1"));
        IPEndPoint ep = null;
        while (true)
        {
            byte[] rdata = Udpc.Receive(ref ep); //blocks until a request is received
            string message = Encoding.ASCII.GetString(rdata);
            Debug.WriteLine(message);

            //decode the entire message
            //SENDER_NAME#COMMAND#BODY
            string[] parts = message.Split('#');
            if (parts.Length < 3)
                continue; //ignore malformed datagrams instead of crashing the thread
            string sender = parts[0];
            string command = parts[1];
            string body = parts[2];

            //read and process the message command
            switch (command)
            {
                case "PING":
                    if (body == ServerName)
                    {
                        SendTcpMessage(ep, "PING_REPLY", "");
                    }
                    break;
                case "BROADCAST":
                    {
                        //the body carries the tcp port of the announcing server
                        ep.Port = Convert.ToInt32(body);
                        SendTcpMessage(ep, "BROADCAST_REPLY", "XXXXX");
                        if (!TcpClients.Contains(sender))
                        {
                            TcpClients.Add(sender, ep);
                        }
                    }
                    break;
                case "SHUTDOWN":
                    {
                        if (TcpClients.Contains(sender))
                        {
                            TcpClients.Remove(sender);
                        }
                    }
                    break;
            }
        }
    }


    //method to start from a thread that constantly listens to
    //new tcp messages received
    private void ListenTcpRequests()
    {
        try
        {
            //configure the tcp listener
            //Listener = new TcpListener(portNo);
            //Listener.Start();
            TcpServerSocket = new Socket(IPAddress.Loopback.AddressFamily, SocketType.Stream, ProtocolType.IP);
            //IPEndPoint ep = new IPEndPoint(IPAddress.Loopback, PortNo);
            IPEndPoint ep = new IPEndPoint(Dns.Resolve("localhost").AddressList[0], PortNo);
            TcpServerSocket.Bind(ep);
            TcpServerSocket.Listen(10);

            while (true)
            {
                Socket soc = TcpServerSocket.Accept(); //Listener.AcceptSocket(); //blocks until a tcp connection request is received
                Stream s = new NetworkStream(soc);
                StreamReader sr = new StreamReader(s);
                string message = sr.ReadToEnd(); //returns once the sender closes the connection
                Debug.WriteLine(message);

                //decode the entire message
                //SENDER_NAME#COMMAND#BODY
                string[] parts = message.Split('#');
                string sender = parts[0];
                string command = parts[1];
                string body = parts[2];
                switch (command)
                {
                    case "MESSAGE":
                        //SendTcpMessage((soc.RemoteEndPoint as IPEndPoint ), "RECEIPT", "");
                        if (TcpClients.Contains(sender))
                        {
                            SendTcpMessage((TcpClients[sender] as IPEndPoint), "RECEIPT", "");
                        }
                        break;
                    case "RECEIPT":
                        break;
                }

                //raise the event
                MessageReceivedArgs e = new MessageReceivedArgs();
                e.Sender = sender;
                e.Command = command;
                e.Body = body;
                //MessageReceived(this, e);
                //
                object[] items = new object[2];
                items[0] = this;
                items[1] = e;
                //calling the delegate directly would run the callback on this listener thread:
                //MessageReceivedCallback.Invoke(this, e);
                //Sync.Invoke runs it on the GUI thread and waits until it returns
                Sync.Invoke(MessageReceivedCallback, items);
                //LastMessage = e;

                //clean up
                sr.Close();
                s.Close();
                soc.Close();
            }
        }
        catch (Exception ex)
        {
            Debug.WriteLine("Error occurred: " + ex.Message);
        }
    }
}

public class MessageReceivedArgs : EventArgs
{
    public string Sender;
    public string Command;
    public string Body;
}
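
To try the server out, a test client only needs to open a TCP connection, write one message in the SENDER_NAME#COMMAND#BODY layout, and close the connection so that the server's ReadToEnd() call returns. The following is a minimal sketch under assumptions, not part of the original project: the TestClient class, its method names and the sender name are hypothetical, and it assumes the server is reachable as "localhost" on port 2055, which depends on the address the listener actually binds to.

```csharp
using System;
using System.IO;
using System.Net.Sockets;

// Hypothetical test client for the message server (not from the original project).
public static class TestClient
{
    // Builds one wire message in the SENDER_NAME#COMMAND#BODY layout.
    public static string FormatMessage(string sender, string command, string body)
    {
        return sender + "#" + command + "#" + body;
    }

    // Connects, writes one message, and closes the connection.
    // Closing matters: the server reads with ReadToEnd(), which only
    // returns when the sender shuts the connection down.
    public static void Send(string host, int port, string message)
    {
        using (TcpClient client = new TcpClient(host, port))
        using (StreamWriter writer = new StreamWriter(client.GetStream()))
        {
            writer.Write(message);
        }
    }

    public static void Main()
    {
        // Assumed host and port; 2055 is the port used in the GUI code.
        Send("localhost", 2055, FormatMessage("TestClient", "MESSAGE", "Hello server"));
    }
}
```

Because this client never announces itself over UDP, it is not in the server's TcpClients table and will not get a RECEIPT back; the message should still reach the GUI callback.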


Up to this point, everything worked fine. The trouble started when we had to hand the data received by the TCP listener to the main GUI for processing and display on the form. In our first implementation, the TCP listener function passed the received data to the main GUI by calling the callback delegate directly (the commented-out MessageReceivedCallback.Invoke(this, e) line in the server code). Since the TCP listener runs on a different thread than the main GUI, the callback also ran on the listener thread, and it could not display any of the data in the form's text box or other controls. We got the following exception when we tried to update these form components:
Cross-thread operation not valid: Control 'txtMessage' accessed from a thread other than the thread it was created on.
We came to know that the above error occurs because Windows Forms controls may only be accessed from the thread that created them, and our callback was changing a TextBox value from the listener thread. But this was a practical case where we needed this cross-thread communication. There were, of course, several alternatives, such as making the application single threaded, or polling for messages with a Timer control on the main GUI instead of passing a callback. But I was curious to find a proper way to do cross-thread communication, since it was quite probable that we would face a similar obstacle in future development too. The solution came through an MSDN article about System.ComponentModel.ISynchronizeInvoke. This simple interface, which lets one thread execute a delegate on the thread that owns the implementing object, solved our problem so neatly that I decided to share it with others too.
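
To make the interface concrete, here is a minimal standalone sketch, separate from the server project. DemoForm, SetText, TextDelegate and the 500 ms sleep are hypothetical stand-ins; the only parts taken from this article are the ISynchronizeInvoke parameter and the call to its Invoke method. It relies on the fact that every Windows Forms control, including a Form, implements ISynchronizeInvoke.

```csharp
using System;
using System.ComponentModel;
using System.Threading;
using System.Windows.Forms;

// Hypothetical demo form (not from the original project).
public class DemoForm : Form
{
    private readonly TextBox txtLastMessage = new TextBox();
    private delegate void TextDelegate(string text);

    public DemoForm()
    {
        txtLastMessage.Dock = DockStyle.Top;
        Controls.Add(txtLastMessage);
        // Start the worker once the form is shown, so its window handle exists.
        Shown += delegate { StartWorker(this); };
    }

    // Touches the control, so it must run on the GUI thread.
    private void SetText(string text)
    {
        txtLastMessage.Text = text;
    }

    // The worker only knows the form as an ISynchronizeInvoke.
    private void StartWorker(ISynchronizeInvoke sync)
    {
        Thread worker = new Thread(delegate()
        {
            Thread.Sleep(500); // stand-in for blocking on a socket
            // Runs SetText on the GUI thread and waits for it to finish.
            sync.Invoke(new TextDelegate(SetText), new object[] { "hello from the worker" });
        });
        worker.IsBackground = true; // do not keep the process alive
        worker.Start();
    }

    [STAThread]
    public static void Main()
    {
        Application.Run(new DemoForm());
    }
}
```

The worker never references the TextBox or the Form type directly, which is the same decoupling the server class gets by storing only an ISynchronizeInvoke reference. If the form is closed before the worker calls Invoke, the call can throw, so a real application would need to guard against that.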
The solution is quite simple. Apart from passing the callback delegate to the called function, we also pass an ISynchronizeInvoke object. Windows Forms controls implement this interface, so the form itself can be passed (the GUI code above passes this). Then, instead of calling the delegate's own Invoke method, we call the ISynchronizeInvoke object's Invoke method, which runs the delegate on the main GUI thread and blocks the listener until it returns. Here is the changed line in the server code:
//MessageReceivedCallback.Invoke(this, e);
Sync.Invoke(MessageReceivedCallback, items);
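
A common variation, which is not what our project used, is to let the form marshal the call itself, so that callers need no ISynchronizeInvoke reference at all. The sketch below assumes a hypothetical SelfMarshalingForm with a ShowMessage method; InvokeRequired and BeginInvoke are standard members of System.Windows.Forms.Control.

```csharp
using System;
using System.Threading;
using System.Windows.Forms;

// Hypothetical form (not from the original project).
public class SelfMarshalingForm : Form
{
    private readonly TextBox txtLastMessage = new TextBox();

    public SelfMarshalingForm()
    {
        txtLastMessage.Dock = DockStyle.Top;
        Controls.Add(txtLastMessage);
    }

    // Intended to be callable from any thread once the form has been shown.
    public void ShowMessage(string body)
    {
        if (InvokeRequired)
        {
            // Called from a worker thread: queue this same method on the
            // GUI thread and return without waiting for it to run.
            BeginInvoke(new Action<string>(ShowMessage), body);
            return;
        }
        txtLastMessage.Text = body; // now running on the GUI thread
    }

    [STAThread]
    public static void Main()
    {
        SelfMarshalingForm form = new SelfMarshalingForm();
        form.Shown += delegate
        {
            Thread worker = new Thread(delegate() { form.ShowMessage("hello from the worker"); });
            worker.IsBackground = true;
            worker.Start();
        };
        Application.Run(form);
    }
}
```

The trade-off is that BeginInvoke does not block the calling thread, whereas Sync.Invoke makes the listener wait until the GUI has finished handling the message. Which one fits depends on whether the listener needs the callback to have completed before it continues.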

After switching to Sync.Invoke, our display issue was solved. Hope this will be useful to you. Happy Programming!!
