Langkau tajuk talian

Hantar dan terima mesej Protobuf

Proses menghantar dan menerima mesej berbeza antara piawaian TCP dan WebSocket. Di bawah, kami menerangkan proses ini secara terperinci untuk kedua-dua jenis sambungan.

Gunakan TCP

Hantar mesej

Untuk menghantar mesej melalui sambungan TCP, lakukan yang berikut:

  1. Tukar mesej Protobuf kepada tatasusunan bait (menggunakan penyiratan Protobuf) dengan menggunakan SDK Google Protocol Buffer rasmi untuk bahasa pengaturcaraan pilihan anda.

  2. Dapatkan panjang tatasusunan yang dicipta semasa langkah 1. Cipta tatasusunan bait baharu dari integer ini. Balikkan tatasusunan bait baharu.

  3. Gabungkan tatasusunan bait baharu dan tatasusunan bait yang mengandungi mesej Protobuf asal.

  4. Hantar tatasusunan yang digabungkan ke aliran sambungan.

Contoh di bawah menunjukkan bagaimana langkah-langkah ini dilakukan dalam SDK Open API rasmi.

1
2
3
4
5
6
7
private async Task WriteTcp(byte[] messageByte, CancellationToken cancellationToken)
{
    byte[] array = BitConverter.GetBytes(messageByte.Length).Reverse().Concat(messageByte)
        .ToArray();
    await _sslStream.WriteAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
    await _sslStream.FlushAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
}
1
2
3
client = Client(EndPoints.PROTOBUF_LIVE_HOST if hostType.lower() == "live" else EndPoints.PROTOBUF_DEMO_HOST, EndPoints.PROTOBUF_PORT, TcpProtocol)
request = ProtoOAApplicationAuthReq() # Can be any message
deferred = client.send(request)

Python example

Menggunakan Twisted, contoh Python melakukan hampir operasi yang sama seperti C#. client.send(request) boleh dijelaskan seperti berikut.

1
2
3
request = ProtoOAApplicationAuthReq() # Can be any message
requestAsString = request.SerializeToString() # This method is a part of the Google Protobuf SDK
requestAsInt32String = struct.pack("!H", len(requestAsString)) # The message is concatenated with the reversed array
Untuk menghantar mesej, SDK Python menggunakan kaedah Protocol.transport.write().

Baca mesej

Kod Asinkron

Semua SDK Open API bergantung pada pelaksanaan asinkron, bermaksud mereka tidak menunggu mesej tiba tetapi sebaliknya bertindak balas terhadap mesej yang tiba secara dinamik. Akibatnya, menerima mesej biasanya dilakukan melalui pengendali acara

Untuk membaca mesej, anda perlu melakukan urutan tindakan yang membalikkan langkah yang diperlukan untuk menghantar mesej.

  1. Terima empat bait pertama dari tatasusunan bait (ingat, mereka menandakan panjang mesej). Balikkan empat bait ini dan tukarkannya kepada integer.

  2. Baca X jumlah bait dari aliran di mana X adalah integer yang anda dapatkan dalam langkah 1.

  3. Gunakan SDK Google Protobuf untuk menyahserialkan mesej menjadi ProtoMessage yang sah.

  4. Gunakan medan payloadType dari objek ProtoMessage untuk mencari jenis sebenarnya. Melalui SDK Google Protobuf, tukar ProtoMessage kepada objek jenis ProtoOA... yang diperlukan.

Coretan kod di bawah menunjukkan bagaimana SDK Open API rasmi mendekati pembacaan mesej.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
_tcpClient = new TcpClient
{
    LingerState = new LingerOption(enable: true, 10)
};
await _tcpClient.ConnectAsync(Host, Port).ConfigureAwait(continueOnCapturedContext: false);
SslStream _sslStream = new SslStream(_tcpClient.GetStream(), leaveInnerStreamOpen: false);
await _sslStream.AuthenticateAsClientAsync(Host).ConfigureAwait(continueOnCapturedContext: false);

        private async void ReadTcp(CancellationToken cancellationToken)
    {
        byte[] dataLength = new byte[4];
        byte[] data = null;
        try
        {
            while (!IsDisposed)
            {
                int num = 0;
                do
                {
                    int count = dataLength.Length - num;
                    int num2 = num;
                    num = num2 + await _sslStream.ReadAsync(dataLength, num, count, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
                    if (num == 0)
                    {
                        throw new InvalidOperationException("Remote host closed the connection");
                    }
                }
                while (num < dataLength.Length);
                int length = GetLength(dataLength);
                if (length <= 0)
                {
                    continue;
                }

                data = ArrayPool<byte>.Shared.Rent(length);
                num = 0;
                do
                {
                    int count2 = length - num;
                    int num2 = num;
                    num = num2 + await _sslStream.ReadAsync(data, num, count2, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
                    if (num == 0)
                    {
                        throw new InvalidOperationException("Remote host closed the connection");
                    }
                }
                while (num < length);
                ProtoMessage protoMessage = ProtoMessage.Parser.ParseFrom(data, 0, length);
                ArrayPool<byte>.Shared.Return(data);
                OnNext(protoMessage);
            }
        }
        catch (Exception innerException)
        {
            if (data != null)
            {
                ArrayPool<byte>.Shared.Return(data);
            }

            ReceiveException exception = new ReceiveException(innerException);
            OnError(exception);
        }
    }

    private int GetLength(byte[] lengthBytes)
    {
        Span<byte> span = lengthBytes.AsSpan();
        span.Reverse();
        return BitConverter.ToInt32(span);
    }

Python example

Dalam contoh Python, semua operasi dengan bait pada penerimaan mesej dikendalikan oleh kaedah dataReceived() seperti yang ditunjukkan di bawah.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
def dataReceived(self, recd):
    """
    Convert int prefixed strings into calls to stringReceived.
    """
    self.recvd = self.recvd + recd
    while len(self.recvd) >= self.prefixLength and not self.paused:
        length ,= struct.unpack(
            self.structFormat, self.recvd[:self.prefixLength])
        if length > self.MAX_LENGTH:
            self.lengthLimitExceeded(length)
            return
        if len(self.recvd) < length + self.prefixLength:
            break
        packet = self.recvd[self.prefixLength:length + self.prefixLength]
        self.recvd = self.recvd[length + self.prefixLength:]
        self.stringReceived(packet)
1
2
3
4
5
6
7
8
def stringReceived(self, data):
    msg = ProtoMessage()
    msg.ParseFromString(data)

    if msg.payloadType == ProtoHeartbeatEvent().payloadType:
        self.heartbeat()
    self.factory.received(msg)
    return data

Gunakan WebSocket

Hantar mesej

Untuk menghantar mesej melalui sambungan WebSocket, lakukan tindakan berikut:

  1. Serialkan mesej ke dalam format data yang sesuai (contohnya, rentetan).

  2. Tambahkan mesej yang diserialkan ke dalam barisan hantar anda.

Contoh di bawah menunjukkan bagaimana tindakan ini dilakukan dalam SDK Open API rasmi.

SDK C# menggunakan kelas WebsocketClient yang merupakan sebahagian daripada pakej Websocket.Client. Seperti yang ditunjukkan di bawah, kaedah WebsocketClient.Send() berfungsi seperti berikut.

1
2
3
4
5
public void Send(byte[] message)
{
    Websocket.Client.Validations.Validations.ValidateInput(message, "message");
    _messagesBinaryToSendQueue.Writer.TryWrite(new ArraySegment<byte>(message));
}

Seperti yang anda lihat, klien hanya menambah segmen tatasusunan ke barisan hantar.

SDK Python tidak menyokong piawaian WebSocket.

Terima mesej

Untuk menerima mesej melalui sambungan WebSocket, lakukan yang berikut:

  1. Ambil data yang diterima dari backend cTrader.

  2. Sahserialkan data menjadi mesej Protobuf yang sah.

Untuk ilustrasi bagaimana ini dilakukan dalam SDK Open API rasmi, lihat coretan di bawah.

Untuk menerima mesej, WebsocketClient perlu dilanggan kepada fungsi panggilan balik yang mengendalikan apa yang dilakukan klien pada penerimaan mesej baru.

1
2
var client = new WebsocketClient();
client.MessageReceived.Subscribe(msg => {Console.WriteLine(msg);})

Sebelum melanggan, SDK .NET mengurai mesej menjadi mesej Protobuf. Langganan yang diperlukan ditambah dalam badan panggilan balik ConnectWebSocket().

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
private async Task ConnectWebSocket()
{
    DefaultInterpolatedStringHandler defaultInterpolatedStringHandler = new DefaultInterpolatedStringHandler(7, 2);
    defaultInterpolatedStringHandler.AppendLiteral("wss://");
    defaultInterpolatedStringHandler.AppendFormatted(Host);
    defaultInterpolatedStringHandler.AppendLiteral(":");
    defaultInterpolatedStringHandler.AppendFormatted(Port);
    Uri url = new Uri(defaultInterpolatedStringHandler.ToStringAndClear());
    _websocketClient = new WebsocketClient(url, () => new ClientWebSocket())
    {
        IsTextMessageConversionEnabled = false,
        ReconnectTimeout = null,
        IsReconnectionEnabled = false,
        ErrorReconnectTimeout = null
    };
    _webSocketMessageReceivedDisposable = _websocketClient.MessageReceived.Select((ResponseMessage msg) => ProtoMessage.Parser.ParseFrom(msg.Binary)).Subscribe(new Action<ProtoMessage>(OnNext));
    _webSocketDisconnectionHappenedDisposable = _websocketClient.DisconnectionHappened.Subscribe(new Action<DisconnectionInfo>(OnWebSocketDisconnectionHappened));
    await _websocketClient.StartOrFail();
}

Dalam panggilan balik OnNext(), ProtoMessage diserahkan kepada panggilan balik MessageFactory.GetMessage().

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void OnNext(ProtoMessage protoMessage)
{
    foreach (KeyValuePair<int, IObserver<IMessage>> observer2 in _observers)
    {
        observer2.Deconstruct(out var _, out var value);
        IObserver<IMessage> observer = value;
        try
        {
            IMessage message = MessageFactory.GetMessage(protoMessage);
            if (protoMessage.HasClientMsgId || message == null)
            {
                observer.OnNext(protoMessage);
            }

            if (message != null)
            {
                observer.OnNext(message);
            }
        }
        catch (Exception innerException)
        {
            ObserverException exception = new ObserverException(innerException, observer);
            OnError(exception);
        }
    }
}

SDK Python tidak menyokong piawaian WebSocket.