Ir para o conteúdo

Enviar e receber mensagens Protobuf

O processo de enviar e receber uma mensagem difere entre os padrões TCP e WebSocket. Abaixo, explicamos este processo em detalhe para ambos os tipos de ligações.

Usar TCP

Enviar uma mensagem

Para enviar uma mensagem via ligação TCP, faça o seguinte:

  1. Transforme a mensagem Protobuf num array de bytes (usando a codificação Protobuf) utilizando o SDK oficial do Google Protocol Buffer para a sua linguagem de programação escolhida.

  2. Obtenha o comprimento do array criado durante o passo 1. Crie um novo array de bytes a partir deste integer. Inverta o novo array de bytes.

  3. Concatene o novo array de bytes e o array de bytes contendo a mensagem Protobuf original.

  4. Envie o array concatenado para o stream de ligação.

Os exemplos abaixo demonstram como estes passos são executados nos SDKs oficiais da Open API.

1
2
3
4
5
6
7
private async Task WriteTcp(byte[] messageByte, CancellationToken cancellationToken)
{
    byte[] array = BitConverter.GetBytes(messageByte.Length).Reverse().Concat(messageByte)
        .ToArray();
    await _sslStream.WriteAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
    await _sslStream.FlushAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
}
1
2
3
client = Client(EndPoints.PROTOBUF_LIVE_HOST if hostType.lower() == "live" else EndPoints.PROTOBUF_DEMO_HOST, EndPoints.PROTOBUF_PORT, TcpProtocol)
request = ProtoOAApplicationAuthReq() # Can be any message
deferred = client.send(request)

Exemplo Python

Usando Twisted, o exemplo Python executa praticamente as mesmas operações que o C#. O client.send(request) pode ser explicado da seguinte forma.

1
2
3
request = ProtoOAApplicationAuthReq() # Can be any message
requestAsString = request.SerializeToString() # This method is a part of the Google Protobuf SDK
requestAsInt32String = struct.pack("!H", len(requestAsString)) # The message is concatenated with the reversed array
Para enviar a mensagem, o SDK Python utiliza o método Protocol.transport.write().

Ler uma mensagem

Código assíncrono

Todos os SDKs da Open API dependem de execução assíncrona, o que significa que não aguardam a chegada de mensagens mas, em vez disso, reagem a mensagens que chegam dinamicamente. Como resultado, a receção de uma mensagem é normalmente feita através de gestores de eventos

Para ler uma mensagem, terá de executar uma sequência de ações que inverte os passos necessários para enviar uma mensagem.

  1. Receba os primeiros quatro bytes de uma matriz de bytes (lembre-se, eles indicam o comprimento da mensagem). Inverta estes quatro bytes e converta-os num número inteiro.

  2. Leia X bytes de um fluxo onde X é o número inteiro que obteve durante o passo 1.

  3. Utilize o SDK Google Protobuf para desserializar a mensagem numa ProtoMessage válida.

  4. Utilize o campo payloadType do objeto ProtoMessage para encontrar o seu tipo real. Através do SDK Google Protobuf, converta a ProtoMessage num objeto do tipo ProtoOA... necessário.

Os excertos de código abaixo demonstram como os SDKs oficiais da Open API abordam a leitura de mensagens.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
_tcpClient = new TcpClient
{
    LingerState = new LingerOption(enable: true, 10)
};
await _tcpClient.ConnectAsync(Host, Port).ConfigureAwait(continueOnCapturedContext: false);
SslStream _sslStream = new SslStream(_tcpClient.GetStream(), leaveInnerStreamOpen: false);
await _sslStream.AuthenticateAsClientAsync(Host).ConfigureAwait(continueOnCapturedContext: false);

        private async void ReadTcp(CancellationToken cancellationToken)
    {
        byte[] dataLength = new byte[4];
        byte[] data = null;
        try
        {
            while (!IsDisposed)
            {
                int num = 0;
                do
                {
                    int count = dataLength.Length - num;
                    int num2 = num;
                    num = num2 + await _sslStream.ReadAsync(dataLength, num, count, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
                    if (num == 0)
                    {
                        throw new InvalidOperationException("Remote host closed the connection");
                    }
                }
                while (num < dataLength.Length);
                int length = GetLength(dataLength);
                if (length <= 0)
                {
                    continue;
                }

                data = ArrayPool<byte>.Shared.Rent(length);
                num = 0;
                do
                {
                    int count2 = length - num;
                    int num2 = num;
                    num = num2 + await _sslStream.ReadAsync(data, num, count2, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
                    if (num == 0)
                    {
                        throw new InvalidOperationException("Remote host closed the connection");
                    }
                }
                while (num < length);
                ProtoMessage protoMessage = ProtoMessage.Parser.ParseFrom(data, 0, length);
                ArrayPool<byte>.Shared.Return(data);
                OnNext(protoMessage);
            }
        }
        catch (Exception innerException)
        {
            if (data != null)
            {
                ArrayPool<byte>.Shared.Return(data);
            }

            ReceiveException exception = new ReceiveException(innerException);
            OnError(exception);
        }
    }

    private int GetLength(byte[] lengthBytes)
    {
        Span<byte> span = lengthBytes.AsSpan();
        span.Reverse();
        return BitConverter.ToInt32(span);
    }

Exemplo Python

No exemplo Python, todas as operações com bytes na receção de mensagens são geridas pelo método dataReceived() como mostrado abaixo.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
def dataReceived(self, recd):
    """
    Convert int prefixed strings into calls to stringReceived.
    """
    self.recvd = self.recvd + recd
    while len(self.recvd) >= self.prefixLength and not self.paused:
        length ,= struct.unpack(
            self.structFormat, self.recvd[:self.prefixLength])
        if length > self.MAX_LENGTH:
            self.lengthLimitExceeded(length)
            return
        if len(self.recvd) < length + self.prefixLength:
            break
        packet = self.recvd[self.prefixLength:length + self.prefixLength]
        self.recvd = self.recvd[length + self.prefixLength:]
        self.stringReceived(packet)
1
2
3
4
5
6
7
8
def stringReceived(self, data):
    msg = ProtoMessage()
    msg.ParseFromString(data)

    if msg.payloadType == ProtoHeartbeatEvent().payloadType:
        self.heartbeat()
    self.factory.received(msg)
    return data

Utilizar WebSocket

Enviar uma mensagem

Para enviar uma mensagem através de uma ligação WebSocket, execute as seguintes ações:

  1. Serialize a mensagem em qualquer formato de dados adequado (por exemplo, uma string).

  2. Adicione a mensagem serializada à sua fila de envio.

Os exemplos abaixo demonstram como estas ações são executadas nos SDKs oficiais da Open API.

O SDK C# utiliza a classe WebsocketClient que faz parte do pacote Websocket.Client. Como mostrado abaixo, o método WebsocketClient.Send() funciona da seguinte forma.

1
2
3
4
5
public void Send(byte[] message)
{
    Websocket.Client.Validations.Validations.ValidateInput(message, "message");
    _messagesBinaryToSendQueue.Writer.TryWrite(new ArraySegment<byte>(message));
}

Como pode ver, o cliente simplesmente adiciona um segmento de matriz à fila de envio.

O SDK Python não suporta o padrão WebSocket.

Receber uma mensagem

Para receber uma mensagem através de uma ligação WebSocket, faça o seguinte:

  1. Recupere os dados recebidos do backend do cTrader.

  2. Desserialize os dados numa mensagem Protobuf válida.

Para uma ilustração de como isto é feito nos SDKs oficiais da Open API, veja os excertos abaixo.

Para receber mensagens, um WebsocketClient precisa de estar subscrito a uma função de callback que gere o que o cliente faz ao aceitar uma nova mensagem.

1
2
var client = new WebsocketClient();
client.MessageReceived.Subscribe(msg => {Console.WriteLine(msg);})

Antes de subscrever, o SDK .NET analisa a mensagem numa mensagem Protobuf. As subscrições necessárias são adicionadas no corpo do callback ConnectWebSocket().

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
private async Task ConnectWebSocket()
{
    DefaultInterpolatedStringHandler defaultInterpolatedStringHandler = new DefaultInterpolatedStringHandler(7, 2);
    defaultInterpolatedStringHandler.AppendLiteral("wss://");
    defaultInterpolatedStringHandler.AppendFormatted(Host);
    defaultInterpolatedStringHandler.AppendLiteral(":");
    defaultInterpolatedStringHandler.AppendFormatted(Port);
    Uri url = new Uri(defaultInterpolatedStringHandler.ToStringAndClear());
    _websocketClient = new WebsocketClient(url, () => new ClientWebSocket())
    {
        IsTextMessageConversionEnabled = false,
        ReconnectTimeout = null,
        IsReconnectionEnabled = false,
        ErrorReconnectTimeout = null
    };
    _webSocketMessageReceivedDisposable = _websocketClient.MessageReceived.Select((ResponseMessage msg) => ProtoMessage.Parser.ParseFrom(msg.Binary)).Subscribe(new Action<ProtoMessage>(OnNext));
    _webSocketDisconnectionHappenedDisposable = _websocketClient.DisconnectionHappened.Subscribe(new Action<DisconnectionInfo>(OnWebSocketDisconnectionHappened));
    await _websocketClient.StartOrFail();
}

No callback OnNext(), a ProtoMessage é passada para o callback MessageFactory.GetMessage().

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void OnNext(ProtoMessage protoMessage)
{
    foreach (KeyValuePair<int, IObserver<IMessage>> observer2 in _observers)
    {
        observer2.Deconstruct(out var _, out var value);
        IObserver<IMessage> observer = value;
        try
        {
            IMessage message = MessageFactory.GetMessage(protoMessage);
            if (protoMessage.HasClientMsgId || message == null)
            {
                observer.OnNext(protoMessage);
            }

            if (message != null)
            {
                observer.OnNext(message);
            }
        }
        catch (Exception innerException)
        {
            ObserverException exception = new ObserverException(innerException, observer);
            OnError(exception);
        }
    }
}

O SDK Python não suporta o padrão WebSocket.