I've been working on tax software to help you calculate the taxes on all those hard-earned gains (or losses) you made on crypto over the last year. It has actually been a very interesting learning experience, since there are so many exchanges out there and every one of them seems to have its own special way of building out its API. Earlier, I covered an amazing Python module that helps bring some sanity to the chaos: CCXT.
It has really helped speed development along. I just finished a plugin for Binance.com that lets users pull in most of the information they need to retrieve and process their transactions from that exchange. However, only Binance.com is available currently, and I would like to make it quicker and easier to build plugins for more exchanges.
That's why we decided to abstract out the CCXT goodness into an abstract class that other users can then extend and add exchange-specific logic to. Easy enough, right?
Every exchange seems to have a different way to do things though. One issue with retrieving transaction data from a server is that there could be hundreds, if not thousands, of transactions to download. Serving all of that in one response would tie up resources the exchange would rather keep free. So, it breaks large result sets into smaller chunks called pages, and moving through those pages is called pagination.
Well, every exchange seems to have its own way of paginating. Binance.com uses date-based pagination while Coinbase uses page-based pagination. Still other exchanges use id-based pagination. All of these pagination methods are essentially while loops that keep pulling data from the server until all the data has been retrieved. The parameters have to be adjusted on each call of the endpoint based on the data previously retrieved.
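To make the three styles concrete, here is a minimal sketch of each loop. It runs against a made-up in-memory list rather than a real exchange, so TRADES, PAGE_SIZE, and the fetch_by_* functions are all hypothetical stand-ins and not part of CCXT or any exchange API.

```python
from typing import Callable, Dict, List

# Hypothetical in-memory "exchange": 7 trades with ids 1..7 and
# timestamps 1000, 2000, ..., 7000. This is not a real API.
TRADES: List[Dict[str, int]] = [{"id": i, "timestamp": i * 1000} for i in range(1, 8)]
PAGE_SIZE = 3


def fetch_by_date(since: int) -> List[Dict[str, int]]:
    """Date-based: up to PAGE_SIZE trades at or after `since`."""
    return [t for t in TRADES if t["timestamp"] >= since][:PAGE_SIZE]


def fetch_by_page(page: int) -> List[Dict[str, int]]:
    """Page-based: the page-th slice of the data (1-indexed)."""
    start = (page - 1) * PAGE_SIZE
    return TRADES[start : start + PAGE_SIZE]


def fetch_by_id(from_id: int) -> List[Dict[str, int]]:
    """Id-based: up to PAGE_SIZE trades with id >= from_id."""
    return [t for t in TRADES if t["id"] >= from_id][:PAGE_SIZE]


def pull_all(
    fetch: Callable[[int], List[Dict[str, int]]],
    first_cursor: int,
    next_cursor: Callable[[int, List[Dict[str, int]]], int],
) -> List[Dict[str, int]]:
    """Shared while loop: fetch, stop on an empty page, else adjust the cursor."""
    results: List[Dict[str, int]] = []
    cursor = first_cursor
    while True:
        page = fetch(cursor)
        if not page:
            break
        results.extend(page)
        cursor = next_cursor(cursor, page)
    return results


# The only thing that differs between the styles is how the cursor moves.
by_date = pull_all(fetch_by_date, 0, lambda _, page: page[-1]["timestamp"] + 1)
by_page = pull_all(fetch_by_page, 1, lambda cursor, _: cursor + 1)
by_id = pull_all(fetch_by_id, 0, lambda _, page: page[-1]["id"] + 1)
```

All three calls end up with the same seven trades. The loop body is identical each time and only the rule for moving the cursor changes, which is the part that is worth abstracting.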
My first idea was to build a class that resolved the condition of the while loop. It would take the previous result set, evaluate it, and return a bool. In that same function, it would readjust the values of the parameters to pull the new set of data.
    def evaluate_loop_expression(self, current_results: Any) -> bool:
        # First time the loop is executed, results will be None
        if current_results is None:
            return True

        if len(current_results):
            # All times are inclusive, so start just past the last timestamp
            self.__since = current_results[len(current_results) - 1][_TIMESTAMP] + 1
            return True

        # An empty result set means we reached the end of this market
        if self.has_more_markets():
            self.__since = self.__exchange_start_time
            self.next_market()
            return True

        return False
Basically, it shifts self.__since, which marks the first timestamp to start pulling data from, forward to the end of the current_results. Or, if we pulled everything from this market (e.g. BTCUSD), we can move on to the next market (e.g. ETHUSD). Then, it returns True if there is more data to pull or False if not.
This would be the condition evaluated in order to keep the while loop going:
    results = None
    while self.evaluate_loop_expression(results):
        # Fetch a new set of results with changed parameters.
        results = client.fetchTrades(self.parameters)
This worked okay. However, one function was serving two purposes (deciding whether to keep looping and updating the parameters), and that is a bit of a no-no in programming. So, how do you make this more readable and keep the same functionality?
A lot of things are happening here. We need to change the fields being used to pull data based on previous data and then break out of the while loop if there is no more data to be retrieved.
A simple iterator doesn't seem to work since we need to edit what is being returned by the iterator depending on what the previous results were.
To make something iterable in Python, we need to define __iter__, which will initialize and return an iterator. An iterator is a class that implements __next__, which returns the next set of objects being iterated over. An iterator can itself be iterable (implement __iter__ and return itself), and the iterator protocol expects that, but an iterable doesn't have to be its own iterator. For example, str is iterable but is not an iterator: calling iter() on a string returns a separate iterator that iterates over the characters in the string.
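A small sketch of that split between iterable and iterator may help. Countdown and CountdownIterator are made-up names used only for illustration; the str part uses nothing but the built-in iter() and next().

```python
from typing import List


class CountdownIterator:
    """Iterator: keeps the current position and produces the next value."""

    def __init__(self, current: int) -> None:
        self.current = current

    def __iter__(self) -> "CountdownIterator":
        # Iterators conventionally return themselves from __iter__.
        return self

    def __next__(self) -> int:
        if self.current <= 0:
            raise StopIteration
        value = self.current
        self.current -= 1
        return value


class Countdown:
    """Iterable: holds the settings and hands out a fresh iterator each time."""

    def __init__(self, start: int) -> None:
        self.start = start

    def __iter__(self) -> CountdownIterator:
        return CountdownIterator(self.start)


# str is iterable but not an iterator: iter() returns a separate object.
letters = iter("abc")
first: str = next(letters)
rest: List[str] = list(letters)

# A for loop (or list()) calls iter() once, then next() until StopIteration.
counted: List[int] = list(Countdown(3))
```

Because Countdown builds a new CountdownIterator on every iter() call, it can be looped over more than once, while a single CountdownIterator is used up after one pass.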
So, what we need here is an iterator that we can update on each loop.
What I ended up with was 3 classes:
1) a pagination detail set that is iterable (returns an iterator when iter() is called on it)
2) a pagination iterator returned from 1) that returns new pagination details when next() is called on it
3) a NamedTuple of the pagination details
This is a lot more work, but it has a much cleaner design.
Here is the DateBasedPaginationDetailSet version of 1):
    from typing import Dict, List, Optional, Union


    class AbstractPaginationDetailSet:
        def __iter__(self) -> "AbstractPaginationDetailsIterator":
            raise NotImplementedError("Abstract method")


    class DateBasedPaginationDetailSet(AbstractPaginationDetailSet):
        def __init__(
            self,
            exchange_start_time: int,
            limit: Optional[int] = None,
            markets: Optional[List[str]] = None,
            params: Optional[Dict[str, Union[int, str, None]]] = None,
            window: Optional[int] = None,
        ) -> None:
            super().__init__()
            self.__exchange_start_time: int = exchange_start_time
            self.__limit: Optional[int] = limit
            self.__markets: Optional[List[str]] = markets
            self.__params: Optional[Dict[str, Union[int, str, None]]] = params
            self.__window: Optional[int] = window

        def __iter__(self) -> "DateBasedPaginationDetailsIterator":
            # Each call builds a fresh iterator from the stored details.
            return DateBasedPaginationDetailsIterator(
                self.__exchange_start_time,
                self.__limit,
                self.__markets,
                self.__params,
                self.__window,
            )
Notice that this essentially only contains the details needed to build the iterator. This makes it so all we need to do is pass 2-3 parameters to initialize it and the plugin pulling the data will do the rest.
And the iterator that it returns, 2):
    from datetime import datetime
    from typing import Any, Dict, List, Optional, Union

    # _TIMESTAMP, _MS_IN_SECOND and _DEFAULT_WINDOW are module-level constants
    # defined elsewhere in the plugin.


    class AbstractPaginationDetailsIterator:
        def __init__(
            self,
            limit: Optional[int],
            markets: Optional[List[str]] = None,
            params: Optional[Dict[str, Union[int, str, None]]] = None,
        ) -> None:
            self.__limit: Optional[int] = limit
            self.__markets: Optional[List[str]] = markets
            self.__market_count: int = 0
            self.__params: Optional[Dict[str, Union[int, str, None]]] = params

        def _get_market(self) -> Optional[str]:
            return self.__markets[self.__market_count] if self.__markets else None

        def _has_more_markets(self) -> bool:
            # True only if another market follows the current one
            return self.__market_count < len(self.__markets) - 1 if self.__markets else False

        def _next_market(self) -> None:
            self.__market_count += 1

        def _get_limit(self) -> Optional[int]:
            return self.__limit

        def _get_params(self) -> Optional[Dict[str, Union[int, str, None]]]:
            return self.__params

        def _get_since(self) -> Optional[int]:
            return None

        def update_fetched_elements(self, current_results: Any) -> None:
            raise NotImplementedError("Abstract method")

        def __next__(self) -> PaginationDetails:
            raise NotImplementedError("Abstract method")


    class DateBasedPaginationDetailsIterator(AbstractPaginationDetailsIterator):
        def __init__(
            self,
            exchange_start_time: int,
            limit: Optional[int] = None,
            markets: Optional[List[str]] = None,
            params: Optional[Dict[str, Union[int, str, None]]] = None,
            window: Optional[int] = None,
        ) -> None:
            super().__init__(limit, markets, params)
            self.__end_of_data = False
            self.__since: int = exchange_start_time
            self.__exchange_start_time: int = exchange_start_time
            self.__now: int = int(datetime.now().timestamp()) * _MS_IN_SECOND
            self.__window: int = window if window else _DEFAULT_WINDOW

        def update_fetched_elements(self, current_results: Any) -> None:
            end_of_market: bool = False

            # Update since if needed, otherwise flag end_of_market
            if len(current_results):
                # All times are inclusive
                self.__since = current_results[len(current_results) - 1][_TIMESTAMP] + 1
            elif self.__window:
                self.__since += self.__window

            if self.__since > self.__now:
                end_of_market = True

            if end_of_market:
                if self._has_more_markets():
                    # we have reached the end of one market, now let's move on to the next
                    self.__since = self.__exchange_start_time
                    self._next_market()
                else:
                    # no markets left, so the next call to __next__ stops the iteration
                    self.__end_of_data = True

        def _is_end_of_data(self) -> bool:
            return self.__end_of_data

        def _get_since(self) -> int:
            return self.__since

        def _get_end_of_window(self) -> int:
            return self.__since + self.__window

        def __next__(self) -> PaginationDetails:
            if not self._is_end_of_data():
                return PaginationDetails(
                    symbol=self._get_market(),
                    since=self._get_since(),
                    limit=self._get_limit(),
                    params=self._get_params(),
                )
            raise StopIteration(self)
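Here, I added an update_fetched_elements method that adjusts the pagination details based on the last set of results. If the call returned records, since is shifted forward to just past the last record, so the next call picks up where this one left off. If the call came back empty, since is advanced by one window. Once since passes the current time, everything for this market has been pulled, and I just need to move on to the next market if one exists.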
And finally the NamedTuple from 3):

    from typing import Dict, NamedTuple, Optional, Union


    class PaginationDetails(NamedTuple):
        symbol: Optional[str]
        since: Optional[int]
        limit: Optional[int]
        params: Optional[Dict[str, Union[int, str, None]]]
Now, we have a custom iterator to use when pulling data via the REST API of the exchange.
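As a rough illustration of how the loop looks with this design, here is a stripped-down, self-contained sketch. It is not the plugin's actual code: SimpleDateIterator, Details, fake_fetch_trades, and the sample data are all hypothetical, the window handling is left out, and an empty page is simply treated as the end of a market.

```python
from typing import Dict, List, NamedTuple, Tuple

# Hypothetical stand-in data: market symbol -> trades with timestamps.
_FAKE_TRADES: Dict[str, List[Dict[str, int]]] = {
    "BTCUSD": [{"timestamp": t} for t in (100, 200, 300)],
    "ETHUSD": [{"timestamp": t} for t in (150, 250)],
}


class Details(NamedTuple):
    symbol: str
    since: int
    limit: int


def fake_fetch_trades(symbol: str, since: int, limit: int) -> List[Dict[str, int]]:
    """Stand-in for an exchange call: trades at or after `since`, capped at `limit`."""
    return [t for t in _FAKE_TRADES[symbol] if t["timestamp"] >= since][:limit]


class SimpleDateIterator:
    """Simplified date-based pagination iterator (no window handling)."""

    def __init__(self, start: int, limit: int, markets: List[str]) -> None:
        self._start = start
        self._since = start
        self._limit = limit
        self._markets = markets
        self._index = 0
        self._done = not markets

    def __iter__(self) -> "SimpleDateIterator":
        return self

    def update_fetched_elements(self, results: List[Dict[str, int]]) -> None:
        if results:
            # Times are inclusive, so restart just past the last record.
            self._since = results[-1]["timestamp"] + 1
        elif self._index < len(self._markets) - 1:
            # Empty page: this market is exhausted, move to the next one.
            self._index += 1
            self._since = self._start
        else:
            self._done = True

    def __next__(self) -> Details:
        if self._done:
            raise StopIteration
        return Details(self._markets[self._index], self._since, self._limit)


def pull_all(markets: List[str], limit: int = 2) -> List[Tuple[str, int]]:
    collected: List[Tuple[str, int]] = []
    pagination = SimpleDateIterator(start=0, limit=limit, markets=markets)
    for details in pagination:
        results = fake_fetch_trades(details.symbol, details.since, details.limit)
        collected.extend((details.symbol, t["timestamp"]) for t in results)
        # Feed the results back so the next iteration gets adjusted details.
        pagination.update_fetched_elements(results)
    return collected


all_trades = pull_all(["BTCUSD", "ETHUSD"])
```

The for loop now only decides whether to continue, and update_fetched_elements only adjusts the parameters, so the two jobs that used to live in evaluate_loop_expression are separated. With the real classes, the loop would presumably hold on to the iterator returned by iter() on the detail set in the same way, so that it can call update_fetched_elements on it.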