Conversation
There was a problem hiding this comment.
Code Review
This pull request refactors the transport and dispatch logic to use configurable settings from RunContext instead of hardcoded constants, specifically for batch intervals, record limits per request, and thread join timeouts. Corresponding unit tests were updated to support these changes using mock contexts. The review feedback recommends adding validation to the new configuration fields in CoreSettings and improving the logging behavior in the transport thread's shutdown sequence to handle potential join timeouts more accurately.
| self._thread.join(timeout=self._finish_join_timeout) | ||
| console.debug("Transport finished.") |
There was a problem hiding this comment.
The log message "Transport finished." is misleading if the join operation times out. It is better to check if the thread is still alive and provide appropriate feedback, as a timeout might indicate that some records were not successfully uploaded before the shutdown.
| self._thread.join(timeout=self._finish_join_timeout) | |
| console.debug("Transport finished.") | |
| self._thread.join(timeout=self._finish_join_timeout) | |
| if self._thread.is_alive(): | |
| console.warning(f"Transport thread did not finish within {self._finish_join_timeout}s timeout.") | |
| else: | |
| console.debug("Transport finished.") |
| batch_interval: float = Field(default=5.0) | ||
| """ | ||
| Batch interval (seconds) for the Transport upload thread. | ||
| Default 5.0s. Use a smaller value (e.g. 0.5) in Converter scenarios for higher throughput. | ||
| """ | ||
|
|
||
| max_records_per_request: int = Field(default=10_000) | ||
| """ | ||
| Maximum number of records per HTTP request in Dispatch. Default 10000. | ||
| """ | ||
|
|
||
| finish_join_timeout: int = Field(default=30) | ||
| """ | ||
| Timeout (seconds) when waiting for the Transport thread to exit on finish. Default 30. | ||
| """ |
There was a problem hiding this comment.
It is recommended to add validation to these numeric fields to ensure they contain sensible values (e.g., positive intervals and non-negative timeouts). This prevents potential runtime errors or infinite loops if invalid values are provided via environment variables.
| batch_interval: float = Field(default=5.0) | |
| """ | |
| Batch interval (seconds) for the Transport upload thread. | |
| Default 5.0s. Use a smaller value (e.g. 0.5) in Converter scenarios for higher throughput. | |
| """ | |
| max_records_per_request: int = Field(default=10_000) | |
| """ | |
| Maximum number of records per HTTP request in Dispatch. Default 10000. | |
| """ | |
| finish_join_timeout: int = Field(default=30) | |
| """ | |
| Timeout (seconds) when waiting for the Transport thread to exit on finish. Default 30. | |
| """ | |
| batch_interval: float = Field(default=5.0, gt=0) | |
| """ | |
| Batch interval (seconds) for the Transport upload thread. | |
| Default 5.0s. Use a smaller value (e.g. 0.5) in Converter scenarios for higher throughput. | |
| """ | |
| max_records_per_request: int = Field(default=10_000, gt=0) | |
| """ | |
| Maximum number of records per HTTP request in Dispatch. Default 10000. | |
| """ | |
| finish_join_timeout: int = Field(default=30, ge=0) | |
| """ | |
| Timeout (seconds) when waiting for the Transport thread to exit on finish. Default 30. | |
| """ |
Description