Source code for golosscripts.feed

import asyncio
import logging
import statistics
from collections import namedtuple
from datetime import datetime
from enum import Enum
from typing import List, Optional, Union

from golos.utils import parse_time
from golos.witness import Witness
from websockets.exceptions import ConnectionClosedError

from .bitshares_helper import BitSharesHelper
from .functions import fetch_ticker, get_price_usd_gold_cbr, price_troyounce_to_price_1mg
from .golos_helper import GolosHelper

[docs]log = logging.getLogger(__name__)
[docs]bitshares_markets = [ 'RUDEX.GOLOS/GPH', 'RUDEX.GOLOS/RUDEX.BTC', 'RUDEX.GOLOS/RUDEX.USDT',
]
[docs]market_data = namedtuple('price_data', ['price', 'volume', 'market'])
[docs]class PriceSource(Enum):
[docs] graphene = 1
[docs] kuna = 2
[docs]class Metric(Enum):
[docs] median = 1
[docs] mean = 2
[docs] weighted_average = 3
[docs]class FeedUpdater(GolosHelper): """ This class is used to calculate and update 'sbd_exchange_rate' for witness. :param str,list node: golos node to connect to :param str,list keys: witness active keys :param str witness: witness name to update feed for :param bool dry_run: only do price calculation without sending transaction :param str,list node_gph: graphene node :param list markets: list of bitshares markets to use to obtain price, in format ['QUOTE/BASE'] :param str metric: what metric to use to calculate price :param float depth_pct: how deeply measure market for volume :param float threshold_pct: price change threshold to trigger price feed publish :param int interval: how often calculate new price :param float k: correction coefficient, adjusts price up or down :param int max_age: max age of price feed to trigger force-publish """ def __init__( self, node: Union[str, List[str]], keys: Union[str, List[str]], witness: str, dry_run: bool = False, source: str = 'graphene', node_gph: Optional[Union[str, List[str]]] = None, markets: Optional[List[str]] = None, metric: str = 'weighted_average', depth_pct: float = 20.0, threshold_pct: float = 10.0, interval: int = 3600, k: float = 1.0, # noqa: VNE001 max_age: int = 86400, ) -> None: try: self.price_source = getattr(PriceSource, source) except AttributeError: raise ValueError('unknown price source: %s', source) try: self.metric = getattr(Metric, metric) except AttributeError: raise ValueError('unknown metric: %s', metric) # Helper setup super().__init__(nodes=node, keys=keys) if self.price_source == PriceSource.graphene: if not node_gph: raise ValueError('node_gph should be specified') self.bitshares = BitSharesHelper(node=node_gph) # TODO: workaround for https://github.com/xeroc/python-graphenelib/pull/168 self.node_gph = node_gph self.witness = witness self.dry_run = dry_run self.markets = markets or bitshares_markets self.depth_pct = depth_pct self.threshold_pct = threshold_pct / 100 self.interval = interval self.correction = k self.max_age = max_age @staticmethod
[docs] def calc_weighted_average_price(prices: List[market_data]) -> float: """ Calculate weighted average price using "volume" key. :param list prices: list of market_data tuples """ sum_volume = sum(i.volume for i in prices) weighted_average_price = sum(i.price * i.volume / sum_volume for i in prices) return weighted_average_price
@staticmethod
[docs] def is_last_price_too_old(witness_data: Witness, max_age: int) -> bool: """ Check last price update time and return True if older than max_age. :param witness_data: witness object dict, usually :py:class:`golos.witness.Witness` Witness instance :param int max_age: max seconds since last update """ last_update = parse_time(witness_data['last_sbd_exchange_update']) log.debug('last price update: %s', last_update) log.debug('max_age: %s', max_age) delta = datetime.utcnow() - last_update log.debug('time passed since last update: %s seconds', delta.total_seconds()) if delta.total_seconds() > max_age: log.debug('price too old, need update') return True return False
[docs] async def calc_price_gph_golos(self) -> float: """Calculate price GPH/GOLOS using GOLOS markets on bitshares.""" await self.bitshares.connect() price_data = [] for market in self.markets: quote, base = self.bitshares.split_pair(market) target_market = '{}/GPH'.format(quote) price, volume = await self.bitshares.get_price_across_2_markets( target_market, via=base, depth_pct=self.depth_pct ) log.debug('Derived price from market {}: {:.8f} GPH/GOLOS, volume: {:.0f}'.format(market, price, volume)) price_data.append(market_data(price, volume, market)) prices = [element.price for element in price_data if element.price > 0] price_gph_golos_median = statistics.median(prices) log.debug('Median market price: {:.8f} GPH/GOLOS'.format(price_gph_golos_median)) price_gph_golos_mean = statistics.mean(prices) log.debug('Mean market price: {:.8f} GPH/GOLOS'.format(price_gph_golos_mean)) price_gph_golos_wa = self.calc_weighted_average_price(price_data) log.debug('Weighted average market price: {:.8f} GPH/GOLOS'.format(price_gph_golos_wa)) if self.metric == Metric.median: price_gph_golos = price_gph_golos_median elif self.metric == Metric.mean: price_gph_golos = price_gph_golos_mean elif self.metric == Metric.weighted_average: price_gph_golos = price_gph_golos_wa else: raise ValueError('unsupported metric') return price_gph_golos
[docs] async def calc_price_gbg_golos_bitshares(self) -> float: """Calculate price GBG/GOLOS using GOLOS markets on bitshares.""" await self.bitshares.connect() price_usd_gold = await get_price_usd_gold_cbr() log.info('Gold price from cbr.ru: %.5f USD/1mgGOLD', price_usd_gold) market = 'GPH/RUDEX.USDT' price_usd_gph, _ = await self.bitshares.get_market_center_price(market, depth_pct=self.depth_pct) log.debug('Price USD/GPH taken from market %s: %s', market, price_usd_gph) price_gph_gold = price_usd_gold / price_usd_gph log.info('Gold price in GPH: {:.8f} GPH/1mgGOLD'.format(price_gph_gold)) price_gph_golos = await self.calc_price_gph_golos() log.info(f'GOLOS price is: {price_gph_golos:.8f} GPH/GOLOS') price_gold_golos = price_gph_golos / price_gph_gold log.info('Calculated price {:.3f} GBG/GOLOS'.format(price_gold_golos)) return price_gold_golos
[docs] async def calc_price_kuna(self) -> float: """Calculate price GBG/GOLOS using kuna.io GOL ticker.""" price_usd_gold = await get_price_usd_gold_cbr() log.info('Gold price from cbr.ru: %s USD/1mgGOLD', price_usd_gold) ticker = await fetch_ticker('kuna', 'GOL/BTC') price_btc_golos = ticker['last'] log.info(f'BTC/GOLOS: {price_btc_golos:.8f}') ticker = await fetch_ticker('kuna', 'BTC/USDT') price_usd_btc = ticker['last'] log.info(f'USDT/BTC: {price_usd_btc:.8f}') price_btc_gold = price_usd_gold / price_usd_btc log.info(f'BTC/1mgGOLD: {price_btc_gold:.8f}') price_gold_golos = price_btc_golos / price_btc_gold log.info('Calculated price {:.3f} GBG/GOLOS'.format(price_gold_golos)) return price_gold_golos
[docs] async def publish_price(self, force: bool = False) -> None: """ Publish price feed once. :param bool force: force-publish price even if update is not needed """ # flag variable which determine should we actually update price or not need_publish = False # calculate prices if self.price_source == PriceSource.graphene: price = await self.calc_price_gbg_golos_bitshares() elif self.price_source == PriceSource.kuna: price = await self.calc_price_kuna() else: raise ValueError("Unknown price source") witness_data = Witness(self.witness) old_price = self.get_witness_pricefeed(witness_data) median_price = self.converter.sbd_median_price() log.info('Current conversion price: {:.3f}'.format(median_price)) # apply correction if k defined if self.correction != 1: price = price * self.correction log.info('Price after correction: {:.3f}'.format(price)) # check whether our price is too old last_price_update_too_old = self.is_last_price_too_old(witness_data, self.max_age) if last_price_update_too_old: log.info('Our last price update older than max_age, forcing update') need_publish = True # check for price difference between our old price and new price diff_rel = abs((old_price / price) - 1) if diff_rel > self.threshold_pct: log.info('Publishing price, difference is: {:.2%}'.format(diff_rel)) need_publish = True else: log.debug('Price difference is too low, not publishing price') # finally publish price if needed if need_publish or force: if self.dry_run: log.info('--dry-run mode, not publishing price feed') else: final_gbg_price = format(price, '.3f') log.info('Price to publish: %s' % final_gbg_price) self.witness_feed_publish(final_gbg_price, quote='1.000', account=self.witness)
[docs] async def run_forever(self) -> None: """ Run in continuous mode to make price feed updates periodically. Example for python 3.7+: .. code-block:: python feed_updater = FeedUpdater() asyncio.run(feed_updater.run_forever()) """ while True: try: await self.publish_price() except ConnectionClosedError: # TODO: workaround for https://github.com/xeroc/python-graphenelib/pull/168 self.bitshares = BitSharesHelper(node=self.node_gph) continue except Exception: log.exception('Exception in main loop:') log.info('Sleeping for %s seconds', self.interval) await asyncio.sleep(self.interval)