콘텐츠로 이동

Protobuf 메시지 보내기 및 받기

메시지를 보내고 받는 과정은 TCP와 WebSocket 표준 간에 차이가 있습니다. 아래에서는 두 가지 연결 유형에 대해 이 과정을 자세히 설명합니다.

TCP 사용

메시지 보내기

TCP 연결을 통해 메시지를 보내려면 다음을 수행하세요:

  1. Protobuf 메시지를 선택한 프로그래밍 언어의 공식 Google Protocol Buffer SDK를 사용하여 바이트 배열로 변경합니다(Protobuf 인코딩 사용).

  2. 1단계에서 생성된 배열의 길이를 가져옵니다. 이 정수에서 새 바이트 배열을 만듭니다. 새 바이트 배열을 역순으로 만듭니다.

  3. 새 바이트 배열과 원래 Protobuf 메시지를 포함하는 바이트 배열을 연결합니다.

  4. 연결된 배열을 연결 스트림으로 보냅니다.

아래 예제에서는 이러한 단계가 공식 Open API SDK에서 어떻게 수행되는지 보여줍니다.

1
2
3
4
5
6
7
private async Task WriteTcp(byte[] messageByte, CancellationToken cancellationToken)
{
    byte[] array = BitConverter.GetBytes(messageByte.Length).Reverse().Concat(messageByte)
        .ToArray();
    await _sslStream.WriteAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
    await _sslStream.FlushAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
}
1
2
3
client = Client(EndPoints.PROTOBUF_LIVE_HOST if hostType.lower() == "live" else EndPoints.PROTOBUF_DEMO_HOST, EndPoints.PROTOBUF_PORT, TcpProtocol)
request = ProtoOAApplicationAuthReq() # Can be any message
deferred = client.send(request)

Python 예제

Twisted를 사용하여 Python 예제는 C# 예제와 거의 동일한 작업을 수행합니다. client.send(request)는 다음과 같이 설명할 수 있습니다.

1
2
3
request = ProtoOAApplicationAuthReq() # Can be any message
requestAsString = request.SerializeToString() # This method is a part of the Google Protobuf SDK
requestAsInt32String = struct.pack("!H", len(requestAsString)) # The message is concatenated with the reversed array
메시지를 보내기 위해 Python SDK는 Protocol.transport.write() 메서드를 사용합니다.

메시지 읽기

비동기 코드

모든 Open API SDK는 비동기 실행에 의존합니다. 즉, 메시지가 도착할 때까지 기다리지 않고 동적으로 도착하는 메시지에 반응합니다. 결과적으로, 메시지를 수신하는 것은 일반적으로 이벤트 핸들러를 통해 수행됩니다.

메시지를 읽으려면 메시지를 보내는 데 필요한 단계를 역순으로 수행해야 합니다.

  1. 바이트 배열의 처음 4바이트를 수신합니다(이들은 메시지 길이를 나타냅니다). 이 4바이트를 역순으로 바꾸고 정수로 변환합니다.

  2. X는 1단계에서 얻은 정수이며, 스트림에서 X만큼의 바이트를 읽습니다.

  3. Google Protobuf SDK를 사용하여 메시지를 유효한 ProtoMessage로 역직렬화합니다.

  4. ProtoMessage 객체의 payloadType 필드를 사용하여 실제 유형을 찾습니다. Google Protobuf SDK를 통해 ProtoMessage를 필요한 ProtoOA... 유형의 객체로 변환합니다.

아래 코드 스니펫은 공식 Open API SDK가 메시지를 읽는 방법을 보여줍니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
_tcpClient = new TcpClient
{
    LingerState = new LingerOption(enable: true, 10)
};
await _tcpClient.ConnectAsync(Host, Port).ConfigureAwait(continueOnCapturedContext: false);
SslStream _sslStream = new SslStream(_tcpClient.GetStream(), leaveInnerStreamOpen: false);
await _sslStream.AuthenticateAsClientAsync(Host).ConfigureAwait(continueOnCapturedContext: false);

        private async void ReadTcp(CancellationToken cancellationToken)
    {
        byte[] dataLength = new byte[4];
        byte[] data = null;
        try
        {
            while (!IsDisposed)
            {
                int num = 0;
                do
                {
                    int count = dataLength.Length - num;
                    int num2 = num;
                    num = num2 + await _sslStream.ReadAsync(dataLength, num, count, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
                    if (num == 0)
                    {
                        throw new InvalidOperationException("Remote host closed the connection");
                    }
                }
                while (num < dataLength.Length);
                int length = GetLength(dataLength);
                if (length <= 0)
                {
                    continue;
                }

                data = ArrayPool<byte>.Shared.Rent(length);
                num = 0;
                do
                {
                    int count2 = length - num;
                    int num2 = num;
                    num = num2 + await _sslStream.ReadAsync(data, num, count2, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
                    if (num == 0)
                    {
                        throw new InvalidOperationException("Remote host closed the connection");
                    }
                }
                while (num < length);
                ProtoMessage protoMessage = ProtoMessage.Parser.ParseFrom(data, 0, length);
                ArrayPool<byte>.Shared.Return(data);
                OnNext(protoMessage);
            }
        }
        catch (Exception innerException)
        {
            if (data != null)
            {
                ArrayPool<byte>.Shared.Return(data);
            }

            ReceiveException exception = new ReceiveException(innerException);
            OnError(exception);
        }
    }

    private int GetLength(byte[] lengthBytes)
    {
        Span<byte> span = lengthBytes.AsSpan();
        span.Reverse();
        return BitConverter.ToInt32(span);
    }

Python 예제

Python 예제에서는 메시지를 수신할 때 바이트와 관련된 모든 작업이 아래와 같이 dataReceived() 메서드에 의해 처리됩니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
def dataReceived(self, recd):
    """
    Convert int prefixed strings into calls to stringReceived.
    """
    self.recvd = self.recvd + recd
    while len(self.recvd) >= self.prefixLength and not self.paused:
        length ,= struct.unpack(
            self.structFormat, self.recvd[:self.prefixLength])
        if length > self.MAX_LENGTH:
            self.lengthLimitExceeded(length)
            return
        if len(self.recvd) < length + self.prefixLength:
            break
        packet = self.recvd[self.prefixLength:length + self.prefixLength]
        self.recvd = self.recvd[length + self.prefixLength:]
        self.stringReceived(packet)
1
2
3
4
5
6
7
8
def stringReceived(self, data):
    msg = ProtoMessage()
    msg.ParseFromString(data)

    if msg.payloadType == ProtoHeartbeatEvent().payloadType:
        self.heartbeat()
    self.factory.received(msg)
    return data

WebSocket 사용

메시지 보내기

WebSocket 연결을 통해 메시지를 보내려면 다음 작업을 수행하십시오:

  1. 메시지를 적절한 데이터 형식(예: 문자열)으로 직렬화합니다.

  2. 직렬화된 메시지를 전송 큐에 추가합니다.

아래 예제는 공식 Open API SDK에서 이러한 작업이 수행되는 방법을 보여줍니다.

C# SDK는 Websocket.Client 패키지의 일부인 WebsocketClient 클래스를 사용합니다. 아래와 같이 WebsocketClient.Send() 메서드는 다음과 같이 작동합니다.

1
2
3
4
5
public void Send(byte[] message)
{
    Websocket.Client.Validations.Validations.ValidateInput(message, "message");
    _messagesBinaryToSendQueue.Writer.TryWrite(new ArraySegment<byte>(message));
}

보시다시피, 클라이언트는 단순히 전송 큐에 배열 세그먼트를 추가합니다.

Python SDK는 WebSocket 표준을 지원하지 않습니다.

메시지 수신

WebSocket 연결을 통해 메시지를 수신하려면 다음을 수행하십시오:

  1. cTrader 백엔드에서 수신된 데이터를 검색합니다.

  2. 데이터를 유효한 Protobuf 메시지로 역직렬화합니다.

공식 Open API SDK에서 이 작업이 수행되는 방법에 대한 예시는 아래 스니펫을 참조하십시오.

메시지를 수신하기 위해 WebsocketClient는 새 메시지를 수락할 때 클라이언트가 수행할 작업을 처리하는 콜백 함수에 구독해야 합니다.

1
2
var client = new WebsocketClient();
client.MessageReceived.Subscribe(msg => {Console.WriteLine(msg);})

구독하기 전에 .NET SDK는 메시지를 Protobuf 메시지로 파싱합니다. 필요한 구독은 ConnectWebSocket() 콜백의 본문에 추가됩니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
private async Task ConnectWebSocket()
{
    DefaultInterpolatedStringHandler defaultInterpolatedStringHandler = new DefaultInterpolatedStringHandler(7, 2);
    defaultInterpolatedStringHandler.AppendLiteral("wss://");
    defaultInterpolatedStringHandler.AppendFormatted(Host);
    defaultInterpolatedStringHandler.AppendLiteral(":");
    defaultInterpolatedStringHandler.AppendFormatted(Port);
    Uri url = new Uri(defaultInterpolatedStringHandler.ToStringAndClear());
    _websocketClient = new WebsocketClient(url, () => new ClientWebSocket())
    {
        IsTextMessageConversionEnabled = false,
        ReconnectTimeout = null,
        IsReconnectionEnabled = false,
        ErrorReconnectTimeout = null
    };
    _webSocketMessageReceivedDisposable = _websocketClient.MessageReceived.Select((ResponseMessage msg) => ProtoMessage.Parser.ParseFrom(msg.Binary)).Subscribe(new Action<ProtoMessage>(OnNext));
    _webSocketDisconnectionHappenedDisposable = _websocketClient.DisconnectionHappened.Subscribe(new Action<DisconnectionInfo>(OnWebSocketDisconnectionHappened));
    await _websocketClient.StartOrFail();
}

OnNext() 콜백에서 ProtoMessageMessageFactory.GetMessage() 콜백으로 전달됩니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void OnNext(ProtoMessage protoMessage)
{
    foreach (KeyValuePair<int, IObserver<IMessage>> observer2 in _observers)
    {
        observer2.Deconstruct(out var _, out var value);
        IObserver<IMessage> observer = value;
        try
        {
            IMessage message = MessageFactory.GetMessage(protoMessage);
            if (protoMessage.HasClientMsgId || message == null)
            {
                observer.OnNext(protoMessage);
            }

            if (message != null)
            {
                observer.OnNext(message);
            }
        }
        catch (Exception innerException)
        {
            ObserverException exception = new ObserverException(innerException, observer);
            OnError(exception);
        }
    }
}

Python SDK는 WebSocket 표준을 지원하지 않습니다.