From ac3c7cb62d4064949f5f48808e093cb00221ad0f Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Mon, 3 Feb 2025 12:52:57 +0100 Subject: [PATCH] update TCP Com IF --- pyproject.toml | 3 ++- src/tmtccmd/com/tcp.py | 31 ++++++++++++++++++++++--------- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index b58748ee..a75d4525 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,8 @@ dependencies = [ "Deprecated~=1.2", "pyserial~=3.5", "dle-encoder~=0.2.3", - "spacepackets>=0.24.0, <=0.27", + # "spacepackets>=0.24.0, <=0.27", + "spacepackets @ git+/~https://github.com/us-irs/spacepackets-py.git@71f42112e0cea9f6c7d47e5dd34fccbf13e58242", "cfdp-py>=0.1.1, <=0.5", ] diff --git a/src/tmtccmd/com/tcp.py b/src/tmtccmd/com/tcp.py index e1374690..2175390d 100644 --- a/src/tmtccmd/com/tcp.py +++ b/src/tmtccmd/com/tcp.py @@ -12,7 +12,10 @@ from collections import deque from typing import Any, Optional, Sequence -from spacepackets.ccsds.spacepacket import parse_space_packets, PacketId +from spacepackets.ccsds.spacepacket import ( + PacketId, + parse_space_packets_from_deque, +) from tmtccmd.com import ComInterface, SendError from tmtccmd.com.tcpip_utils import EthAddr @@ -115,7 +118,7 @@ def __connect_socket(self): finally: self.__tcp_socket.settimeout(None) - def close(self, args: any = None) -> None: + def close(self, args: Any = None) -> None: if not self.is_open(): return self.__thread_kill_signal.set() @@ -128,7 +131,7 @@ def close(self, args: any = None) -> None: def send(self, data: bytes | bytearray): self.__tc_queue.put(data) - def receive(self, poll_timeout: float = 0) -> list[bytes]: + def receive(self, parameters: float = 0) -> list[bytes]: self.__tm_queue_to_packet_list() tm_packet_list = self.tm_packet_list self.tm_packet_list = [] @@ -139,13 +142,23 @@ def __tm_queue_to_packet_list(self): self.__analysis_queue.append(self.__tm_queue.get()) # TCP is stream based, so there might be broken packets or multiple packets in one recv # call. We parse the space packets contained in the stream here - if self.com_type == TcpCommunicationType.SPACE_PACKETS: - self.tm_packet_list.extend( - parse_space_packets( - analysis_queue=self.__analysis_queue, - packet_ids=self.space_packet_ids, - ) + if self.com_type == TcpCommunicationType.SPACE_PACKETS and self.__analysis_queue: + result = parse_space_packets_from_deque( + analysis_queue=self.__analysis_queue, + packet_ids=self.space_packet_ids, ) + flattened = bytearray() + for packet in result.tm_list: + self.tm_packet_list.append(packet) + while self.__analysis_queue: + flattened.extend(self.__analysis_queue.popleft()) + # Might be spammy, but I consider this a configuration error, and the user + # should be notified about it. + for skipped_range in result.skipped_ranges: + _LOGGER.warning("skipped bytes in received TCP datastream:") + print(flattened[skipped_range.start : skipped_range.stop]) + _LOGGER.warning("list of valid packet IDs might be incomplete") + self.__analysis_queue.append(flattened[result.scanned_bytes :]) else: while self.__analysis_queue: self.tm_packet_list.append(self.__analysis_queue.popleft())