跳转至

发送和接收 Protobuf 消息

发送和接收消息的过程在 TCP 和 WebSocket 标准之间有所不同。 下面,我们详细解释了这两种连接的过程。

使用 TCP

发送消息

要通过 TCP 连接发送消息,请执行以下操作:

  1. 使用您选择的编程语言的官方 Google Protocol Buffer SDK 将 Protobuf 消息更改为字节数组(使用 Protobuf 编码)。

  2. 获取步骤 1 中创建的字节数组的长度。 从此整数创建一个新的字节数组。 反转新的字节数组。

  3. 将新的字节数组和包含原始 Protobuf 消息的字节数组连接起来。

  4. 将连接的数组发送到连接流。

下面的示例演示了如何在官方 Open API SDK 中执行这些步骤。

1
2
3
4
5
6
7
private async Task WriteTcp(byte[] messageByte, CancellationToken cancellationToken)
{
    byte[] array = BitConverter.GetBytes(messageByte.Length).Reverse().Concat(messageByte)
        .ToArray();
    await _sslStream.WriteAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
    await _sslStream.FlushAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
}
1
2
3
client = Client(EndPoints.PROTOBUF_LIVE_HOST if hostType.lower() == "live" else EndPoints.PROTOBUF_DEMO_HOST, EndPoints.PROTOBUF_PORT, TcpProtocol)
request = ProtoOAApplicationAuthReq() # Can be any message
deferred = client.send(request)

Python 示例

使用 Twisted,Python 示例执行的操作与 C# 示例几乎相同。 client.send(request) 可以解释如下。

1
2
3
request = ProtoOAApplicationAuthReq() # Can be any message
requestAsString = request.SerializeToString() # This method is a part of the Google Protobuf SDK
requestAsInt32String = struct.pack("!H", len(requestAsString)) # The message is concatenated with the reversed array
为了发送消息,Python SDK 使用 Protocol.transport.write() 方法。

读取消息

异步代码

所有 Open API SDK 都依赖于异步执行,这意味着它们不会等待消息到达,而是对动态到达的消息做出反应。 因此,接收消息通常通过事件处理程序完成

要读取消息,您需要执行一系列操作,这些操作与发送消息所需的步骤相反。

  1. 接收字节数组的前四个字节(记住,它们表示消息长度)。 反转这四个字节并将它们转换为整数。

  2. 从流中读取 X 个字节,其中 X 是您在步骤 1 中获得的整数。

  3. 使用 Google Protobuf SDK 将消息反序列化为有效的 ProtoMessage

  4. 使用 ProtoMessage 对象的 payloadType 字段查找其实际类型。 通过 Google Protobuf SDK,将 ProtoMessage 转换为所需的 ProtoOA... 类型的对象。

下面的代码片段展示了官方 Open API SDK 如何处理读取消息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
_tcpClient = new TcpClient
{
    LingerState = new LingerOption(enable: true, 10)
};
await _tcpClient.ConnectAsync(Host, Port).ConfigureAwait(continueOnCapturedContext: false);
SslStream _sslStream = new SslStream(_tcpClient.GetStream(), leaveInnerStreamOpen: false);
await _sslStream.AuthenticateAsClientAsync(Host).ConfigureAwait(continueOnCapturedContext: false);

        private async void ReadTcp(CancellationToken cancellationToken)
    {
        byte[] dataLength = new byte[4];
        byte[] data = null;
        try
        {
            while (!IsDisposed)
            {
                int num = 0;
                do
                {
                    int count = dataLength.Length - num;
                    int num2 = num;
                    num = num2 + await _sslStream.ReadAsync(dataLength, num, count, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
                    if (num == 0)
                    {
                        throw new InvalidOperationException("Remote host closed the connection");
                    }
                }
                while (num < dataLength.Length);
                int length = GetLength(dataLength);
                if (length <= 0)
                {
                    continue;
                }

                data = ArrayPool<byte>.Shared.Rent(length);
                num = 0;
                do
                {
                    int count2 = length - num;
                    int num2 = num;
                    num = num2 + await _sslStream.ReadAsync(data, num, count2, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
                    if (num == 0)
                    {
                        throw new InvalidOperationException("Remote host closed the connection");
                    }
                }
                while (num < length);
                ProtoMessage protoMessage = ProtoMessage.Parser.ParseFrom(data, 0, length);
                ArrayPool<byte>.Shared.Return(data);
                OnNext(protoMessage);
            }
        }
        catch (Exception innerException)
        {
            if (data != null)
            {
                ArrayPool<byte>.Shared.Return(data);
            }

            ReceiveException exception = new ReceiveException(innerException);
            OnError(exception);
        }
    }

    private int GetLength(byte[] lengthBytes)
    {
        Span<byte> span = lengthBytes.AsSpan();
        span.Reverse();
        return BitConverter.ToInt32(span);
    }

Python 示例

在 Python 示例中,接收消息时的所有字节操作都由 dataReceived() 方法处理,如下所示。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
def dataReceived(self, recd):
    """
    Convert int prefixed strings into calls to stringReceived.
    """
    self.recvd = self.recvd + recd
    while len(self.recvd) >= self.prefixLength and not self.paused:
        length ,= struct.unpack(
            self.structFormat, self.recvd[:self.prefixLength])
        if length > self.MAX_LENGTH:
            self.lengthLimitExceeded(length)
            return
        if len(self.recvd) < length + self.prefixLength:
            break
        packet = self.recvd[self.prefixLength:length + self.prefixLength]
        self.recvd = self.recvd[length + self.prefixLength:]
        self.stringReceived(packet)
1
2
3
4
5
6
7
8
def stringReceived(self, data):
    msg = ProtoMessage()
    msg.ParseFromString(data)

    if msg.payloadType == ProtoHeartbeatEvent().payloadType:
        self.heartbeat()
    self.factory.received(msg)
    return data

使用 WebSocket

发送消息

要通过 WebSocket 连接发送消息,请执行以下操作:

  1. 将消息序列化为任何合适的数据格式(例如,字符串)。

  2. 将序列化的消息添加到发送队列中。

下面的示例展示了这些操作在官方 Open API SDK 中是如何执行的。

C# SDK 使用 WebsocketClient 类,该类是 Websocket.Client 包的一部分。 如下所示,WebsocketClient.Send() 方法的工作原理如下。

1
2
3
4
5
public void Send(byte[] message)
{
    Websocket.Client.Validations.Validations.ValidateInput(message, "message");
    _messagesBinaryToSendQueue.Writer.TryWrite(new ArraySegment<byte>(message));
}

如您所见,客户端只是将数组段添加到发送队列中。

Python SDK 不支持 WebSocket 标准。

接收消息

要通过 WebSocket 连接接收消息,请执行以下操作:

  1. 检索从 cTrader 后端接收到的数据。

  2. 将数据反序列化为有效的 Protobuf 消息。

有关官方 Open API SDK 如何处理此操作的示例,请参见下面的代码片段。

要接收消息,WebsocketClient 需要订阅一个回调函数,该函数处理客户端在接受新消息时的操作。

1
2
var client = new WebsocketClient();
client.MessageReceived.Subscribe(msg => {Console.WriteLine(msg);})

在订阅之前,.NET SDK 将消息解析为 Protobuf 消息。 必要的订阅在 ConnectWebSocket() 回调的主体中添加。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
private async Task ConnectWebSocket()
{
    DefaultInterpolatedStringHandler defaultInterpolatedStringHandler = new DefaultInterpolatedStringHandler(7, 2);
    defaultInterpolatedStringHandler.AppendLiteral("wss://");
    defaultInterpolatedStringHandler.AppendFormatted(Host);
    defaultInterpolatedStringHandler.AppendLiteral(":");
    defaultInterpolatedStringHandler.AppendFormatted(Port);
    Uri url = new Uri(defaultInterpolatedStringHandler.ToStringAndClear());
    _websocketClient = new WebsocketClient(url, () => new ClientWebSocket())
    {
        IsTextMessageConversionEnabled = false,
        ReconnectTimeout = null,
        IsReconnectionEnabled = false,
        ErrorReconnectTimeout = null
    };
    _webSocketMessageReceivedDisposable = _websocketClient.MessageReceived.Select((ResponseMessage msg) => ProtoMessage.Parser.ParseFrom(msg.Binary)).Subscribe(new Action<ProtoMessage>(OnNext));
    _webSocketDisconnectionHappenedDisposable = _websocketClient.DisconnectionHappened.Subscribe(new Action<DisconnectionInfo>(OnWebSocketDisconnectionHappened));
    await _websocketClient.StartOrFail();
}

OnNext() 回调中,ProtoMessage 被传递给 MessageFactory.GetMessage() 回调。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void OnNext(ProtoMessage protoMessage)
{
    foreach (KeyValuePair<int, IObserver<IMessage>> observer2 in _observers)
    {
        observer2.Deconstruct(out var _, out var value);
        IObserver<IMessage> observer = value;
        try
        {
            IMessage message = MessageFactory.GetMessage(protoMessage);
            if (protoMessage.HasClientMsgId || message == null)
            {
                observer.OnNext(protoMessage);
            }

            if (message != null)
            {
                observer.OnNext(message);
            }
        }
        catch (Exception innerException)
        {
            ObserverException exception = new ObserverException(innerException, observer);
            OnError(exception);
        }
    }
}

Python SDK 不支持 WebSocket 标准。