Lewati ke isi

Kirim dan terima pesan Protobuf

Proses mengirim dan menerima pesan berbeda antara standar TCP dan WebSocket. Di bawah ini, kami menjelaskan proses ini secara detail untuk kedua jenis koneksi.

Gunakan TCP

Kirim pesan

Untuk mengirim pesan melalui koneksi TCP, lakukan hal berikut:

  1. Ubah pesan Protobuf menjadi array byte (menggunakan pengkodean Protobuf) dengan menggunakan SDK Google Protocol Buffer resmi untuk bahasa pemrograman pilihan Anda.

  2. Dapatkan panjang array yang dibuat selama langkah 1. Buat array byte baru dari integer ini. Balikkan array byte baru.

  3. Gabungkan array byte baru dan array byte yang berisi pesan Protobuf asli.

  4. Kirim array yang digabungkan ke aliran koneksi.

Contoh di bawah ini menunjukkan bagaimana langkah-langkah ini dilakukan dalam SDK Open API resmi.

1
2
3
4
5
6
7
private async Task WriteTcp(byte[] messageByte, CancellationToken cancellationToken)
{
    byte[] array = BitConverter.GetBytes(messageByte.Length).Reverse().Concat(messageByte)
        .ToArray();
    await _sslStream.WriteAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
    await _sslStream.FlushAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
}
1
2
3
client = Client(EndPoints.PROTOBUF_LIVE_HOST if hostType.lower() == "live" else EndPoints.PROTOBUF_DEMO_HOST, EndPoints.PROTOBUF_PORT, TcpProtocol)
request = ProtoOAApplicationAuthReq() # Can be any message
deferred = client.send(request)

Python example

Menggunakan Twisted, contoh Python melakukan hampir semua operasi yang sama dengan contoh C#. client.send(request) dapat dijelaskan sebagai berikut.

1
2
3
request = ProtoOAApplicationAuthReq() # Can be any message
requestAsString = request.SerializeToString() # This method is a part of the Google Protobuf SDK
requestAsInt32String = struct.pack("!H", len(requestAsString)) # The message is concatenated with the reversed array
Untuk mengirim pesan, SDK Python menggunakan metode Protocol.transport.write().

Baca pesan

Kode Asinkron

Semua SDK Open API mengandalkan eksekusi asinkron, artinya mereka tidak menunggu pesan tiba tetapi bereaksi terhadap pesan yang tiba secara dinamis. Sebagai hasilnya, menerima pesan biasanya dilakukan melalui penangan acara.

Untuk membaca pesan, Anda harus melakukan serangkaian tindakan yang membalikkan langkah-langkah yang diperlukan untuk mengirim pesan.

  1. Terima empat byte pertama dari array byte (ingat, mereka menunjukkan panjang pesan). Balikkan empat byte ini dan ubah menjadi integer.

  2. Baca X jumlah byte dari aliran di mana X adalah integer yang Anda dapatkan pada langkah 1.

  3. Gunakan SDK Google Protobuf untuk mendeserialisasi pesan menjadi ProtoMessage yang valid.

  4. Gunakan bidang payloadType dari objek ProtoMessage untuk menemukan jenis aktualnya. Melalui SDK Google Protobuf, ubah ProtoMessage menjadi objek dari jenis ProtoOA... yang dibutuhkan.

Cuplikan kode di bawah ini menunjukkan bagaimana SDK Open API resmi mendekati pembacaan pesan.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
_tcpClient = new TcpClient
{
    LingerState = new LingerOption(enable: true, 10)
};
await _tcpClient.ConnectAsync(Host, Port).ConfigureAwait(continueOnCapturedContext: false);
SslStream _sslStream = new SslStream(_tcpClient.GetStream(), leaveInnerStreamOpen: false);
await _sslStream.AuthenticateAsClientAsync(Host).ConfigureAwait(continueOnCapturedContext: false);

        private async void ReadTcp(CancellationToken cancellationToken)
    {
        byte[] dataLength = new byte[4];
        byte[] data = null;
        try
        {
            while (!IsDisposed)
            {
                int num = 0;
                do
                {
                    int count = dataLength.Length - num;
                    int num2 = num;
                    num = num2 + await _sslStream.ReadAsync(dataLength, num, count, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
                    if (num == 0)
                    {
                        throw new InvalidOperationException("Remote host closed the connection");
                    }
                }
                while (num < dataLength.Length);
                int length = GetLength(dataLength);
                if (length <= 0)
                {
                    continue;
                }

                data = ArrayPool<byte>.Shared.Rent(length);
                num = 0;
                do
                {
                    int count2 = length - num;
                    int num2 = num;
                    num = num2 + await _sslStream.ReadAsync(data, num, count2, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
                    if (num == 0)
                    {
                        throw new InvalidOperationException("Remote host closed the connection");
                    }
                }
                while (num < length);
                ProtoMessage protoMessage = ProtoMessage.Parser.ParseFrom(data, 0, length);
                ArrayPool<byte>.Shared.Return(data);
                OnNext(protoMessage);
            }
        }
        catch (Exception innerException)
        {
            if (data != null)
            {
                ArrayPool<byte>.Shared.Return(data);
            }

            ReceiveException exception = new ReceiveException(innerException);
            OnError(exception);
        }
    }

    private int GetLength(byte[] lengthBytes)
    {
        Span<byte> span = lengthBytes.AsSpan();
        span.Reverse();
        return BitConverter.ToInt32(span);
    }

Contoh Python

Dalam contoh Python, semua operasi dengan byte saat menerima pesan ditangani oleh metode dataReceived() seperti yang ditunjukkan di bawah ini.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
def dataReceived(self, recd):
    """
    Convert int prefixed strings into calls to stringReceived.
    """
    self.recvd = self.recvd + recd
    while len(self.recvd) >= self.prefixLength and not self.paused:
        length ,= struct.unpack(
            self.structFormat, self.recvd[:self.prefixLength])
        if length > self.MAX_LENGTH:
            self.lengthLimitExceeded(length)
            return
        if len(self.recvd) < length + self.prefixLength:
            break
        packet = self.recvd[self.prefixLength:length + self.prefixLength]
        self.recvd = self.recvd[length + self.prefixLength:]
        self.stringReceived(packet)
1
2
3
4
5
6
7
8
def stringReceived(self, data):
    msg = ProtoMessage()
    msg.ParseFromString(data)

    if msg.payloadType == ProtoHeartbeatEvent().payloadType:
        self.heartbeat()
    self.factory.received(msg)
    return data

Gunakan WebSocket

Kirim pesan

Untuk mengirim pesan melalui koneksi WebSocket, lakukan tindakan berikut:

  1. Serialisasi pesan ke dalam format data yang sesuai (misalnya, string).

  2. Tambahkan pesan yang telah diserialisasi ke antrian kirim Anda.

Contoh di bawah ini menunjukkan bagaimana tindakan ini dilakukan dalam SDK Open API resmi.

SDK C# menggunakan kelas WebsocketClient yang merupakan bagian dari paket Websocket.Client. Seperti yang ditunjukkan di bawah ini, metode WebsocketClient.Send() bekerja sebagai berikut.

1
2
3
4
5
public void Send(byte[] message)
{
    Websocket.Client.Validations.Validations.ValidateInput(message, "message");
    _messagesBinaryToSendQueue.Writer.TryWrite(new ArraySegment<byte>(message));
}

Seperti yang dapat Anda lihat, klien hanya menambahkan segmen array ke antrian kirim.

SDK Python tidak mendukung standar WebSocket.

Terima pesan

Untuk menerima pesan melalui koneksi WebSocket, lakukan hal berikut:

  1. Ambil data yang diterima dari backend cTrader.

  2. Deserialisasi data menjadi pesan Protobuf yang valid.

Untuk ilustrasi bagaimana ini dilakukan dalam SDK Open API resmi, lihat cuplikan di bawah ini.

Untuk menerima pesan, WebsocketClient perlu berlangganan fungsi callback yang menangani apa yang dilakukan klien saat menerima pesan baru.

1
2
var client = new WebsocketClient();
client.MessageReceived.Subscribe(msg => {Console.WriteLine(msg);})

Sebelum berlangganan, SDK .NET mem-parsing pesan menjadi pesan Protobuf. Langganan yang diperlukan ditambahkan dalam tubuh callback ConnectWebSocket().

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
private async Task ConnectWebSocket()
{
    DefaultInterpolatedStringHandler defaultInterpolatedStringHandler = new DefaultInterpolatedStringHandler(7, 2);
    defaultInterpolatedStringHandler.AppendLiteral("wss://");
    defaultInterpolatedStringHandler.AppendFormatted(Host);
    defaultInterpolatedStringHandler.AppendLiteral(":");
    defaultInterpolatedStringHandler.AppendFormatted(Port);
    Uri url = new Uri(defaultInterpolatedStringHandler.ToStringAndClear());
    _websocketClient = new WebsocketClient(url, () => new ClientWebSocket())
    {
        IsTextMessageConversionEnabled = false,
        ReconnectTimeout = null,
        IsReconnectionEnabled = false,
        ErrorReconnectTimeout = null
    };
    _webSocketMessageReceivedDisposable = _websocketClient.MessageReceived.Select((ResponseMessage msg) => ProtoMessage.Parser.ParseFrom(msg.Binary)).Subscribe(new Action<ProtoMessage>(OnNext));
    _webSocketDisconnectionHappenedDisposable = _websocketClient.DisconnectionHappened.Subscribe(new Action<DisconnectionInfo>(OnWebSocketDisconnectionHappened));
    await _websocketClient.StartOrFail();
}

Dalam callback OnNext(), ProtoMessage diteruskan ke callback MessageFactory.GetMessage().

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void OnNext(ProtoMessage protoMessage)
{
    foreach (KeyValuePair<int, IObserver<IMessage>> observer2 in _observers)
    {
        observer2.Deconstruct(out var _, out var value);
        IObserver<IMessage> observer = value;
        try
        {
            IMessage message = MessageFactory.GetMessage(protoMessage);
            if (protoMessage.HasClientMsgId || message == null)
            {
                observer.OnNext(protoMessage);
            }

            if (message != null)
            {
                observer.OnNext(message);
            }
        }
        catch (Exception innerException)
        {
            ObserverException exception = new ObserverException(innerException, observer);
            OnError(exception);
        }
    }
}

SDK Python tidak mendukung standar WebSocket.