Bỏ qua

Gửi và nhận tin nhắn Protobuf

Quá trình gửi và nhận một tin nhắn khác nhau giữa các tiêu chuẩn TCP và WebSocket. Bên dưới, chúng tôi giải thích quá trình này chi tiết cho cả hai loại kết nối.

Sử dụng TCP

Gửi tin nhắn

Để gửi tin nhắn qua kết nối TCP, hãy thực hiện các bước sau:

  1. Chuyển đổi tin nhắn Protobuf thành một mảng byte (sử dụng mã hóa Protobuf) bằng cách sử dụng SDK Google Protocol Buffer chính thức cho ngôn ngữ lập trình bạn chọn.

  2. Lấy độ dài của mảng được tạo trong bước 1. Tạo một mảng byte mới từ số nguyên này. Đảo ngược mảng byte mới.

  3. Nối mảng byte mới và mảng byte chứa tin nhắn Protobuf gốc.

  4. Gửi mảng nối đến luồng kết nối.

Các ví dụ bên dưới minh họa cách các bước này được thực hiện trong các SDK Open API chính thức.

1
2
3
4
5
6
7
private async Task WriteTcp(byte[] messageByte, CancellationToken cancellationToken)
{
    byte[] array = BitConverter.GetBytes(messageByte.Length).Reverse().Concat(messageByte)
        .ToArray();
    await _sslStream.WriteAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
    await _sslStream.FlushAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
}
1
2
3
client = Client(EndPoints.PROTOBUF_LIVE_HOST if hostType.lower() == "live" else EndPoints.PROTOBUF_DEMO_HOST, EndPoints.PROTOBUF_PORT, TcpProtocol)
request = ProtoOAApplicationAuthReq() # Can be any message
deferred = client.send(request)

Ví dụ Python

Sử dụng Twisted, ví dụ Python thực hiện gần như các thao tác giống như ví dụ C#. client.send(request) có thể được giải thích như sau.

1
2
3
request = ProtoOAApplicationAuthReq() # Can be any message
requestAsString = request.SerializeToString() # This method is a part of the Google Protobuf SDK
requestAsInt32String = struct.pack("!H", len(requestAsString)) # The message is concatenated with the reversed array
Để gửi tin nhắn, SDK Python sử dụng phương thức Protocol.transport.write().

Đọc tin nhắn

Mã không đồng bộ

Tất cả các SDK Open API dựa trên thực thi không đồng bộ, có nghĩa là chúng không chờ tin nhắn đến mà thay vào đó phản ứng với các tin nhắn đến động. Kết quả là, việc nhận một tin nhắn thường được thực hiện thông qua các trình xử lý sự kiện

Để đọc một tin nhắn, bạn sẽ phải thực hiện một chuỗi hành động đảo ngược các bước cần thiết để gửi tin nhắn.

  1. Nhận bốn byte đầu tiên của một mảng byte (hãy nhớ, chúng biểu thị độ dài tin nhắn). Đảo ngược bốn byte này và chuyển đổi chúng thành một số nguyên.

  2. Đọc X lượng byte từ một luồng trong đó X là số nguyên bạn đã nhận được trong bước 1.

  3. Sử dụng SDK Google Protobuf để giải tuần tự hóa tin nhắn thành một ProtoMessage hợp lệ.

  4. Sử dụng trường payloadType của đối tượng ProtoMessage để tìm kiểu thực tế của nó. Thông qua SDK Google Protobuf, chuyển đổi ProtoMessage thành một đối tượng của kiểu ProtoOA... cần thiết.

Các đoạn mã bên dưới minh họa cách các SDK Open API chính thức tiếp cận việc đọc tin nhắn.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
_tcpClient = new TcpClient
{
    LingerState = new LingerOption(enable: true, 10)
};
await _tcpClient.ConnectAsync(Host, Port).ConfigureAwait(continueOnCapturedContext: false);
SslStream _sslStream = new SslStream(_tcpClient.GetStream(), leaveInnerStreamOpen: false);
await _sslStream.AuthenticateAsClientAsync(Host).ConfigureAwait(continueOnCapturedContext: false);

        private async void ReadTcp(CancellationToken cancellationToken)
    {
        byte[] dataLength = new byte[4];
        byte[] data = null;
        try
        {
            while (!IsDisposed)
            {
                int num = 0;
                do
                {
                    int count = dataLength.Length - num;
                    int num2 = num;
                    num = num2 + await _sslStream.ReadAsync(dataLength, num, count, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
                    if (num == 0)
                    {
                        throw new InvalidOperationException("Remote host closed the connection");
                    }
                }
                while (num < dataLength.Length);
                int length = GetLength(dataLength);
                if (length <= 0)
                {
                    continue;
                }

                data = ArrayPool<byte>.Shared.Rent(length);
                num = 0;
                do
                {
                    int count2 = length - num;
                    int num2 = num;
                    num = num2 + await _sslStream.ReadAsync(data, num, count2, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
                    if (num == 0)
                    {
                        throw new InvalidOperationException("Remote host closed the connection");
                    }
                }
                while (num < length);
                ProtoMessage protoMessage = ProtoMessage.Parser.ParseFrom(data, 0, length);
                ArrayPool<byte>.Shared.Return(data);
                OnNext(protoMessage);
            }
        }
        catch (Exception innerException)
        {
            if (data != null)
            {
                ArrayPool<byte>.Shared.Return(data);
            }

            ReceiveException exception = new ReceiveException(innerException);
            OnError(exception);
        }
    }

    private int GetLength(byte[] lengthBytes)
    {
        Span<byte> span = lengthBytes.AsSpan();
        span.Reverse();
        return BitConverter.ToInt32(span);
    }

Ví dụ Python

Trong ví dụ Python, tất cả các thao tác với byte khi nhận tin nhắn được xử lý bởi phương thức dataReceived() như được hiển thị bên dưới.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
def dataReceived(self, recd):
    """
    Convert int prefixed strings into calls to stringReceived.
    """
    self.recvd = self.recvd + recd
    while len(self.recvd) >= self.prefixLength and not self.paused:
        length ,= struct.unpack(
            self.structFormat, self.recvd[:self.prefixLength])
        if length > self.MAX_LENGTH:
            self.lengthLimitExceeded(length)
            return
        if len(self.recvd) < length + self.prefixLength:
            break
        packet = self.recvd[self.prefixLength:length + self.prefixLength]
        self.recvd = self.recvd[length + self.prefixLength:]
        self.stringReceived(packet)
1
2
3
4
5
6
7
8
def stringReceived(self, data):
    msg = ProtoMessage()
    msg.ParseFromString(data)

    if msg.payloadType == ProtoHeartbeatEvent().payloadType:
        self.heartbeat()
    self.factory.received(msg)
    return data

Sử dụng WebSocket

Gửi tin nhắn

Để gửi tin nhắn qua kết nối WebSocket, hãy thực hiện các hành động sau:

  1. Tuần tự hóa tin nhắn thành bất kỳ định dạng dữ liệu phù hợp nào (ví dụ: một chuỗi).

  2. Thêm tin nhắn đã tuần tự hóa vào hàng đợi gửi của bạn.

Các ví dụ bên dưới minh họa cách các hành động này được thực hiện trong các SDK Open API chính thức.

SDK C# sử dụng lớp WebsocketClient là một phần của gói Websocket.Client. Như được hiển thị bên dưới, phương thức WebsocketClient.Send() hoạt động như sau.

1
2
3
4
5
public void Send(byte[] message)
{
    Websocket.Client.Validations.Validations.ValidateInput(message, "message");
    _messagesBinaryToSendQueue.Writer.TryWrite(new ArraySegment<byte>(message));
}

Như bạn có thể thấy, client chỉ đơn giản thêm một phân đoạn mảng vào hàng đợi gửi.

SDK Python không hỗ trợ tiêu chuẩn WebSocket.

Nhận tin nhắn

Để nhận tin nhắn qua kết nối WebSocket, hãy thực hiện các bước sau:

  1. Truy xuất dữ liệu nhận được từ backend của cTrader.

  2. Giải tuần tự hóa dữ liệu thành một tin nhắn Protobuf hợp lệ.

Để minh họa cách điều này được thực hiện trong các SDK Open API chính thức, hãy xem các đoạn mã bên dưới.

Để nhận tin nhắn, một WebsocketClient cần được đăng ký vào một hàm callback xử lý những gì client làm khi chấp nhận một tin nhắn mới.

1
2
var client = new WebsocketClient();
client.MessageReceived.Subscribe(msg => {Console.WriteLine(msg);})

Trước khi đăng ký, SDK .NET phân tích tin nhắn thành một tin nhắn Protobuf. Các đăng ký cần thiết được thêm vào phần thân của callback ConnectWebSocket().

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
private async Task ConnectWebSocket()
{
    DefaultInterpolatedStringHandler defaultInterpolatedStringHandler = new DefaultInterpolatedStringHandler(7, 2);
    defaultInterpolatedStringHandler.AppendLiteral("wss://");
    defaultInterpolatedStringHandler.AppendFormatted(Host);
    defaultInterpolatedStringHandler.AppendLiteral(":");
    defaultInterpolatedStringHandler.AppendFormatted(Port);
    Uri url = new Uri(defaultInterpolatedStringHandler.ToStringAndClear());
    _websocketClient = new WebsocketClient(url, () => new ClientWebSocket())
    {
        IsTextMessageConversionEnabled = false,
        ReconnectTimeout = null,
        IsReconnectionEnabled = false,
        ErrorReconnectTimeout = null
    };
    _webSocketMessageReceivedDisposable = _websocketClient.MessageReceived.Select((ResponseMessage msg) => ProtoMessage.Parser.ParseFrom(msg.Binary)).Subscribe(new Action<ProtoMessage>(OnNext));
    _webSocketDisconnectionHappenedDisposable = _websocketClient.DisconnectionHappened.Subscribe(new Action<DisconnectionInfo>(OnWebSocketDisconnectionHappened));
    await _websocketClient.StartOrFail();
}

Trong callback OnNext(), ProtoMessage được chuyển đến callback MessageFactory.GetMessage().

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void OnNext(ProtoMessage protoMessage)
{
    foreach (KeyValuePair<int, IObserver<IMessage>> observer2 in _observers)
    {
        observer2.Deconstruct(out var _, out var value);
        IObserver<IMessage> observer = value;
        try
        {
            IMessage message = MessageFactory.GetMessage(protoMessage);
            if (protoMessage.HasClientMsgId || message == null)
            {
                observer.OnNext(protoMessage);
            }

            if (message != null)
            {
                observer.OnNext(message);
            }
        }
        catch (Exception innerException)
        {
            ObserverException exception = new ObserverException(innerException, observer);
            OnError(exception);
        }
    }
}

SDK Python không hỗ trợ tiêu chuẩn WebSocket.