
Sending/Receiving Protobuf Messages

The process of sending and receiving a message differs between the TCP and WebSocket standards. Below, we explain this process in detail for both types of connections.

Using TCP

Sending a Message

To send a message via a TCP connection, do the following.

  1. Serialize the Protobuf message to an array of bytes (using the Protobuf encoding) with the official Google Protocol Buffer SDK for your chosen programming language.

  2. Get the length of the array created in Step 1 and convert this integer to a new four-byte array. Reverse the new byte array so that the length is written in big-endian (network) byte order.

  3. Concatenate the two arrays so that the length prefix comes first, followed by the byte array containing the original Protobuf message.

  4. Send the concatenated array to the connection stream.
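As a minimal sketch of Steps 2 and 3, assuming the four-byte, big-endian length prefix described above, the framing could be written in Python as follows. The name `frame_message` is hypothetical and is not part of any SDK; `payload` stands for the bytes produced in Step 1.

```python
import struct


def frame_message(payload: bytes) -> bytes:
    """Prefix payload with its length as a 4-byte big-endian unsigned integer."""
    # ">I" packs an unsigned 32-bit integer in big-endian (network) byte order,
    # which is what reversing a little-endian length array produces.
    prefix = struct.pack(">I", len(payload))
    return prefix + payload


# Stand-in bytes; in practice this would be the serialized Protobuf message.
framed = frame_message(b"\x08\x01")
print(framed.hex())  # 000000020801
```

The result is what Step 4 writes to the connection stream in a single call.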

The examples below demonstrate how these steps are performed in the official Open API SDKs.

private async Task WriteTcp(byte[] messageByte, CancellationToken cancellationToken)
{
    byte[] array = BitConverter.GetBytes(messageByte.Length).Reverse().Concat(messageByte)
        .ToArray();
    await _sslStream.WriteAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
    await _sslStream.FlushAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
}
client = Client(EndPoints.PROTOBUF_LIVE_HOST if hostType.lower() == "live" else EndPoints.PROTOBUF_DEMO_HOST, EndPoints.PROTOBUF_PORT, TcpProtocol)
request = ProtoOAApplicationAuthReq() # Can be any message
deferred = client.send(request)

The Python Example

Using Twisted, the Python example performs nearly the same operations as the C# one. Internally, the client.send(request) call does roughly the following.

request = ProtoOAApplicationAuthReq() # Can be any message
requestAsString = request.SerializeToString() # This method is a part of the Google Protobuf SDK
requestAsInt32String = struct.pack("!I", len(requestAsString)) # Four-byte, big-endian length prefix that is sent ahead of the message
For sending the message, the Python SDK uses the Protocol.transport.write() method.

Reading a Message

Asynchronous Code

All Open API SDKs rely on asynchronous execution, meaning that they do not block while waiting for messages but, instead, react to messages as they arrive. As a result, receiving a message is typically done via event handlers.

To read a message, you will have to perform a sequence of actions that reverses the steps required for sending a message.

  1. Read the first four bytes from the stream (remember, they denote the message length). Reverse these four bytes and convert them to an integer.

  2. Read X bytes from the stream, where X is the integer obtained in Step 1. A single read may return fewer bytes than requested, so keep reading until all X bytes have arrived.

  3. Use the Google Protobuf SDK to deserialize the message into a valid ProtoMessage.

  4. Use the payloadType field of the ProtoMessage object to find its actual type. Via the Google Protobuf SDK, change the ProtoMessage to an object of the needed ProtoOA... type.
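Steps 1 and 2 can be sketched in Python as shown below. This is an illustration under the assumption of a four-byte, big-endian length prefix; `read_exactly` and `read_frame` are hypothetical helper names, and an in-memory `io.BytesIO` object stands in for the connection stream. Steps 3 and 4 require the Protobuf SDK and the generated message classes, so they are left out here.

```python
import io
import struct
from typing import BinaryIO


def read_exactly(stream: BinaryIO, count: int) -> bytes:
    """Read exactly count bytes, looping because one read may return fewer."""
    chunks = []
    remaining = count
    while remaining > 0:
        chunk = stream.read(remaining)
        if not chunk:
            raise ConnectionError("stream closed before the full message arrived")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def read_frame(stream: BinaryIO) -> bytes:
    """Return the payload of one length-prefixed message."""
    # Step 1: four bytes, interpreted as a big-endian unsigned integer.
    (length,) = struct.unpack(">I", read_exactly(stream, 4))
    # Step 2: read exactly that many payload bytes.
    return read_exactly(stream, length)


# Two framed messages back to back, as they might appear on the wire.
wire = io.BytesIO(b"\x00\x00\x00\x02\x08\x01" + b"\x00\x00\x00\x01\x7f")
first = read_frame(wire)
second = read_frame(wire)
```

Because the prefix states exactly where each message ends, consecutive messages can be separated even when they arrive in a single chunk or are split across several reads.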

The code snippets below demonstrate how the official Open API SDKs approach reading messages.

_tcpClient = new TcpClient
{
    LingerState = new LingerOption(enable: true, 10)
};
await _tcpClient.ConnectAsync(Host, Port).ConfigureAwait(continueOnCapturedContext: false);
SslStream _sslStream = new SslStream(_tcpClient.GetStream(), leaveInnerStreamOpen: false);
await _sslStream.AuthenticateAsClientAsync(Host).ConfigureAwait(continueOnCapturedContext: false);

    private async void ReadTcp(CancellationToken cancellationToken)
    {
        byte[] dataLength = new byte[4];
        byte[] data = null;
        try
        {
            while (!IsDisposed)
            {
                int num = 0;
                do
                {
                    int count = dataLength.Length - num;
                    int num2 = num;
                    num = num2 + await _sslStream.ReadAsync(dataLength, num, count, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
                    if (num == 0)
                    {
                        throw new InvalidOperationException("Remote host closed the connection");
                    }
                }
                while (num < dataLength.Length);
                int length = GetLength(dataLength);
                if (length <= 0)
                {
                    continue;
                }

                data = ArrayPool<byte>.Shared.Rent(length);
                num = 0;
                do
                {
                    int count2 = length - num;
                    int num2 = num;
                    num = num2 + await _sslStream.ReadAsync(data, num, count2, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
                    if (num == 0)
                    {
                        throw new InvalidOperationException("Remote host closed the connection");
                    }
                }
                while (num < length);
                ProtoMessage protoMessage = ProtoMessage.Parser.ParseFrom(data, 0, length);
                ArrayPool<byte>.Shared.Return(data);
                OnNext(protoMessage);
            }
        }
        catch (Exception innerException)
        {
            if (data != null)
            {
                ArrayPool<byte>.Shared.Return(data);
            }

            ReceiveException exception = new ReceiveException(innerException);
            OnError(exception);
        }
    }

    private int GetLength(byte[] lengthBytes)
    {
        Span<byte> span = lengthBytes.AsSpan();
        span.Reverse();
        return BitConverter.ToInt32(span);
    }

The Python Example

In the Python example, all operations with bytes on receiving messages are handled by the dataReceived() method as shown below.

```python
def dataReceived(self, recd):
    """
    Convert int prefixed strings into calls to stringReceived.
    """
    self.recvd = self.recvd + recd
    while len(self.recvd) >= self.prefixLength and not self.paused:
        length ,= struct.unpack(
            self.structFormat, self.recvd[:self.prefixLength])
        if length > self.MAX_LENGTH:
            self.lengthLimitExceeded(length)
            return
        if len(self.recvd) < length + self.prefixLength:
            break
        packet = self.recvd[self.prefixLength:length + self.prefixLength]
        self.recvd = self.recvd[length + self.prefixLength:]
        self.stringReceived(packet)
```
def stringReceived(self, data):
    msg = ProtoMessage()
    msg.ParseFromString(data)

    if msg.payloadType == ProtoHeartbeatEvent().payloadType:
        self.heartbeat()
    self.factory.received(msg)
    return data
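
Step 4 of the reading procedure and the payloadType check in stringReceived() both rely on the same idea: the outer message carries a numeric type tag that selects how the inner bytes are decoded. The sketch below illustrates that lookup with stand-ins only. `Envelope`, `DECODERS` and `decode` are hypothetical names, the numeric keys are illustrative rather than real payload types, and the sketch assumes the inner message bytes travel in a `payload` field next to `payloadType`. In a real client, each decoder would parse the bytes with the matching generated ProtoOA... class.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class Envelope:
    """Stand-in for ProtoMessage: a type tag plus the inner serialized bytes."""
    payloadType: int
    payload: bytes


# Hypothetical registry; the keys are illustrative, not real payload types.
DECODERS: Dict[int, Callable[[bytes], object]] = {
    1: lambda raw: ("heartbeat", raw),
    2: lambda raw: ("auth_response", raw),
}


def decode(envelope: Envelope) -> object:
    """Pick a decoder by payloadType; return the envelope unchanged if unknown."""
    decoder = DECODERS.get(envelope.payloadType)
    if decoder is None:
        return envelope
    return decoder(envelope.payload)


known = decode(Envelope(payloadType=2, payload=b"\x08\x01"))
unknown = decode(Envelope(payloadType=99, payload=b""))
```

Returning the envelope unchanged for an unknown type keeps the reader loop running when a message type has no registered decoder.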

Using WebSocket

Sending a Message

To send a message over a WebSocket connection, perform the following actions.

  1. Serialize the message into any suitable data format (e.g., a string).

  2. Add the serialized message to your send queue.
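
The send-queue pattern from Step 2 can be sketched with Python's asyncio. This is an illustration of the pattern only, not of any SDK: `drain`, `main` and `fake_send` are hypothetical names, and `fake_send` stands in for a WebSocket client's binary send call. The sketch adds no length prefix, on the assumption that the WebSocket protocol delivers each message as a whole.

```python
import asyncio
from typing import Awaitable, Callable, List


async def drain(queue: "asyncio.Queue[bytes]",
                send: Callable[[bytes], Awaitable[None]]) -> None:
    """Forward queued messages to the connection one at a time, in order."""
    while True:
        message = await queue.get()
        try:
            await send(message)
        finally:
            queue.task_done()


async def main() -> List[bytes]:
    sent: List[bytes] = []

    async def fake_send(message: bytes) -> None:
        # Stand-in for a WebSocket client's binary send call.
        sent.append(message)

    queue: "asyncio.Queue[bytes]" = asyncio.Queue()
    worker = asyncio.create_task(drain(queue, fake_send))

    # Step 2: callers only enqueue; they never touch the connection directly.
    queue.put_nowait(b"\x08\x01")
    queue.put_nowait(b"\x08\x02")

    await queue.join()  # wait until both messages have been sent
    worker.cancel()
    return sent


result = asyncio.run(main())
```

Routing every outgoing message through one queue means a single task writes to the connection, so messages leave in the order in which they were enqueued.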

The examples below demonstrate how these actions are performed in the official Open API SDKs.

The C# SDK uses the WebsocketClient class, which is a part of the Websocket.Client package. The WebsocketClient.Send() method is implemented as follows.

public void Send(byte[] message)
{
    Websocket.Client.Validations.Validations.ValidateInput(message, "message");
    _messagesBinaryToSendQueue.Writer.TryWrite(new ArraySegment<byte>(message));
}

As you can see, the client simply adds an array segment to the send queue.

The Python SDK does not support the WebSocket standard.

Receiving a Message

To receive a message over a WebSocket connection, do the following.

  1. Retrieve the data received from the cTrader backend.

  2. Deserialize data into a valid Protobuf message.

For an illustration of how this is done in the official Open API SDKs, see the snippets below.

To receive messages, subscribe a callback function to the MessageReceived stream of a WebsocketClient. The callback defines what the client does when a new message arrives.

var client = new WebsocketClient(url); // url: a Uri that points to the WebSocket endpoint
client.MessageReceived.Subscribe(msg => { Console.WriteLine(msg); });

Before a received message reaches its subscribers, the .NET SDK parses it into a ProtoMessage. The necessary subscriptions are added in the body of the ConnectWebSocket() method.

private async Task ConnectWebScoket()
{
    DefaultInterpolatedStringHandler defaultInterpolatedStringHandler = new DefaultInterpolatedStringHandler(7, 2);
    defaultInterpolatedStringHandler.AppendLiteral("wss://");
    defaultInterpolatedStringHandler.AppendFormatted(Host);
    defaultInterpolatedStringHandler.AppendLiteral(":");
    defaultInterpolatedStringHandler.AppendFormatted(Port);
    Uri url = new Uri(defaultInterpolatedStringHandler.ToStringAndClear());
    _websocketClient = new WebsocketClient(url, () => new ClientWebSocket())
    {
        IsTextMessageConversionEnabled = false,
        ReconnectTimeout = null,
        IsReconnectionEnabled = false,
        ErrorReconnectTimeout = null
    };
    _webSocketMessageReceivedDisposable = _websocketClient.MessageReceived.Select((ResponseMessage msg) => ProtoMessage.Parser.ParseFrom(msg.Binary)).Subscribe(new Action<ProtoMessage>(OnNext));
    _webSocketDisconnectionHappenedDisposable = _websocketClient.DisconnectionHappened.Subscribe(new Action<DisconnectionInfo>(OnWebSocketDisconnectionHappened));
    await _websocketClient.StartOrFail();
}

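In the OnNext() callback, the ProtoMessage is passed to the MessageFactory.GetMessage() method, which returns the corresponding typed message. Each observer then receives the typed message, and also receives the original ProtoMessage if it has a client message ID or if no typed message could be created.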

private void OnNext(ProtoMessage protoMessage)
{
    foreach (KeyValuePair<int, IObserver<IMessage>> observer2 in _observers)
    {
        observer2.Deconstruct(out var _, out var value);
        IObserver<IMessage> observer = value;
        try
        {
            IMessage message = MessageFactory.GetMessage(protoMessage);
            if (protoMessage.HasClientMsgId || message == null)
            {
                observer.OnNext(protoMessage);
            }

            if (message != null)
            {
                observer.OnNext(message);
            }
        }
        catch (Exception innerException)
        {
            ObserverException exception = new ObserverException(innerException, observer);
            OnError(exception);
        }
    }
}

The Python SDK does not support the WebSocket standard.


Last update: November 30, 2023